跳转至

Domain Layer - Communication Services

controller.domain.services.communication.message_domain_service

MessageDomainService

MessageDomainService(message_encoder: MessageEncoder, message_decoder: MessageDecoder)

消息领域服务。

负责消息的编码和解码,作为 MessageEncoder 和 MessageDecoder 的门面。

属性:

名称 类型 描述
message_encoder MessageEncoder

消息编码器。

message_decoder MessageDecoder

消息解码器。

初始化消息领域服务。

参数:

名称 类型 描述 默认
message_encoder MessageEncoder

注入的消息编码器。

必需
message_decoder MessageDecoder

注入的消息解码器。

必需
源代码位于: src/controller/controller/domain/services/communication/message_domain_service.py
14
15
16
17
18
19
20
21
22
def __init__(self, message_encoder: MessageEncoder, message_decoder: MessageDecoder):
    """初始化消息领域服务。

    Args:
        message_encoder (MessageEncoder): 注入的消息编码器。
        message_decoder (MessageDecoder): 注入的消息解码器。
    """
    self.message_encoder = message_encoder
    self.message_decoder = message_decoder

encode_message

encode_message(**message: Any) -> str

编码消息。

参数:

名称 类型 描述 默认
**message Any

消息字段键值对。

{}

返回:

名称 类型 描述
str str

编码后的十六进制消息字符串。

源代码位于: src/controller/controller/domain/services/communication/message_domain_service.py
24
25
26
27
28
29
30
31
32
33
34
def encode_message(self, **message: Any) -> str:
    """编码消息。

    Args:
        **message: 消息字段键值对。

    Returns:
        str: 编码后的十六进制消息字符串。
    """
    m = self.message_encoder.create_message(**message)
    return self.message_encoder.interpret_message(m)

decode_message

decode_message(message: str) -> Any

解码消息。

参数:

名称 类型 描述 默认
message str

十六进制消息字符串。

必需

返回:

名称 类型 描述
Any Any

解码后的消息对象 (MessageIn)。

源代码位于: src/controller/controller/domain/services/communication/message_domain_service.py
36
37
38
39
40
41
42
43
44
45
def decode_message(self, message: str) -> Any:
    """解码消息。

    Args:
        message (str): 十六进制消息字符串。

    Returns:
        Any: 解码后的消息对象 (MessageIn)。
    """
    return self.message_decoder.decode_message(message)

controller.domain.services.communication.serial_domain_service

串口领域服务 - Domain层 提供串口连接、读写操作的核心业务逻辑 可被其他Domain服务复用

SerialReaderThread

SerialReaderThread(serial_reader)

Bases: QThread

串口读取线程

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
14
15
16
def __init__(self, serial_reader):
    super().__init__()
    self.serial_reader = serial_reader

run

run()

运行读取循环

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
18
19
20
21
def run(self):
    """运行读取循环"""
    if self.serial_reader:
        self.serial_reader.read_data()

SerialWriterThread

SerialWriterThread(serial_writer)

Bases: QThread

串口写入线程

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
27
28
29
def __init__(self, serial_writer):
    super().__init__()
    self.serial_writer = serial_writer

run

run()

运行写入循环

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
31
32
33
34
def run(self):
    """运行写入循环"""
    if self.serial_writer:
        self.serial_writer.send_data()

SerialDomainService

SerialDomainService()

Bases: QObject

串口领域服务 - 管理串口连接的核心业务逻辑

初始化串口领域服务

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self):
    """初始化串口领域服务"""
    super().__init__()

    # 串口相关对象
    self.serial_adapter = SerialAdapter()
    self.serial_reader: Optional[SerialReader] = None
    self.serial_writer: Optional[SerialWriter] = None

    # 线程管理
    self.reader_thread: Optional[SerialReaderThread] = None
    self.writer_thread: Optional[SerialWriterThread] = None

scan_available_ports

scan_available_ports() -> List[str]

扫描可用端口列表

返回

List[str]: 可用端口名称列表

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
57
58
59
60
61
62
63
64
65
66
67
def scan_available_ports(self) -> List[str]:
    """
    扫描可用端口列表

    返回:
        List[str]: 可用端口名称列表
    """
    try:
        return PortScanner.scan_ports()
    except Exception as e:
        raise Exception(f"扫描端口失败: {str(e)}")

get_port_info

get_port_info(port_name: str) -> Optional[Dict[str, str]]

获取指定端口信息

参数

port_name: 端口名称

返回

Optional[Dict[str, str]]: 端口信息字典,失败时返回None

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_port_info(self, port_name: str) -> Optional[Dict[str, str]]:
    """
    获取指定端口信息

    参数:
        port_name: 端口名称

    返回:
        Optional[Dict[str, str]]: 端口信息字典,失败时返回None
    """
    try:
        return PortScanner.get_port_info(port_name)
    except Exception as e:
        raise Exception(f"获取端口信息失败: {str(e)}")

get_current_port

get_current_port() -> Optional[str]

获取当前连接的端口名称

返回

Optional[str]: 端口名称,未连接时返回None

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
84
85
86
87
88
89
90
91
def get_current_port(self) -> Optional[str]:
    """
    获取当前连接的端口名称

    返回:
        Optional[str]: 端口名称,未连接时返回None
    """
    return self.serial_adapter.get_port_name()

is_connected

is_connected() -> bool

检查串口是否已连接

返回

bool: 是否已连接

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
 93
 94
 95
 96
 97
 98
 99
100
def is_connected(self) -> bool:
    """
    检查串口是否已连接

    返回:
        bool: 是否已连接
    """
    return self.serial_adapter.is_connected()

connect_port

connect_port(port: str, config: Dict[str, Any]) -> bool

连接串口

参数

port: 串口名称 config: 串口配置参数

返回

bool: 连接是否成功

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def connect_port(self, port: str, config: Dict[str, Any]) -> bool:
    """
    连接串口

    参数:
        port: 串口名称
        config: 串口配置参数

    返回:
        bool: 连接是否成功
    """
    try:
        # 如果已连接,先断开
        if self.is_connected():
            self.disconnect_port()

        # 尝试连接串口
        if not self.serial_adapter.connect(port, config):
            raise Exception(f"连接端口 {port} 失败")

        # 获取串口对象
        serial_port = self.serial_adapter.get_serial_port()
        if not serial_port:
            raise Exception("获取串口对象失败")

        # 创建读写器
        self.serial_reader = SerialReader(serial_port)
        self.serial_writer = SerialWriter(serial_port)

        # 连接Infrastructure层信号到Domain层
        self._connect_infrastructure_signals()

        # 启动线程
        self._start_threads()

        # 发送连接成功信号
        self.connection_status_changed.emit(True)
        return True

    except Exception as e:
        self.disconnect_port()
        raise Exception(f"连接失败: {str(e)}")

disconnect_port

disconnect_port() -> bool

断开串口连接

返回

bool: 断开是否成功

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def disconnect_port(self) -> bool:
    """
    断开串口连接

    返回:
        bool: 断开是否成功
    """
    try:
        # 停止线程
        self._stop_threads()

        # 断开信号连接
        self._disconnect_infrastructure_signals()

        # 停止读写器
        if self.serial_reader:
            self.serial_reader.stop()
            self.serial_reader = None

        if self.serial_writer:
            self.serial_writer.stop()
            self.serial_writer = None

        # 断开适配器连接
        success = self.serial_adapter.disconnect()

        # 发送断开连接信号
        self.connection_status_changed.emit(False)
        return success

    except Exception as e:
        raise Exception(f"断开连接失败: {str(e)}")

send_data

send_data(data: str) -> bool

发送数据

参数

data: 要发送的数据

返回

bool: 是否成功添加到发送队列

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def send_data(self, data: str) -> bool:
    """
    发送数据

    参数:
        data: 要发送的数据

    返回:
        bool: 是否成功添加到发送队列
    """
    if self.serial_writer and self.is_connected():
        self.serial_writer.add_to_queue(data)
        return True
    else:
        raise Exception("串口未连接,无法发送数据")

cleanup

cleanup() -> None

清理资源

源代码位于: src/controller/controller/domain/services/communication/serial_domain_service.py
242
243
244
245
246
247
def cleanup(self) -> None:
    """清理资源"""
    try:
        self.disconnect_port()
    except Exception:
        pass  # 忽略清理时的异常