跳转至

Command Hub Service

controller.application.services.command_hub_service

CommandHubService

CommandHubService(message_domain_service: MessageDomainService, motion_runner: MotionRunner, serial_domain_service: SerialDomainService, message_display: MessageDisplay, motion_constructor: MotionConstructor)

Bases: BaseService

命令中心服务 - Application层。

职责: 1. 分发用户命令到不同的处理逻辑 2. 协调运动任务的准备和执行 3. 管理消息显示

属性:

名称 类型 描述
message_domain_service

消息领域服务。

motion_runner

运动执行器。

serial_domain_service

串口领域服务。

motion_constructor

运动构造器。

初始化命令中心服务。

源代码位于: src/controller/controller/application/services/command_hub_service.py
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, 
message_domain_service: MessageDomainService, 
motion_runner: MotionRunner,
serial_domain_service: SerialDomainService, 
message_display: MessageDisplay,
motion_constructor: MotionConstructor):
    """初始化命令中心服务。"""
    super().__init__(message_display)
    self.message_domain_service = message_domain_service
    self.motion_runner = motion_runner
    self.serial_domain_service = serial_domain_service
    self.motion_constructor = motion_constructor

single_send_command

single_send_command(**kwargs: Any) -> bool

发送单个命令。

参数:

名称 类型 描述 默认
**kwargs Any

命令参数,将被编码为消息帧。

{}

返回:

名称 类型 描述
bool bool

是否发送成功。

源代码位于: src/controller/controller/application/services/command_hub_service.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def single_send_command(self, **kwargs: Any) -> bool:
    """发送单个命令。

    Args:
        **kwargs: 命令参数,将被编码为消息帧。

    Returns:
        bool: 是否发送成功。
    """
    try:
        msg = self.message_domain_service.encode_message(**kwargs)
        self._display_message(msg, "发送")
        success = self.serial_domain_service.send_data(msg)
        if not success:
            self._display_message("发送失败:串口未连接", "错误")
        return success
    except Exception as e:
        self._display_message(f"发送命令失败: {str(e)}", "错误")
        return False

command_distribution

command_distribution(config_dict: dict)

分发用户命令到对应的处理逻辑。

参数:

名称 类型 描述 默认
config_dict dict

配置字典,包含 control, mode 等参数。

必需
源代码位于: src/controller/controller/application/services/command_hub_service.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def command_distribution(self, config_dict: dict):
    """分发用户命令到对应的处理逻辑。

    Args:
        config_dict (dict): 配置字典,包含 control, mode 等参数。
    """
    control = config_dict.get('control')
    mode = config_dict.get('mode')

    if control in [0x00, 0x01, 0x02, 0x03, 0x04]:
        self.single_send_command(**config_dict)
        self.get_current_position()

    elif control == 0x05:
        if mode == 0x0A:
            self._display_message("关闭示教模式", "动力学")
        self.single_send_command(**config_dict)
        self.motion_runner.stop_motion()

    elif control == 0x07:
        if mode == 0x0A:
            self._display_message("开启示教模式", "动力学")
        self.single_send_command(**config_dict)
        if mode != 0x0A:
            self.get_current_position()

    elif control == 0x06:
        if mode == 0x0A:
            torque_values = config_dict.get('torque')
            self._display_message(f"发送力矩: {torque_values}", "动力学")
            self.single_send_command(**config_dict)
        else:
            # 角度控制:默认使用 S 曲线和 0.01 频率
            target_angles = config_dict.get('target_angles')
            task = {
                "type": "motion",
                "target_angles": target_angles,
                "curve_type": "s_curve",
                "frequency": 0.01
            }
            self.motion_constructor.prepare_operation(
                MotionOperationMode.EXECUTE,
                [task]
            )
            self._display_message(f"准备运动到目标位置: {target_angles}", "控制")
            self.get_current_position()

    elif control == 0x08:
        self.single_send_command(**config_dict)
        self.motion_runner.stop_motion()

get_current_position

get_current_position()

请求获取当前位置。

源代码位于: src/controller/controller/application/services/command_hub_service.py
113
114
115
116
117
def get_current_position(self):
    """请求获取当前位置。"""
    self.message_display.clear_messages()
    self._display_message("正在获取当前位置...", "控制")
    self.single_send_command(control=0x07)

run_teach_record

run_teach_record(angles_list: list)

运行示教记录(播放示教轨迹)。

流程: 1. 准备示教运动任务 2. 获取当前位置 3. 异步回调时自动构建平滑轨迹并执行

参数:

名称 类型 描述 默认
angles_list list[list[float]]

示教记录的角度列表 [[θ1,...,θ6], ...]。

必需
源代码位于: src/controller/controller/application/services/command_hub_service.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def run_teach_record(self, angles_list: list):
    """运行示教记录(播放示教轨迹)。

    流程:
    1. 准备示教运动任务
    2. 获取当前位置
    3. 异步回调时自动构建平滑轨迹并执行

    Args:
        angles_list (list[list[float]]): 示教记录的角度列表 [[θ1,...,θ6], ...]。
    """
    task = {
        "type": "teach",
        "teach_data": angles_list
    }
    self.motion_constructor.prepare_operation(
        MotionOperationMode.EXECUTE,
        [task]
    )
    self._display_message(f"准备播放示教轨迹,共 {len(angles_list)} 个点", "示教")
    self.get_current_position()

run_motion_sequence

run_motion_sequence()

开始执行运动序列(用于运动规划方案)。

流程: 1. 查询当前位置 2. 收到位置反馈后,MotionConstructor 自动规划所有任务

源代码位于: src/controller/controller/application/services/command_hub_service.py
141
142
143
144
145
146
147
148
149
def run_motion_sequence(self):
    """开始执行运动序列(用于运动规划方案)。

    流程:
    1. 查询当前位置
    2. 收到位置反馈后,MotionConstructor 自动规划所有任务
    """
    self._display_message("开始执行运动规划方案...", "运动规划")
    self.get_current_position()