跳转至

Domain Layer - Motion Services

controller.domain.services.motion.motion_constructor

运动构造器 - Domain层 协调轨迹规划和运动执行

MotionConstructor

MotionConstructor(motion_runner: MotionRunner, trajectory_planner: TrajectoryPlanningService)

运动构造器 - 领域服务。

职责: 1. 管理操作状态(执行/保存/预览) 2. 协调 TrajectoryPlanningService(规划) 3. 协调 MotionRunner(执行)

属性:

名称 类型 描述
motion_runner MotionRunner

运动执行器。

trajectory_planner TrajectoryPlanningService

轨迹规划服务。

初始化运动构造器。

参数:

名称 类型 描述 默认
motion_runner MotionRunner

运动执行器。

必需
trajectory_planner TrajectoryPlanningService

轨迹规划服务。

必需
源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self, 
    motion_runner: MotionRunner,
    trajectory_planner: TrajectoryPlanningService
):
    """初始化运动构造器。

    Args:
        motion_runner (MotionRunner): 运动执行器。
        trajectory_planner (TrajectoryPlanningService): 轨迹规划服务。
    """
    self.motion_runner = motion_runner
    self.trajectory_planner = trajectory_planner

    # 统一的操作状态
    self._operation_mode = MotionOperationMode.NONE
    self._pending_tasks = []
    self._operation_context = {}

prepare_operation

prepare_operation(mode: MotionOperationMode, tasks: List[Dict], context: Dict = None)

准备操作(统一入口)。

参数:

名称 类型 描述 默认
mode MotionOperationMode

操作模式。 - EXECUTE: 执行运动 - SAVE: 保存轨迹 - PREVIEW: 预览轨迹

必需
tasks List[Dict]

任务列表。

必需
context Dict

操作上下文。 - EXECUTE: {} - SAVE: {"filename": "xxx", "type": "node|plan"} - PREVIEW: {"type": "node|plan", "node_index": 0}

None
源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def prepare_operation(
    self,
    mode: MotionOperationMode,
    tasks: List[Dict],
    context: Dict = None
):
    """准备操作(统一入口)。

    Args:
        mode (MotionOperationMode): 操作模式。
            - EXECUTE: 执行运动
            - SAVE: 保存轨迹
            - PREVIEW: 预览轨迹
        tasks (List[Dict]): 任务列表。
        context (Dict, optional): 操作上下文。
            - EXECUTE: {}
            - SAVE: {"filename": "xxx", "type": "node|plan"}
            - PREVIEW: {"type": "node|plan", "node_index": 0}
    """
    self._operation_mode = mode
    self._pending_tasks = tasks.copy()
    self._operation_context = context or {}

has_pending_operation

has_pending_operation() -> bool

检查是否有待处理的操作。

返回:

名称 类型 描述
bool bool

True=有待处理的操作, False=无操作。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
66
67
68
69
70
71
72
def has_pending_operation(self) -> bool:
    """检查是否有待处理的操作。

    Returns:
        bool: True=有待处理的操作, False=无操作。
    """
    return self._operation_mode != MotionOperationMode.NONE

get_operation_mode

get_operation_mode() -> MotionOperationMode

获取当前操作模式。

返回:

名称 类型 描述
MotionOperationMode MotionOperationMode

当前操作模式枚举。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
74
75
76
77
78
79
80
def get_operation_mode(self) -> MotionOperationMode:
    """获取当前操作模式。

    Returns:
        MotionOperationMode: 当前操作模式枚举。
    """
    return self._operation_mode

get_pending_tasks

get_pending_tasks() -> List[Dict]

获取待处理的任务列表。

返回:

类型 描述
List[Dict]

List[Dict]: 任务列表的副本。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
82
83
84
85
86
87
88
def get_pending_tasks(self) -> List[Dict]:
    """获取待处理的任务列表。

    Returns:
        List[Dict]: 任务列表的副本。
    """
    return self._pending_tasks.copy()

get_operation_context

get_operation_context() -> Dict

获取操作上下文。

返回:

名称 类型 描述
Dict Dict

上下文字典的副本。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
90
91
92
93
94
95
96
def get_operation_context(self) -> Dict:
    """获取操作上下文。

    Returns:
        Dict: 上下文字典的副本。
    """
    return self._operation_context.copy()

clear_operation

clear_operation()

清除操作状态。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
 98
 99
100
101
102
def clear_operation(self):
    """清除操作状态。"""
    self._operation_mode = MotionOperationMode.NONE
    self._pending_tasks.clear()
    self._operation_context.clear()

execute_motion

execute_motion(start_position: List[float])

执行运动(仅用于EXECUTE模式)。

流程: 1. 按任务顺序规划每个任务 2. 按顺序添加运动数据和夹爪命令 3. 清除状态

参数:

名称 类型 描述 默认
start_position List[float]

起始位置(当前关节角度)。

必需

引发:

类型 描述
RuntimeError

如果当前模式不是EXECUTE。

源代码位于: src/controller/controller/domain/services/motion/motion_constructor.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def execute_motion(self, start_position: List[float]):
    """执行运动(仅用于EXECUTE模式)。

    流程:
    1. 按任务顺序规划每个任务
    2. 按顺序添加运动数据和夹爪命令
    3. 清除状态

    Args:
        start_position (List[float]): 起始位置(当前关节角度)。

    Raises:
        RuntimeError: 如果当前模式不是EXECUTE。
    """
    if self._operation_mode != MotionOperationMode.EXECUTE:
        raise RuntimeError(f"当前模式不是EXECUTE: {self._operation_mode}")

    if not self._pending_tasks:
        return

    # 执行
    self.motion_runner.clear_data()

    # 按顺序处理每个任务,保持原有顺序
    current_position = start_position

    for task in self._pending_tasks:
        if task["type"] == "gripper":
            # 夹爪任务:直接添加夹爪命令(带延迟)
            effector_mode = task["effector_mode"]
            effector_data = task["effector_data"]
            pre_delay = task.get("pre_delay", 0.0)
            post_delay = task.get("post_delay", 1.0)
            self.motion_runner.add_gripper_data(
                effector_mode, 
                effector_data,
                pre_delay=pre_delay,
                post_delay=post_delay
            )
        else:
            # 运动任务:规划轨迹并添加
            positions, end_position = self.trajectory_planner._plan_single_task(
                task, 
                current_position
            )
            # 注意:positions 可能是 numpy 数组,不能直接用 if positions
            if len(positions) > 0:
                self.motion_runner.add_motion_data(positions)
                current_position = end_position

    self.motion_runner.start_motion()

    # 清理
    self.clear_operation()

controller.domain.services.motion.motion_runner

MotionRunner

MotionRunner(serial_domain_service: SerialDomainService, message_domain_service: MessageDomainService)

Bases: QObject

运动执行器。

负责按时间步长定时发送运动和夹爪指令。

属性:

名称 类型 描述
motion_msg_signal pyqtSignal

运动消息发送信号,携带 (消息内容, 类型)。

serial_domain_service

串口领域服务。

message_domain_service

消息领域服务。

data_list list

待发送的数据列表。

data_index int

当前发送索引。

timer QTimer

定时器。

初始化运动执行器。

源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self, 
    serial_domain_service: SerialDomainService, 
    message_domain_service: MessageDomainService):
    """初始化运动执行器。"""
    super().__init__()
    self.serial_domain_service = serial_domain_service
    self.message_domain_service = message_domain_service
    self.data_list = []
    self.data_index = 0
    self.timer = QTimer()
    self.timer.setInterval(10)
    self.timer.timeout.connect(self.motion_timeout)

add_motion_data

add_motion_data(positions)

添加运动数据点序列。

参数:

名称 类型 描述 默认
positions list | ndarray

关节角度列表序列。

必需
源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def add_motion_data(self, positions):
    """添加运动数据点序列。

    Args:
        positions (list | np.ndarray): 关节角度列表序列。
    """
    for position in positions:
        msg_dict = {
            'control': 0x06,
            'mode': 0x08,
            'joint_angles': list(position)
        }
        message = self.message_domain_service.encode_message(**msg_dict)
        self.data_list.append(message)

add_gripper_data

add_gripper_data(effector_mode, effector_data, pre_delay=0.0, post_delay=1.0)

添加夹爪命令。

参数:

名称 类型 描述 默认
effector_mode int

夹爪模式。

必需
effector_data float

夹爪参数。

必需
pre_delay float

前置等待时间(秒). Defaults to 0.0.

0.0
post_delay float

后置等待时间(秒). Defaults to 1.0.

1.0
源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def add_gripper_data(self, effector_mode, effector_data, pre_delay=0.0, post_delay=1.0):
    """添加夹爪命令。

    Args:
        effector_mode (int): 夹爪模式。
        effector_data (float): 夹爪参数。
        pre_delay (float, optional): 前置等待时间(秒). Defaults to 0.0.
        post_delay (float, optional): 后置等待时间(秒). Defaults to 1.0.
    """
    # 添加前置延迟(如果>0)
    if pre_delay > 0:
        self.data_list.append(DelayCommand(delay_s=pre_delay))

    # 添加夹爪命令
    msg_dict = {
        'control': 0x00,
        'mode': 0x08,
        'effector_mode': effector_mode,
        'effector_data': effector_data
    }
    message = self.message_domain_service.encode_message(**msg_dict)
    self.data_list.append(message)

    # 添加后置延迟(如果>0)
    if post_delay > 0:
        self.data_list.append(DelayCommand(delay_s=post_delay))

start_motion

start_motion()

开始执行运动序列。

源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
85
86
87
88
def start_motion(self):
    """开始执行运动序列。"""
    self.data_index = 0
    self.timer.start()

stop_motion

stop_motion()

停止执行运动。

源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
90
91
92
93
def stop_motion(self):
    """停止执行运动。"""
    self.data_index = 0
    self.timer.stop()

clear_data

clear_data()

清空所有待发送数据。

源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
95
96
97
98
def clear_data(self):
    """清空所有待发送数据。"""
    self.data_index = 0
    self.data_list = []

motion_timeout

motion_timeout()

定时器回调函数,负责单步发送或延迟等待。

源代码位于: src/controller/controller/domain/services/motion/motion_runner.py
100
101
102
103
104
105
106
107
108
109
110
111
def motion_timeout(self):
    """定时器回调函数,负责单步发送或延迟等待。"""
    if self.data_index >= len(self.data_list):
        self.timer.stop()
        return
    current_item = self.data_list[self.data_index]
    if hasattr(current_item, 'is_delay'):
        time.sleep(current_item.delay_s)
    else:
        self.serial_domain_service.send_data(current_item)
        self.motion_msg_signal.emit(current_item, "发送")
    self.data_index += 1

controller.domain.services.motion.trajectory_planning_service

轨迹规划服务 - Domain层 纯粹的轨迹规划逻辑,不涉及执行或持久化

TrajectoryPlanningService

TrajectoryPlanningService(s_curve: SCurve, smooth_service: SmoothDomainService, linear_motion_service: LinearMotionDomainService, curve_motion_service: CurveMotionDomainService)

轨迹规划服务 - 领域服务。

职责: 1. 根据任务类型规划轨迹点序列 2. 维护位置状态(起点->终点) 3. 返回纯数据,不执行任何副作用

核心原则:纯函数,可测试,可复用

属性:

名称 类型 描述
s_curve SCurve

S曲线算法服务。

smooth_service SmoothDomainService

平滑算法服务。

linear_motion_service LinearMotionDomainService

直线运动服务。

curve_motion_service CurveMotionDomainService

曲线运动服务。

初始化轨迹规划服务。

参数:

名称 类型 描述 默认
s_curve SCurve

S曲线算法服务。

必需
smooth_service SmoothDomainService

平滑算法服务。

必需
linear_motion_service LinearMotionDomainService

直线运动服务。

必需
curve_motion_service CurveMotionDomainService

曲线运动服务。

必需
源代码位于: src/controller/controller/domain/services/motion/trajectory_planning_service.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    s_curve: SCurve,
    smooth_service: SmoothDomainService,
    linear_motion_service: LinearMotionDomainService,
    curve_motion_service: CurveMotionDomainService
):
    """初始化轨迹规划服务。

    Args:
        s_curve (SCurve): S曲线算法服务。
        smooth_service (SmoothDomainService): 平滑算法服务。
        linear_motion_service (LinearMotionDomainService): 直线运动服务。
        curve_motion_service (CurveMotionDomainService): 曲线运动服务。
    """
    self.s_curve = s_curve
    self.smooth_service = smooth_service
    self.linear_motion_service = linear_motion_service
    self.curve_motion_service = curve_motion_service

plan_task_sequence

plan_task_sequence(tasks: List[Dict], start_position: List[float]) -> List[List[float]]

规划任务序列(仅返回位置)。

用于:执行运动、保存轨迹。

参数:

名称 类型 描述 默认
tasks List[Dict]

任务列表。

必需
start_position List[float]

起始位置。

必需

返回:

类型 描述
List[List[float]]

List[List[float]]: 轨迹点序列 [[q1,...,q6], ...]。

源代码位于: src/controller/controller/domain/services/motion/trajectory_planning_service.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def plan_task_sequence(
    self,
    tasks: List[Dict],
    start_position: List[float]
) -> List[List[float]]:
    """规划任务序列(仅返回位置)。

    用于:执行运动、保存轨迹。

    Args:
        tasks (List[Dict]): 任务列表。
        start_position (List[float]): 起始位置。

    Returns:
        List[List[float]]: 轨迹点序列 [[q1,...,q6], ...]。
    """
    all_positions = []
    current_position = start_position

    for task in tasks:
        positions, end_position = self._plan_single_task(task, current_position)
        all_positions.extend(positions)
        current_position = end_position

    return all_positions

plan_task_sequence_with_derivatives

plan_task_sequence_with_derivatives(tasks: List[Dict], start_position: List[float]) -> Dict

规划任务序列(返回位置、速度、加速度、时间)。

用于:预览轨迹曲线。

参数:

名称 类型 描述 默认
tasks List[Dict]

任务列表。

必需
start_position List[float]

起始位置。

必需

返回:

名称 类型 描述
Dict Dict

包含以下键的字典: - time: [t0, t1, ...] - positions: [[q1,...,q6], ...] - velocities: [[qd1,...,qd6], ...] - accelerations: [[qdd1,...,qdd6], ...]

源代码位于: src/controller/controller/domain/services/motion/trajectory_planning_service.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def plan_task_sequence_with_derivatives(
    self,
    tasks: List[Dict],
    start_position: List[float]
) -> Dict:
    """规划任务序列(返回位置、速度、加速度、时间)。

    用于:预览轨迹曲线。

    Args:
        tasks (List[Dict]): 任务列表。
        start_position (List[float]): 起始位置。

    Returns:
        Dict: 包含以下键的字典:
            - time: [t0, t1, ...]
            - positions: [[q1,...,q6], ...]
            - velocities: [[qd1,...,qd6], ...]
            - accelerations: [[qdd1,...,qdd6], ...]
    """
    all_time = []
    all_positions = []
    all_velocities = []
    all_accelerations = []

    current_position = start_position
    current_time = 0.0

    for task in tasks:
        result = self._plan_single_task_full(task, current_position)

        # 合并数据(时间累加)
        all_time.extend([t + current_time for t in result["time"]])
        all_positions.extend(result["positions"])
        all_velocities.extend(result["velocities"])
        all_accelerations.extend(result["accelerations"])

        # 更新状态
        if len(result["positions"]) > 0:
            current_position = result["positions"][-1]
            if len(result["time"]) > 0:
                current_time = all_time[-1]

    return {
        "time": all_time,
        "positions": all_positions,
        "velocities": all_velocities,
        "accelerations": all_accelerations
    }