跳转至

Serial Reader

controller.infrastructure.communication.serial_reader

串口读取器 - Infrastructure层 负责多线程读取串口数据

SerialReader

SerialReader(serial_port: Optional[Serial] = None)

Bases: QObject

串口读取处理类。

负责在独立线程中读取串口数据,支持多种编码格式。

属性:

名称 类型 描述
data_received pyqtSignal

接收到数据时发送信号,携带十六进制字符串。

初始化串口读取器。

参数:

名称 类型 描述 默认
serial_port Serial

串口对象。

None
源代码位于: src/controller/controller/infrastructure/communication/serial_reader.py
23
24
25
26
27
28
29
30
31
def __init__(self, serial_port: Optional[serial.Serial] = None):
    """初始化串口读取器。

    Args:
        serial_port (serial.Serial, optional): 串口对象。
    """
    super().__init__()
    self.serial_port = serial_port
    self.stop_flag = False

stop

stop() -> None

停止读取。

源代码位于: src/controller/controller/infrastructure/communication/serial_reader.py
35
36
37
def stop(self) -> None:
    """停止读取。"""
    self.stop_flag = True

read_data

read_data() -> None

读取串口数据 - 在独立线程中运行。

源代码位于: src/controller/controller/infrastructure/communication/serial_reader.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def read_data(self) -> None:
    """读取串口数据 - 在独立线程中运行。"""
    self.stop_flag = False

    while not self.stop_flag:
        if not self.serial_port or not self.serial_port.is_open:
            time.sleep(0.1)
            continue

        try:
            waiting_bytes = self.serial_port.in_waiting
        except OSError:
            time.sleep(0.1)
            continue

        if waiting_bytes > 0:
            try:
                data = self.serial_port.read(waiting_bytes)

                # 固定使用十六进制格式
                hex_data = data.hex().upper()
                self.data_received.emit(hex_data)

            except serial.SerialException:
                time.sleep(0.1)
                continue
        else:
            time.sleep(0.01)

        time.sleep(0.01)