跳转至

Camera Application Service

controller.application.services.camera_application_service

摄像头应用服务 - Application层 协调摄像头Domain服务、检测服务和消息显示 提供统一的查询接口,避免ViewModel直接访问Domain层

CameraApplicationService

CameraApplicationService(camera_service: CameraDomainService, recognition_service: RecognitionDomainService, hand_eye_service: HandEyeTransformDomainService, kinematic_service: KinematicDomainService, robot_state_service: RobotStateDomainService, motion_constructor: MotionConstructor, command_hub: CommandHubService, message_display: MessageDisplay)

Bases: QObject

摄像头应用服务。

协调摄像头Domain服务、检测服务和消息显示,提供统一的查询接口,避免ViewModel直接访问Domain层。

属性:

名称 类型 描述
connection_status_changed pyqtSignal

摄像头连接状态变化信号。

detection_status_changed pyqtSignal

检测状态变化信号。

初始化摄像头应用服务。

源代码位于: src/controller/controller/application/services/camera_application_service.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    camera_service: CameraDomainService,
    recognition_service: RecognitionDomainService,
    hand_eye_service: HandEyeTransformDomainService,
    kinematic_service: KinematicDomainService,
    robot_state_service: RobotStateDomainService,
    motion_constructor: MotionConstructor,
    command_hub: CommandHubService,
    message_display: MessageDisplay
):
    """初始化摄像头应用服务。"""
    super().__init__()
    self.camera_service = camera_service
    self.recognition_service = recognition_service
    self.hand_eye_service = hand_eye_service
    self.kinematic_service = kinematic_service
    self.robot_state_service = robot_state_service
    self.motion_constructor = motion_constructor
    self.command_hub = command_hub
    self.message_display = message_display

    # 连接Domain Service信号
    self._connect_signals()

connect_camera

connect_camera()

连接摄像头。

源代码位于: src/controller/controller/application/services/camera_application_service.py
64
65
66
67
68
69
70
71
72
73
74
75
76
def connect_camera(self):
    """连接摄像头。"""
    self.message_display.clear_messages()
    self._display_message("正在连接摄像头...", "摄像头")

    success = self.camera_service.connect()

    if success:
        self._display_message("摄像头连接成功", "摄像头")
        self.connection_status_changed.emit(True)
    else:
        self._display_message("摄像头连接失败", "错误")
        self.connection_status_changed.emit(False)

disconnect_camera

disconnect_camera()

断开摄像头。

源代码位于: src/controller/controller/application/services/camera_application_service.py
78
79
80
81
82
83
84
85
86
87
88
89
def disconnect_camera(self):
    """断开摄像头。"""
    self.message_display.clear_messages()
    self._display_message("正在断开摄像头...", "摄像头")

    success = self.camera_service.disconnect()

    if success:
        self._display_message("摄像头已断开", "摄像头")
        self.connection_status_changed.emit(False)
    else:
        self._display_message("断开摄像头失败", "错误")

start_detection

start_detection()

开始检测。

源代码位于: src/controller/controller/application/services/camera_application_service.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def start_detection(self):
    """开始检测。"""
    self.message_display.clear_messages()
    self._display_message("正在启动检测...", "检测")

    success = self.recognition_service.start_detection()

    if success:
        self._display_message("开始零件识别", "检测")
        self.detection_status_changed.emit(True)
    else:
        self._display_message("启动检测失败", "错误")
        self.detection_status_changed.emit(False)

stop_detection

stop_detection()

停止检测。

源代码位于: src/controller/controller/application/services/camera_application_service.py
105
106
107
108
109
110
111
112
113
114
115
116
def stop_detection(self):
    """停止检测。"""
    self.message_display.clear_messages()
    self._display_message("正在停止检测...", "检测")

    success = self.recognition_service.stop_detection()

    if success:
        self._display_message("停止零件识别", "检测")
        self.detection_status_changed.emit(False)
    else:
        self._display_message("停止检测失败", "错误")

move_to_detected_part

move_to_detected_part()

运动到检测到的零件位置。

完整流程: 1. 检查检测状态 2. 获取检测结果 3. 获取当前关节角度 4. 手眼标定计算目标关节角度 5. 构建运动任务 6. 触发运动执行 7. 停止检测(可选)

源代码位于: src/controller/controller/application/services/camera_application_service.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def move_to_detected_part(self):
    """运动到检测到的零件位置。

    完整流程:
    1. 检查检测状态
    2. 获取检测结果
    3. 获取当前关节角度
    4. 手眼标定计算目标关节角度
    5. 构建运动任务
    6. 触发运动执行
    7. 停止检测(可选)
    """
    # 1. 检查检测是否运行
    if not self.recognition_service.is_detection_running():
        self._display_message("检测未运行,无法执行运动", "错误")
        return

    # 2. 获取最新检测结果
    detection_result = self.recognition_service.get_latest_result()
    if not detection_result:
        self._display_message("未检测到零件,无法执行运动", "警告")
        return

    # 3. 获取当前关节角度
    current_state = self.robot_state_service.get_current_state()
    if not current_state:
        self._display_message("无法获取当前机器人状态", "错误")
        return

    current_joint_angles = current_state.joint_angles  # ✅ 修复:正确的属性名(弧度)

    # 4. 手眼标定计算目标关节角度
    try:
        target_angles = self.hand_eye_service.calculate_target_joint_angles(
            central_center=detection_result['central_center'],
            depth=detection_result['depth'],
            real_center=detection_result['real_center'],
            real_depth=detection_result['real_depth'],
            angle=detection_result['angle'],
            current_joint_angles=current_joint_angles
        )

        if target_angles is None:
            self._display_message("逆运动学无解,无法到达目标位置", "错误")
            return

    except Exception as e:
        self._display_message(f"计算目标位姿失败: {str(e)}", "错误")
        return

    # 5. 构建运动任务
    motion_task = {
        'type': 'motion',
        'target_angles': target_angles,
        'curve_type': 's_curve',  # 's_curve' 或 'linear'(笛卡尔直线)
        'frequency': 0.01
    }

    # 6. 准备运动并触发执行
    try:
        # 使用统一的 prepare_operation 接口
        self.motion_constructor.prepare_operation(
            MotionOperationMode.EXECUTE,
            [motion_task]  # 需要传入任务列表
        )
        self.command_hub.get_current_position()  # 触发运动执行

        self._display_message(
            f"开始运动到零件位置 "
            f"(中心: {detection_result['central_center']}, "
            f"深度: {detection_result['depth']:.2f}mm, "
            f"角度: {np.degrees(detection_result['angle']):.1f}°)",
            "运动"
        )

    except Exception as e:
        self._display_message(f"启动运动失败: {str(e)}", "错误")
        return

is_camera_connected

is_camera_connected() -> bool

检查摄像头是否连接。

返回:

名称 类型 描述
bool bool

True=已连接, False=未连接。

源代码位于: src/controller/controller/application/services/camera_application_service.py
272
273
274
275
276
277
278
def is_camera_connected(self) -> bool:
    """检查摄像头是否连接。

    Returns:
        bool: True=已连接, False=未连接。
    """
    return self.camera_service.is_connected

is_color_available

is_color_available() -> bool

检查彩色图像是否可用。

返回:

名称 类型 描述
bool bool

True=有数据, False=无数据。

源代码位于: src/controller/controller/application/services/camera_application_service.py
280
281
282
283
284
285
286
def is_color_available(self) -> bool:
    """检查彩色图像是否可用。

    Returns:
        bool: True=有数据, False=无数据。
    """
    return self.camera_service.is_color_available()

is_depth_available

is_depth_available() -> bool

检查深度图像是否可用。

返回:

名称 类型 描述
bool bool

True=有数据, False=无数据。

源代码位于: src/controller/controller/application/services/camera_application_service.py
288
289
290
291
292
293
294
def is_depth_available(self) -> bool:
    """检查深度图像是否可用。

    Returns:
        bool: True=有数据, False=无数据。
    """
    return self.camera_service.is_depth_available()

get_latest_color_image

get_latest_color_image() -> Optional[np.ndarray]

获取最新彩色图像。

返回:

类型 描述
Optional[ndarray]

Optional[np.ndarray]: 彩色图像(BGR格式),如果无数据则返回None。

源代码位于: src/controller/controller/application/services/camera_application_service.py
296
297
298
299
300
301
302
def get_latest_color_image(self) -> Optional[np.ndarray]:
    """获取最新彩色图像。

    Returns:
        Optional[np.ndarray]: 彩色图像(BGR格式),如果无数据则返回None。
    """
    return self.camera_service.get_latest_color_image()

get_latest_depth_image

get_latest_depth_image() -> Optional[np.ndarray]

获取最新深度图像。

返回:

类型 描述
Optional[ndarray]

Optional[np.ndarray]: 深度图像(16位或32位浮点),如果无数据则返回None。

源代码位于: src/controller/controller/application/services/camera_application_service.py
304
305
306
307
308
309
310
def get_latest_depth_image(self) -> Optional[np.ndarray]:
    """获取最新深度图像。

    Returns:
        Optional[np.ndarray]: 深度图像(16位或32位浮点),如果无数据则返回None。
    """
    return self.camera_service.get_latest_depth_image()

visualize_depth_image

visualize_depth_image(depth_image: ndarray) -> np.ndarray

深度图可视化为伪彩色图。

参数:

名称 类型 描述 默认
depth_image ndarray

原始深度图(16位或32位浮点)。

必需

返回:

类型 描述
ndarray

np.ndarray: 伪彩色深度图(BGR, uint8)。

源代码位于: src/controller/controller/application/services/camera_application_service.py
312
313
314
315
316
317
318
319
320
321
def visualize_depth_image(self, depth_image: np.ndarray) -> np.ndarray:
    """深度图可视化为伪彩色图。

    Args:
        depth_image (np.ndarray): 原始深度图(16位或32位浮点)。

    Returns:
        np.ndarray: 伪彩色深度图(BGR, uint8)。
    """
    return self.camera_service.visualize_depth_image(depth_image)

is_detection_running

is_detection_running() -> bool

检查检测是否正在运行。

返回:

名称 类型 描述
bool bool

True=运行中, False=未运行。

源代码位于: src/controller/controller/application/services/camera_application_service.py
323
324
325
326
327
328
329
def is_detection_running(self) -> bool:
    """检查检测是否正在运行。

    Returns:
        bool: True=运行中, False=未运行。
    """
    return self.recognition_service.is_detection_running()

get_latest_detection_result

get_latest_detection_result() -> Optional[Dict]

获取最新检测结果。

返回:

类型 描述
Optional[Dict]

Optional[Dict]: 检测结果字典,如果无数据则返回None。 包含字段:head_center, central_center, real_center, angle, depth, real_depth。

源代码位于: src/controller/controller/application/services/camera_application_service.py
331
332
333
334
335
336
337
338
def get_latest_detection_result(self) -> Optional[Dict]:
    """获取最新检测结果。

    Returns:
        Optional[Dict]: 检测结果字典,如果无数据则返回None。
            包含字段:head_center, central_center, real_center, angle, depth, real_depth。
    """
    return self.recognition_service.get_latest_result()

get_image_with_detection

get_image_with_detection(image: ndarray) -> np.ndarray

获取叠加了检测结果的图像。

封装了检测状态检查和图像绘制逻辑,提供高层业务接口。

参数:

名称 类型 描述 默认
image ndarray

原始图像(BGR格式)。

必需

返回:

类型 描述
ndarray

np.ndarray: 如果检测正在运行且有结果,返回叠加了检测标注的图像; 否则返回原图像。

源代码位于: src/controller/controller/application/services/camera_application_service.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def get_image_with_detection(self, image: np.ndarray) -> np.ndarray:
    """获取叠加了检测结果的图像。

    封装了检测状态检查和图像绘制逻辑,提供高层业务接口。

    Args:
        image (np.ndarray): 原始图像(BGR格式)。

    Returns:
        np.ndarray: 如果检测正在运行且有结果,返回叠加了检测标注的图像;
                   否则返回原图像。
    """
    # 检查检测是否运行
    if self.recognition_service.is_detection_running():
        # 获取最新检测结果
        detection = self.recognition_service.get_latest_result()
        if detection:
            # 绘制检测结果到图像上
            return ImageDrawingUtils.draw_detection_result(image, detection)

    # 无检测或无结果,返回原图
    return image