跳转至

Domain Layer - Utilities

controller.domain.utils.message_decoder

MessageDecoder

MessageDecoder(config_path='controller/config/message_decoder_config.yaml')

消息解码器。

基于 YAML 配置文件动态解析十六进制消息。

属性:

名称 类型 描述
config dict

加载的配置字典。

robot_utils RobotUtils

工具类实例。

parsers Dict[str, BaseParser]

已注册的解析器字典。

MessageIn type

动态创建的消息数据类。

初始化消息解码器。

参数:

名称 类型 描述 默认
config_path str

配置文件路径. Defaults to "controller/config/message_decoder_config.yaml".

'controller/config/message_decoder_config.yaml'
源代码位于: src/controller/controller/domain/utils/message_decoder.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def __init__(self, config_path = "controller/config/message_decoder_config.yaml"):
    """初始化消息解码器。

    Args:
        config_path (str, optional): 配置文件路径. Defaults to "controller/config/message_decoder_config.yaml".
    """
    # 使用 ROS2 包资源管理方式查找配置文件
    if _HAS_AMENT_INDEX:
        try:
            # ROS2 方式:从 share 目录加载
            package_share_dir = get_package_share_directory('controller')
            config_path = os.path.join(package_share_dir, 'config', 'message_decoder_config.yaml')
        except Exception:
            # 如果包未安装,使用相对路径(开发模式)
            pass

    with open(config_path, 'r', encoding='utf-8') as f:
        self.config = yaml.safe_load(f)

    self.robot_utils = RobotUtils()
    self.parsers = self._register_parsers()
    self.MessageIn = self._create_message_class()

decode_message

decode_message(hex_message: str) -> Any

解码十六进制消息。

参数:

名称 类型 描述 默认
hex_message str

原始十六进制字符串。

必需

返回:

名称 类型 描述
Any Any

解码后的消息对象 (MessageIn)。

引发:

类型 描述
ValueError

如果消息格式错误或校验失败。

源代码位于: src/controller/controller/domain/utils/message_decoder.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def decode_message(self, hex_message: str) -> Any:
    """解码十六进制消息。

    Args:
        hex_message (str): 原始十六进制字符串。

    Returns:
        Any: 解码后的消息对象 (MessageIn)。

    Raises:
        ValueError: 如果消息格式错误或校验失败。
    """
    # 移除空格和换行符
    hex_message = hex_message.replace(' ', '').replace('\n', '').replace('\r', '')

    # 验证协议头和尾
    protocol_config = self.config['protocol']
    header = protocol_config['header']
    footer = protocol_config['footer']

    if not hex_message.startswith(header):
        raise ValueError(f"Invalid header. Expected {header}, got {hex_message[:4]}")

    if not hex_message.endswith(footer):
        raise ValueError(f"Invalid footer. Expected {footer}, got {hex_message[-4:]}")


    # 验证CRC(如果启用)
    if protocol_config.get('crc_enabled', False):
        crc_bytes = protocol_config.get('crc_bytes', 2) * 2  # 转换为十六进制字符数
        message_without_crc = hex_message[:-len(footer)-crc_bytes]
        received_crc = hex_message[-len(footer)-crc_bytes:-len(footer)]

        calculated_crc = self.robot_utils.calculate_crc16(message_without_crc)
        calculated_crc_hex = f"{calculated_crc:04X}"

        if calculated_crc_hex != received_crc:
            raise ValueError(f"CRC校验失败: 接收={received_crc}, 计算={calculated_crc_hex}")

    # 解析各个字段
    fields_config = self.config['message_formats']['message_in']['fields']
    parsed_data = {}
    current_pos = 0

    for field_name, field_config in fields_config.items():
        field_bytes = field_config['bytes']
        hex_chars = field_bytes * 2  # 每个字节对应2个十六进制字符

        # 特殊处理CRC字段
        if field_name == 'crc':
            if protocol_config.get('crc_enabled', False):
                field_hex = hex_message[current_pos:current_pos + hex_chars]
            else:
                field_hex = ""
        else:
            field_hex = hex_message[current_pos:current_pos + hex_chars]

        # 解析字段
        parsed_value = self._parse_field(field_hex, field_config)
        parsed_data[field_name] = parsed_value

        current_pos += hex_chars

    return self.MessageIn(**parsed_data)

decode_message_detailed

decode_message_detailed(hex_message: str) -> Dict[str, Any]

解码消息并返回详细信息(包括原始十六进制值)。

参数:

名称 类型 描述 默认
hex_message str

原始十六进制字符串。

必需

返回:

类型 描述
Dict[str, Any]

Dict[str, Any]: 包含详细信息的字典。

源代码位于: src/controller/controller/domain/utils/message_decoder.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def decode_message_detailed(self, hex_message: str) -> Dict[str, Any]:
    """解码消息并返回详细信息(包括原始十六进制值)。

    Args:
        hex_message (str): 原始十六进制字符串。

    Returns:
        Dict[str, Any]: 包含详细信息的字典。
    """
    message_in = self.decode_message(hex_message)

    # 创建详细信息字典
    detailed_info = {}
    fields_config = self.config['message_formats']['message_in']['fields']

    for field_name, _ in fields_config.items():
        value = getattr(message_in, field_name)
        detailed_info[field_name] = {
            'value': value,
            'description': fields_config[field_name].get('description', field_name)
        }

    return detailed_info

controller.domain.utils.message_encoder

MessageEncoder

MessageEncoder(config_path='controller/config/message_encoder_config.yaml')

消息编码器。

基于 YAML 配置文件动态生成十六进制消息。

属性:

名称 类型 描述
config dict

加载的配置字典。

robot_utils RobotUtils

工具类实例。

formatters Dict[str, BaseFormatter]

已注册的格式化器字典。

Message type

动态创建的消息数据类。

初始化消息编码器。

参数:

名称 类型 描述 默认
config_path str

配置文件路径. Defaults to "controller/config/message_encoder_config.yaml".

'controller/config/message_encoder_config.yaml'
源代码位于: src/controller/controller/domain/utils/message_encoder.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def __init__(self, config_path = "controller/config/message_encoder_config.yaml"):
    """初始化消息编码器。

    Args:
        config_path (str, optional): 配置文件路径. Defaults to "controller/config/message_encoder_config.yaml".
    """
    # 使用 ROS2 包资源管理方式查找配置文件
    if _HAS_AMENT_INDEX:
        try:
            # ROS2 方式:从 share 目录加载
            package_share_dir = get_package_share_directory('controller')
            config_path = os.path.join(package_share_dir, 'config', 'message_encoder_config.yaml')
        except Exception:
            # 如果包未安装,使用相对路径(开发模式)
            pass

    with open(config_path, 'r', encoding='utf-8') as f:
        self.config = yaml.safe_load(f)

    self.robot_utils = RobotUtils()
    self.formatters = self._register_formatters()
    self.Message = self._create_message_class()

create_message

create_message(**kwargs: Any) -> Any

创建 MessageOut 实例。

参数:

名称 类型 描述 默认
**kwargs Any

消息字段值。

{}

返回:

名称 类型 描述
Any Any

消息对象 (MessageOut)。

源代码位于: src/controller/controller/domain/utils/message_encoder.py
134
135
136
137
138
139
140
141
142
143
def create_message(self, **kwargs: Any) -> Any:
    """创建 MessageOut 实例。

    Args:
        **kwargs: 消息字段值。

    Returns:
        Any: 消息对象 (MessageOut)。
    """
    return self.Message(**kwargs)

interpret_message

interpret_message(message: Any) -> str

根据配置动态将消息对象编码为字符串。

参数:

名称 类型 描述 默认
message Any

MessageOut 对象。

必需

返回:

名称 类型 描述
str str

编码后的十六进制字符串。

源代码位于: src/controller/controller/domain/utils/message_encoder.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def interpret_message(self, message: Any) -> str:
    """根据配置动态将消息对象编码为字符串。

    Args:
        message (Any): MessageOut 对象。

    Returns:
        str: 编码后的十六进制字符串。
    """
    protocol_config = self.config['protocol']
    fields_config = self.config['message_formats']['message_out']['fields']

    # 1. 添加协议头
    command = protocol_config['header']

    # 2. 按配置顺序处理每个字段
    for field_name, field_config in fields_config.items():
        field_value = getattr(message, field_name)
        command += self._format_field(field_value, field_config)

    # 3. 计算CRC(如果启用)
    if protocol_config.get('crc_enabled', False):
        crc = self.robot_utils.calculate_crc16(bytes.fromhex(command))
        command += f"{(crc >> 8) & 0xFF:02X}{crc & 0xFF:02X}"

    # 4. 添加协议尾
    command += protocol_config['footer']

    return command

controller.domain.utils.image_drawing_utils

图像绘制工具类 - Domain层 用于在图像上绘制检测结果

ImageDrawingUtils

图像绘制工具类。

提供静态方法用于在 OpenCV 图像上绘制检测结果和辅助图形。

draw_detection_result staticmethod

draw_detection_result(image: ndarray, detection: Dict) -> np.ndarray

在图像上绘制检测结果。

参数:

名称 类型 描述 默认
image ndarray

原始图像(BGR)。

必需
detection Dict

检测结果字典,包含: - head_center (Tuple[float, float]): 头部中心点 (x, y)。 - central_center (Tuple[float, float]): 中央中心点 (x, y)。 - real_center (Tuple[float, float]): 实际中心点 (x, y)。 - angle (float): 角度(弧度)。 - depth (float): central_center 的深度(米)。 - real_depth (float): real_center 的深度(米)。

必需

返回:

类型 描述
ndarray

np.ndarray: 绘制后的图像(新图像,不修改原图)。

源代码位于: src/controller/controller/domain/utils/image_drawing_utils.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@staticmethod
def draw_detection_result(image: np.ndarray, detection: Dict) -> np.ndarray:
    """在图像上绘制检测结果。

    Args:
        image (np.ndarray): 原始图像(BGR)。
        detection (Dict): 检测结果字典,包含:
            - head_center (Tuple[float, float]): 头部中心点 (x, y)。
            - central_center (Tuple[float, float]): 中央中心点 (x, y)。
            - real_center (Tuple[float, float]): 实际中心点 (x, y)。
            - angle (float): 角度(弧度)。
            - depth (float): central_center 的深度(米)。
            - real_depth (float): real_center 的深度(米)。

    Returns:
        np.ndarray: 绘制后的图像(新图像,不修改原图)。
    """
    if not detection:
        return image

    # 复制图像,避免修改原图
    result_image = image.copy()

    try:
        # 提取检测数据
        head_center = detection.get('head_center')
        central_center = detection.get('central_center')
        real_center = detection.get('real_center')
        angle = detection.get('angle', 0.0)
        depth = detection.get('depth', 0.0)
        real_depth = detection.get('real_depth', 0.0)

        # 转换为整数坐标
        if head_center:
            head_center = (int(head_center[0]), int(head_center[1]))
            # 绘制head_center(红色圆点)
            cv2.circle(result_image, head_center, 5, (0, 0, 255), -1)
            # 绘制angle文本
            cv2.putText(
                result_image, 
                f"angle: {angle:.2f}", 
                (head_center[0], head_center[1] - 10), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.5, 
                (0, 0, 255), 
                2
            )

        if central_center:
            central_center = (int(central_center[0]), int(central_center[1]))
            # 绘制central_center(绿色圆点)
            cv2.circle(result_image, central_center, 5, (0, 255, 0), -1)
            # 绘制depth文本
            cv2.putText(
                result_image, 
                f"depth: {depth:.2f}", 
                (central_center[0], central_center[1] - 10), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.5, 
                (0, 255, 0), 
                2
            )
            # 绘制方向箭头
            ImageDrawingUtils.draw_direction_arrow(result_image, central_center, angle)

        if real_center:
            real_center = (int(real_center[0]), int(real_center[1]))
            # 绘制real_center(红色圆点)
            cv2.circle(result_image, real_center, 5, (0, 0, 255), -1)
            # 绘制real_depth文本
            cv2.putText(
                result_image, 
                f"real_depth: {real_depth:.2f}", 
                (real_center[0], real_center[1] - 10), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.5, 
                (0, 0, 255), 
                2
            )

    except Exception as e:
        # 如果绘制失败,返回原图
        return image

    return result_image

draw_direction_arrow staticmethod

draw_direction_arrow(image: ndarray, center: Tuple[int, int], angle: float)

绘制方向箭头。

参数:

名称 类型 描述 默认
image ndarray

图像(原地修改)。

必需
center Tuple[int, int]

中心点坐标 (x, y)。

必需
angle float

角度(弧度)- 直着向上为 0,顺时针为正。

必需
源代码位于: src/controller/controller/domain/utils/image_drawing_utils.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@staticmethod
def draw_direction_arrow(image: np.ndarray, center: Tuple[int, int], angle: float):
    """绘制方向箭头。

    Args:
        image (np.ndarray): 图像(原地修改)。
        center (Tuple[int, int]): 中心点坐标 (x, y)。
        angle (float): 角度(弧度)- 直着向上为 0,顺时针为正。
    """
    try:
        # 箭头参数
        arrow_length = 50  # 箭头长度(像素)
        arrow_color = (255, 0, 0)  # 蓝色(BGR格式)
        arrow_thickness = 3

        # 计算箭头终点
        # 角度系统:直着向上为0,顺时针为正
        # 向上方向为 (0, -1),因为图像坐标系Y轴向下
        end_x = center[0] + arrow_length * math.sin(angle)
        end_y = center[1] - arrow_length * math.cos(angle)
        end_point = (int(end_x), int(end_y))

        # 绘制箭头
        cv2.arrowedLine(
            image, 
            center, 
            end_point, 
            arrow_color, 
            arrow_thickness, 
            tipLength=0.3
        )

    except Exception as e:
        pass

controller.domain.utils.kinematic_utils

KinematicUtils

运动学工具类。

提供基本的运动学变换和角度处理函数。

dh2rm staticmethod

dh2rm(a: float, alpha: float, d: float, theta: float) -> np.ndarray

根据 DH 参数计算变换矩阵。

参数:

名称 类型 描述 默认
a float

连杆长度。

必需
alpha float

连杆扭转角。

必需
d float

连杆偏移。

必需
theta float

关节角。

必需

返回:

类型 描述
ndarray

np.ndarray: 4x4 齐次变换矩阵。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@staticmethod
def dh2rm(a: float, alpha: float, d: float, theta: float) -> np.ndarray:
    """根据 DH 参数计算变换矩阵。

    Args:
        a (float): 连杆长度。
        alpha (float): 连杆扭转角。
        d (float): 连杆偏移。
        theta (float): 关节角。

    Returns:
        np.ndarray: 4x4 齐次变换矩阵。
    """
    c_theta = cos(theta)
    s_theta = sin(theta)
    c_alpha = cos(alpha)
    s_alpha = sin(alpha)

    return np.array([
        [c_theta, -s_theta, 0, a],
        [s_theta * c_alpha, c_theta * c_alpha, -s_alpha, -s_alpha * d],
        [s_theta * s_alpha, c_theta * s_alpha, c_alpha, c_alpha * d],
        [0, 0, 0, 1]
    ], dtype=np.float64)

rm2quat staticmethod

rm2quat(rm: ndarray) -> np.ndarray

旋转矩阵转四元数。

参数:

名称 类型 描述 默认
rm ndarray

3x3 旋转矩阵或 4x4 变换矩阵。

必需

返回:

类型 描述
ndarray

np.ndarray: 四元数 [x, y, z, w]。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@staticmethod
def rm2quat(rm: np.ndarray) -> np.ndarray:
    """旋转矩阵转四元数。

    Args:
        rm (np.ndarray): 3x3 旋转矩阵或 4x4 变换矩阵。

    Returns:
        np.ndarray: 四元数 [x, y, z, w]。
    """
    # 如果是 4x4 矩阵,提取 3x3 部分
    if rm.shape == (4, 4):
        rm = rm[:3, :3]
    return R.from_matrix(rm).as_quat()

quat2rm staticmethod

quat2rm(quat: ndarray) -> np.ndarray

四元数转旋转矩阵。

参数:

名称 类型 描述 默认
quat ndarray

四元数 [x, y, z, w]。

必需

返回:

类型 描述
ndarray

np.ndarray: 3x3 旋转矩阵。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
51
52
53
54
55
56
57
58
59
60
61
@staticmethod
def quat2rm(quat: np.ndarray) -> np.ndarray:
    """四元数转旋转矩阵。

    Args:
        quat (np.ndarray): 四元数 [x, y, z, w]。

    Returns:
        np.ndarray: 3x3 旋转矩阵。
    """
    return R.from_quat(quat).as_matrix()

quat2euler staticmethod

quat2euler(quat: ndarray) -> np.ndarray

四元数转欧拉角 (XYZ 顺序)。

参数:

名称 类型 描述 默认
quat ndarray

四元数 [x, y, z, w]。

必需

返回:

类型 描述
ndarray

np.ndarray: 欧拉角 [roll, pitch, yaw] (弧度)。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
63
64
65
66
67
68
69
70
71
72
73
@staticmethod
def quat2euler(quat: np.ndarray) -> np.ndarray:
    """四元数转欧拉角 (XYZ 顺序)。

    Args:
        quat (np.ndarray): 四元数 [x, y, z, w]。

    Returns:
        np.ndarray: 欧拉角 [roll, pitch, yaw] (弧度)。
    """
    return R.from_quat(quat).as_euler('xyz', degrees=False)

euler2quat staticmethod

euler2quat(euler: ndarray) -> np.ndarray

欧拉角转四元数 (XYZ 顺序)。

参数:

名称 类型 描述 默认
euler ndarray

欧拉角 [roll, pitch, yaw] (弧度)。

必需

返回:

类型 描述
ndarray

np.ndarray: 四元数 [x, y, z, w]。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
75
76
77
78
79
80
81
82
83
84
85
@staticmethod
def euler2quat(euler: np.ndarray) -> np.ndarray:
    """欧拉角转四元数 (XYZ 顺序)。

    Args:
        euler (np.ndarray): 欧拉角 [roll, pitch, yaw] (弧度)。

    Returns:
        np.ndarray: 四元数 [x, y, z, w]。
    """
    return R.from_euler('xyz', euler, degrees=False).as_quat()

euler2rm staticmethod

euler2rm(euler: ndarray) -> np.ndarray

欧拉角转旋转矩阵 (XYZ 顺序)。

参数:

名称 类型 描述 默认
euler ndarray

欧拉角 [roll, pitch, yaw] (弧度)。

必需

返回:

类型 描述
ndarray

np.ndarray: 3x3 旋转矩阵。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
87
88
89
90
91
92
93
94
95
96
97
@staticmethod
def euler2rm(euler: np.ndarray) -> np.ndarray:
    """欧拉角转旋转矩阵 (XYZ 顺序)。

    Args:
        euler (np.ndarray): 欧拉角 [roll, pitch, yaw] (弧度)。

    Returns:
        np.ndarray: 3x3 旋转矩阵。
    """
    return R.from_euler('xyz', euler, degrees=False).as_matrix()

rm2euler staticmethod

rm2euler(rm: ndarray) -> np.ndarray

旋转矩阵转欧拉角 (XYZ 顺序)。

参数:

名称 类型 描述 默认
rm ndarray

3x3 旋转矩阵。

必需

返回:

类型 描述
ndarray

np.ndarray: 欧拉角 [roll, pitch, yaw] (弧度)。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@staticmethod
def rm2euler(rm: np.ndarray) -> np.ndarray:
    """旋转矩阵转欧拉角 (XYZ 顺序)。

    Args:
        rm (np.ndarray): 3x3 旋转矩阵。

    Returns:
        np.ndarray: 欧拉角 [roll, pitch, yaw] (弧度)。
    """
    # 如果是 4x4 矩阵,提取 3x3 部分
    if rm.shape == (4, 4):
        rm = rm[:3, :3]
    return R.from_matrix(rm).as_euler('xyz', degrees=False)

normalize_angle staticmethod

normalize_angle(angle: float) -> float

将角度归一化到 [-pi, pi] 范围。

参数:

名称 类型 描述 默认
angle float

输入角度(弧度)。

必需

返回:

名称 类型 描述
float float

归一化后的角度(弧度)。

源代码位于: src/controller/controller/domain/utils/kinematic_utils.py
114
115
116
117
118
119
120
121
122
123
124
@staticmethod
def normalize_angle(angle: float) -> float:
    """将角度归一化到 [-pi, pi] 范围。

    Args:
        angle (float): 输入角度(弧度)。

    Returns:
        float: 归一化后的角度(弧度)。
    """
    return (angle + pi) % (2 * pi) - pi

controller.domain.utils.robot_utils

RobotUtils

RobotUtils()

机器人工具类。

提供机器人相关的数值转换、校验和计算等工具方法。

初始化机器人参数。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
10
11
12
13
14
15
def __init__(self):
    """初始化机器人参数。"""
    self.JOINT_OFFSETS = [79119, 369835, 83627, 392414, 507293, 456372]
    self.RADIAN_TO_POS_SCALE_FACTOR = (2**19) / (2 * pi)  # 弧度转位置值的系数
    self.POS_TO_RADIAN_SCALE_FACTOR = (2 * pi) / (2**19)  # 位置值转弧度的系数
    self.init_radians = [0, -pi/2, 0, pi/2, 0, 0]

position2radian

position2radian(position: list) -> list

编码器位置值转换为弧度。

参数:

名称 类型 描述 默认
position list

编码器位置值列表。

必需

返回:

名称 类型 描述
list list

关节角度列表(弧度)。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def position2radian(self, position: list) -> list:
    """编码器位置值转换为弧度。

    Args:
        position (list): 编码器位置值列表。

    Returns:
        list: 关节角度列表(弧度)。
    """
    radians = self.init_radians.copy()
    for i, pos in enumerate(position):
        if isinstance(pos, str):
            pos = int(pos)
        radians[i] += (pos - self.JOINT_OFFSETS[i]) * self.POS_TO_RADIAN_SCALE_FACTOR
    return radians

radian2position

radian2position(radian: list) -> list

弧度转换为编码器位置值。

参数:

名称 类型 描述 默认
radian list

关节角度列表(弧度)。

必需

返回:

名称 类型 描述
list list

编码器位置值列表(32位整数)。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def radian2position(self, radian: list) -> list:
    """弧度转换为编码器位置值。

    Args:
        radian (list): 关节角度列表(弧度)。

    Returns:
        list: 编码器位置值列表(32位整数)。
    """
    positions = []
    for i, rad in enumerate(radian):
        position = int((rad - self.init_radians[i]) * self.RADIAN_TO_POS_SCALE_FACTOR + self.JOINT_OFFSETS[i]) & 0xFFFFFFFF
        positions.append(position)
    return positions

speed2position

speed2position(speed: list) -> list

速度值转换为位置增量值。

参数:

名称 类型 描述 默认
speed list

速度列表。

必需

返回:

名称 类型 描述
list list

对应的时间步长内的位置增量值。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
48
49
50
51
52
53
54
55
56
57
def speed2position(self, speed: list) -> list:
    """速度值转换为位置增量值。

    Args:
        speed (list): 速度列表。

    Returns:
        list: 对应的时间步长内的位置增量值。
    """
    return [int(position * self.RADIAN_TO_POS_SCALE_FACTOR) & 0xFFFFFF for position in speed]

torque_transfer staticmethod

torque_transfer(torque: list) -> list

力矩值转换与归一化。

参数:

名称 类型 描述 默认
torque list

力矩值列表,长度不超过6。

必需

返回:

名称 类型 描述
list list

转换后的力矩值列表(16位整数)。

引发:

类型 描述
ValueError

如果力矩列表长度超过6。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@staticmethod
def torque_transfer(torque: list) -> list:
    """力矩值转换与归一化。

    Args:
        torque (list): 力矩值列表,长度不超过6。

    Returns:
        list: 转换后的力矩值列表(16位整数)。

    Raises:
        ValueError: 如果力矩列表长度超过6。
    """
    if len(torque) > 6:
        raise ValueError("力矩值列表长度不能超过6")
    transfer_torque = []
    for t in torque[:3]:
        t = min(max(t, -50), 50)
        transfer_torque.append(int(t / 87 * 1000))
    for t in torque[3:]:
        t = min(max(t, -10), 10)
        transfer_torque.append(int(t * 100))
    return [int(t) & 0xFFFF for t in transfer_torque]

effector2hex staticmethod

effector2hex(effector_data: float) -> list

末端执行器数据转换为十六进制字节列表。

将浮点数拆分为整数部分和小数部分,分别转换为 2 字节的大端序十六进制。

参数:

名称 类型 描述 默认
effector_data float

末端执行器数据。

必需

返回:

名称 类型 描述
list list

包含两个十六进制字符串的列表 [整数部分hex, 小数部分hex]。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@staticmethod
def effector2hex(effector_data: float) -> list:
    """末端执行器数据转换为十六进制字节列表。

    将浮点数拆分为整数部分和小数部分,分别转换为 2 字节的大端序十六进制。

    Args:
        effector_data (float): 末端执行器数据。

    Returns:
        list: 包含两个十六进制字符串的列表 [整数部分hex, 小数部分hex]。
    """
    effector_data = str(float(effector_data))
    arr = effector_data.split('.')
    return [int(arr[0]).to_bytes(2, 'big').hex(), int(arr[1]).to_bytes(2, 'big').hex()]

calculate_crc16 staticmethod

calculate_crc16(data) -> int

计算 CRC16 校验和 (CCITT)。

参数:

名称 类型 描述 默认
data Union[bytes, str]

字节数组或十六进制字符串。

必需

返回:

名称 类型 描述
int int

16位校验和。如果输入格式错误返回 False。

源代码位于: src/controller/controller/domain/utils/robot_utils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@staticmethod
def calculate_crc16(data) -> int:
    """计算 CRC16 校验和 (CCITT)。

    Args:
        data (Union[bytes, str]): 字节数组或十六进制字符串。

    Returns:
        int: 16位校验和。如果输入格式错误返回 False。
    """
    if isinstance(data, str):
        try:
            data = bytes.fromhex(data.strip())
        except Exception as e:
            return False

    # 计算CRC16-CCITT
    crc = 0xFFFF
    for byte in data:
        crc ^= (int(byte) << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ 0x1021
            else:
                crc = crc << 1
            crc &= 0xFFFF

    return crc