跳转至

Message Response Service

controller.application.services.message_response_service

消息响应服务 - Application层 处理串口接收的数据,解码并更新状态服务

MessageResponseService

MessageResponseService(message_display: MessageDisplay, serial_domain_service: SerialDomainService, message_domain_service: MessageDomainService, robot_state_service: RobotStateDomainService, motion_runner: MotionRunner, motion_constructor: MotionConstructor, trajectory_planner: TrajectoryPlanningService, trajectory_repository: TrajectoryRepository)

Bases: BaseService

消息响应服务 - Application层。

处理串口接收的数据,解码并更新状态服务。

职责: 1. 接收串口数据 2. 拼接缓冲,检测完整帧 3. 解码消息 4. 更新状态服务(统一入口) 5. 根据操作模式分发处理(执行/保存/预览)

属性:

名称 类型 描述
get_current_position_signal pyqtSignal

获取当前位置信号(用于UI显示)。

trajectory_preview_signal pyqtSignal

轨迹预览数据信号。

初始化消息响应服务。

源代码位于: src/controller/controller/application/services/message_response_service.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(
    self, 
    message_display: MessageDisplay,
    serial_domain_service: SerialDomainService,
    message_domain_service: MessageDomainService,
    robot_state_service: RobotStateDomainService,
    motion_runner: MotionRunner,
    motion_constructor: MotionConstructor,
    trajectory_planner: TrajectoryPlanningService,
    trajectory_repository: TrajectoryRepository
):
    """初始化消息响应服务。"""
    super().__init__(message_display)
    self.serial_domain_service = serial_domain_service
    self.message_domain_service = message_domain_service
    self.robot_state_service = robot_state_service
    self.motion_runner = motion_runner
    self.motion_constructor = motion_constructor
    self.trajectory_planner = trajectory_planner
    self.trajectory_repository = trajectory_repository

    self._connect_signals()
    self.message_buffer = ""

    # 操作处理器映射(策略模式)
    self._operation_handlers = {
        MotionOperationMode.EXECUTE: self._handle_execute,
        MotionOperationMode.SAVE: self._handle_save,
        MotionOperationMode.PREVIEW: self._handle_preview,
    }

handle_message

handle_message(message_in: str)

处理接收到的消息。

流程: 1. 拼接缓冲 2. 检测完整帧(AA55...0D0A) 3. 解码消息 4. 更新状态服务 5. 处理运动消息

参数:

名称 类型 描述 默认
message_in str

输入的原始串口数据。

必需
源代码位于: src/controller/controller/application/services/message_response_service.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def handle_message(self, message_in: str):
    """处理接收到的消息。

    流程:
    1. 拼接缓冲
    2. 检测完整帧(AA55...0D0A)
    3. 解码消息
    4. 更新状态服务
    5. 处理运动消息

    Args:
        message_in (str): 输入的原始串口数据。
    """
    self.message_buffer += message_in
    if "0D0A" in self.message_buffer:
        if self.message_buffer.startswith("AA55") and len(self.message_buffer) < 120:
            return
        lines = self.message_buffer.rsplit("0D0A", 1)            
        self.message_buffer = lines[-1]
        command_line = lines[0]
        self._display_message(command_line + "0D0A", "接收")

        if command_line.startswith("AA55"):
            try:
                # 解码消息
                decoded_message = self.message_domain_service.decode_message(
                    command_line + "0D0A"
                )
                # 更新统一的状态服务(单一入口)
                self.robot_state_service.update_state(decoded_message)

                # 处理运动消息
                if decoded_message.control == 0x07 and decoded_message.mode == 0x08:
                    self.handle_motion_message(decoded_message.positions)

            except Exception as e:
                self._display_message(f"解码消息失败: {str(e)}", "错误")
        else:
            try:
                ascii_text = bytes.fromhex(command_line).decode('ascii', errors='replace')
                printable_text = ''.join(
                    c if c.isprintable() or c in '\n\r\t' 
                    else f'\\x{ord(c):02x}' 
                    for c in ascii_text
                )
                if printable_text.strip():
                    self._display_message(f"ASCII: {printable_text}", "接收")
            except Exception:
                pass

handle_motion_message

handle_motion_message(current_position)

处理运动消息(获取当前位置的回复)。

根据状态决定行为: 1. 有待处理的操作 → 调用对应的处理器 2. 无操作 → 仅发射信号供UI显示

参数:

名称 类型 描述 默认
current_position list[float]

当前关节位置(编码器值)。

必需
源代码位于: src/controller/controller/application/services/message_response_service.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def handle_motion_message(self, current_position):
    """处理运动消息(获取当前位置的回复)。

    根据状态决定行为:
    1. 有待处理的操作 → 调用对应的处理器
    2. 无操作 → 仅发射信号供UI显示

    Args:
        current_position (list[float]): 当前关节位置(编码器值)。
    """
    if self.motion_constructor.has_pending_operation():
        # 获取操作模式
        mode = self.motion_constructor.get_operation_mode()

        # 从字典中获取对应的处理函数
        handler = self._operation_handlers.get(mode)

        if handler:
            # 调用处理函数
            handler(current_position)
        else:
            self._display_message(f"未知操作模式: {mode}", "错误")
            self.motion_constructor.clear_operation()
    else:
        # 无操作:仅发射信号供UI显示
        self.get_current_position_signal.emit(current_position)