跳转至

Serial Writer

controller.infrastructure.communication.serial_writer

串口写入器 - Infrastructure层 负责多线程发送串口数据

SerialWriter

SerialWriter(serial_port: Optional[Serial] = None)

Bases: QObject

串口发送处理类。

负责在独立线程中发送串口数据,使用队列管理发送任务。

属性:

名称 类型 描述
serial_port Optional[Serial]

串口对象。

send_queue Queue

发送任务队列。

初始化串口写入器。

参数:

名称 类型 描述 默认
serial_port Serial

串口对象。

None
源代码位于: src/controller/controller/infrastructure/communication/serial_writer.py
22
23
24
25
26
27
28
29
30
31
def __init__(self, serial_port: Optional[serial.Serial] = None):
    """初始化串口写入器。

    Args:
        serial_port (serial.Serial, optional): 串口对象。
    """
    super().__init__()
    self.serial_port = serial_port
    self.send_queue = Queue()
    self.stop_flag = False

stop

stop() -> None

停止发送。

源代码位于: src/controller/controller/infrastructure/communication/serial_writer.py
33
34
35
def stop(self) -> None:
    """停止发送。"""
    self.stop_flag = True

add_to_queue

add_to_queue(cmd: Union[str, bytes]) -> None

添加命令到发送队列。

参数:

名称 类型 描述 默认
cmd Union[str, bytes]

待发送的命令(十六进制字符串或字节)。

必需
源代码位于: src/controller/controller/infrastructure/communication/serial_writer.py
37
38
39
40
41
42
43
def add_to_queue(self, cmd: Union[str, bytes]) -> None:
    """添加命令到发送队列。

    Args:
        cmd (Union[str, bytes]): 待发送的命令(十六进制字符串或字节)。
    """
    self.send_queue.put(cmd)

send_data

send_data() -> None

发送串口数据 - 在独立线程中运行。

源代码位于: src/controller/controller/infrastructure/communication/serial_writer.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def send_data(self) -> None:
    """发送串口数据 - 在独立线程中运行。"""
    self.stop_flag = False

    while not self.stop_flag:
        try:
            if not self.serial_port or not self.serial_port.is_open:
                time.sleep(0.1)
                continue

            if not self.send_queue.empty():
                try:
                    cmd = self.send_queue.get()
                    if isinstance(cmd, str):
                        cmd = bytes.fromhex(cmd.replace(' ', ''))
                    self.serial_port.write(cmd)
                    self.serial_port.flush()
                    self.send_queue.task_done()
                except Exception:
                    # 静默处理错误,不发送信号
                    pass
            else:
                # 队列为空时短暂休眠,避免CPU空转
                time.sleep(0.01)
        except Exception:
            # 静默处理错误,不发送信号
            time.sleep(0.1)