跳转至

State Domain Services

controller.domain.services.state.robot_state_domain_service

机械臂状态领域服务 - Domain层 统一管理机械臂运行时状态

RobotStateDomainService

RobotStateDomainService()

Bases: QObject

机械臂状态领域服务 - 单一数据源。

职责: 1. 维护当前机械臂状态(最新快照) 2. 维护历史状态缓冲(滑动窗口) 3. 计算静摩擦补偿(基于历史数据) 4. 发射信号通知订阅者

设计原则: - Single Source of Truth: 所有状态只在这里维护 - Pub-Sub Pattern: 支持多个订阅者 - Thread Safe: 使用锁保护可变数据

属性:

名称 类型 描述
state_updated pyqtSignal

状态更新信号,携带 RobotStateSnapshot。

angles_changed pyqtSignal

角度显著变化信号,携带 [θ1, θ2, ..., θ6] 列表。

torque_compensation_requested pyqtSignal

力矩补偿请求信号,携带当前角度列表。

update_state

update_state(decoded_message)

更新机械臂状态(核心方法)。

由 MessageResponseService 调用,每次接收到串口数据并解码后调用。

参数:

名称 类型 描述 默认
decoded_message Any

解码后的消息对象。

必需

get_current_state

get_current_state() -> Optional[RobotStateSnapshot]

获取当前状态快照(不可变,线程安全)。

返回:

类型 描述
Optional[RobotStateSnapshot]

Optional[RobotStateSnapshot]: 当前状态快照。

get_current_angles

get_current_angles() -> List[float]

获取当前关节角度(弧度)。

返回:

类型 描述
List[float]

List[float]: 6个关节的角度列表。

get_current_positions

get_current_positions() -> List[int]

获取当前关节位置(编码器值)。

返回:

类型 描述
List[int]

List[int]: 6个关节的编码器位置列表。

get_position_history

get_position_history(count: int = 20) -> List[Dict]

获取历史位置数据。

参数:

名称 类型 描述 默认
count int

获取的记录数量. Defaults to 20.

20

返回:

类型 描述
List[Dict]

List[Dict]: 历史位置数据列表,每个元素包含 timestamp, angles, positions。

get_friction_compensation

get_friction_compensation() -> List[float]

获取当前静摩擦补偿值。

返回:

类型 描述
List[float]

List[float]: 6个关节的静摩擦补偿值列表。

set_teaching_mode

set_teaching_mode(enabled: bool)

设置示教模式。

参数:

名称 类型 描述 默认
enabled bool

True=开启, False=关闭。

必需

set_friction_config

set_friction_config(config: List[float])

设置静摩擦配置。

参数:

名称 类型 描述 默认
config List[float]

6个关节的静摩擦阈值列表。

必需

clear_history

clear_history()

清空历史数据。

controller.domain.services.state.teach_record_domain_service

示教记录领域服务

TeachRecordDomainService

TeachRecordDomainService()

Bases: QObject

示教记录领域服务 - Domain层。

职责: 1. 管理示教记录数据(内存中) 2. 控制记录的开始/停止 3. 提供记录的增删改查操作 4. 生成记录名称

属性:

名称 类型 描述
recording_state_changed pyqtSignal

记录状态变化信号,携带是否正在记录。

record_added pyqtSignal

记录添加信号,携带新增的记录名称。

record_deleted pyqtSignal

记录删除信号,携带删除的记录名称。

初始化示教记录服务。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(self):
    """初始化示教记录服务。"""
    super().__init__()

    # 记录数据 {记录名: 角度列表}
    self._records: Dict[str, List[List[float]]] = {}

    # 当前记录状态
    self._is_recording = False
    self._current_record_angles: List[List[float]] = []
    self._record_counter = 0

    # 记录定时器(10ms间隔,100Hz采样率)
    self._record_timer = QTimer()
    self._record_timer.setInterval(10)
    self._record_timer.timeout.connect(self._on_record_timer_timeout)

start_recording

start_recording()

开始记录。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
49
50
51
52
53
54
55
56
def start_recording(self):
    """开始记录。"""
    if not self._is_recording:
        self._is_recording = True
        self._current_record_angles = []
        self._record_counter = 0
        self._record_timer.start()
        self.recording_state_changed.emit(True)

stop_recording

stop_recording() -> Optional[str]

停止记录并保存。

返回:

类型 描述
Optional[str]

Optional[str]: 新生成的记录名称,如果没有数据则返回 None。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def stop_recording(self) -> Optional[str]:
    """停止记录并保存。

    Returns:
        Optional[str]: 新生成的记录名称,如果没有数据则返回 None。
    """
    if self._is_recording:
        self._is_recording = False
        self._record_timer.stop()
        self.recording_state_changed.emit(False)

        # 保存当前记录
        if self._current_record_angles:
            record_name = self._generate_record_name()
            self._records[record_name] = self._current_record_angles.copy()
            self.record_added.emit(record_name)
            return record_name
        else:
            return None
    return None

is_recording

is_recording() -> bool

查询是否正在记录。

返回:

名称 类型 描述
bool bool

是否正在记录。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
79
80
81
82
83
84
85
def is_recording(self) -> bool:
    """查询是否正在记录。

    Returns:
        bool: 是否正在记录。
    """
    return self._is_recording

add_angle_to_current_record

add_angle_to_current_record(angles: List[float])

添加角度到当前记录。

参数:

名称 类型 描述 默认
angles List[float]

6维关节角度。

必需
源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
87
88
89
90
91
92
93
94
95
def add_angle_to_current_record(self, angles: List[float]):
    """添加角度到当前记录。

    Args:
        angles (List[float]): 6维关节角度。
    """
    if self._is_recording and len(angles) == 6:
        self._current_record_angles.append(angles.copy())
        self._record_counter += 1

get_record_timer

get_record_timer() -> QTimer

获取记录定时器(用于外部连接)。

返回:

名称 类型 描述
QTimer QTimer

记录定时器。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
104
105
106
107
108
109
110
def get_record_timer(self) -> QTimer:
    """获取记录定时器(用于外部连接)。

    Returns:
        QTimer: 记录定时器。
    """
    return self._record_timer

get_all_records

get_all_records() -> Dict[str, List[List[float]]]

获取所有记录。

返回:

类型 描述
Dict[str, List[List[float]]]

Dict[str, List[List[float]]]: 所有记录字典。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
116
117
118
119
120
121
122
def get_all_records(self) -> Dict[str, List[List[float]]]:
    """获取所有记录。

    Returns:
        Dict[str, List[List[float]]]: 所有记录字典。
    """
    return self._records.copy()

get_record_names

get_record_names() -> List[str]

获取所有记录名称。

返回:

类型 描述
List[str]

List[str]: 记录名称列表。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
124
125
126
127
128
129
130
def get_record_names(self) -> List[str]:
    """获取所有记录名称。

    Returns:
        List[str]: 记录名称列表。
    """
    return list(self._records.keys())

get_record

get_record(name: str) -> Optional[List[List[float]]]

获取指定记录。

参数:

名称 类型 描述 默认
name str

记录名称。

必需

返回:

类型 描述
Optional[List[List[float]]]

Optional[List[List[float]]]: 角度列表,如果不存在返回 None。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
132
133
134
135
136
137
138
139
140
141
def get_record(self, name: str) -> Optional[List[List[float]]]:
    """获取指定记录。

    Args:
        name (str): 记录名称。

    Returns:
        Optional[List[List[float]]]: 角度列表,如果不存在返回 None。
    """
    return self._records.get(name)

delete_record

delete_record(name: str) -> bool

删除指定记录。

参数:

名称 类型 描述 默认
name str

记录名称。

必需

返回:

名称 类型 描述
bool bool

是否删除成功。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def delete_record(self, name: str) -> bool:
    """删除指定记录。

    Args:
        name (str): 记录名称。

    Returns:
        bool: 是否删除成功。
    """
    if name in self._records:
        del self._records[name]
        self.record_deleted.emit(name)
        return True
    return False

reverse_record

reverse_record(name: str) -> Optional[str]

反转指定记录并另存为新记录。

参数:

名称 类型 描述 默认
name str

原记录名称。

必需

返回:

类型 描述
Optional[str]

Optional[str]: 新记录名称,如果失败返回 None。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def reverse_record(self, name: str) -> Optional[str]:
    """反转指定记录并另存为新记录。

    Args:
        name (str): 原记录名称。

    Returns:
        Optional[str]: 新记录名称,如果失败返回 None。
    """
    if name not in self._records:
        return None

    try:
        # 反转角度序列
        original_list = self._records[name]
        reversed_list = list(reversed(original_list))

        # 生成新名称:原名-反转(如已存在则追加序号)
        base_name = f"{name}-反转"
        new_name = base_name
        suffix = 2
        while new_name in self._records:
            new_name = f"{base_name}-{suffix}"
            suffix += 1

        # 保存新记录
        self._records[new_name] = reversed_list
        self.record_added.emit(new_name)
        return new_name
    except Exception as e:
        return None

load_records

load_records(records: Dict[str, List[List[float]]])

加载记录数据(从Repository)。

参数:

名称 类型 描述 默认
records Dict[str, List[List[float]]]

记录数据字典。

必需
源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
190
191
192
193
194
195
196
def load_records(self, records: Dict[str, List[List[float]]]):
    """加载记录数据(从Repository)。

    Args:
        records (Dict[str, List[List[float]]]): 记录数据字典。
    """
    self._records = records.copy()

clear_all_records

clear_all_records()

清空所有记录。

源代码位于: src/controller/controller/domain/services/state/teach_record_domain_service.py
198
199
200
def clear_all_records(self):
    """清空所有记录。"""
    self._records.clear()