跳转至

Domain Layer - Vision Services

controller.domain.services.vision.camera_domain_service

摄像头领域服务 - Domain层 管理ROS2摄像头订阅节点

ColorImageSubscriber

ColorImageSubscriber(callback)

Bases: Node

彩色图像订阅节点。

属性:

名称 类型 描述
callback callable

图像接收回调函数。

bridge CvBridge

CV 桥接器。

subscription Subscription

ROS 订阅对象。

初始化彩色图像订阅节点。

参数:

名称 类型 描述 默认
callback callable

图像接收回调函数。

必需
源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, callback):
    """初始化彩色图像订阅节点。

    Args:
        callback (callable): 图像接收回调函数。
    """
    super().__init__('color_image_subscriber')
    self.callback = callback
    self.bridge = CvBridge()

    # 订阅彩色图像话题
    self.subscription = self.create_subscription(
        Image,
        '/camera/color/image_raw',
        self.image_callback,
        10
    )

image_callback

image_callback(msg)

图像回调函数。

参数:

名称 类型 描述 默认
msg Image

ROS 图像消息。

必需
源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def image_callback(self, msg):
    """图像回调函数。

    Args:
        msg (Image): ROS 图像消息。
    """
    try:
        # 转换为OpenCV格式(BGR)
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

        # 调用回调函数
        if self.callback:
            self.callback(cv_image)

    except CvBridgeError as e:
        self.get_logger().error(f'彩色图像转换失败: {e}')

DepthImageSubscriber

DepthImageSubscriber(callback)

Bases: Node

深度图像订阅节点。

属性:

名称 类型 描述
callback callable

图像接收回调函数。

bridge CvBridge

CV 桥接器。

subscription Subscription

ROS 订阅对象。

初始化深度图像订阅节点。

参数:

名称 类型 描述 默认
callback callable

图像接收回调函数。

必需
源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(self, callback):
    """初始化深度图像订阅节点。

    Args:
        callback (callable): 图像接收回调函数。
    """
    super().__init__('depth_image_subscriber')
    self.callback = callback
    self.bridge = CvBridge()

    # 订阅深度图像话题
    self.subscription = self.create_subscription(
        Image,
        '/camera/depth/image_raw',
        self.image_callback,
        10
    )

image_callback

image_callback(msg)

图像回调函数。

参数:

名称 类型 描述 默认
msg Image

ROS 图像消息。

必需
源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def image_callback(self, msg):
    """图像回调函数。

    Args:
        msg (Image): ROS 图像消息。
    """
    try:
        # 处理不同的深度图编码格式
        if msg.encoding == 'mono16' or msg.encoding == '16UC1':
            cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='16UC1')
        elif msg.encoding == '32FC1':
            cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='32FC1')
        else:
            # 尝试直接转换
            cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')

        # 调用回调函数
        if self.callback:
            self.callback(cv_image)

    except CvBridgeError as e:
        self.get_logger().error(f'深度图像转换失败: {e}')
    except Exception as e:
        self.get_logger().error(f'深度图像处理失败: {e}')

CameraDomainService

CameraDomainService()

Bases: QObject

摄像头领域服务。

职责: - 管理 ROS2 摄像头订阅节点的生命周期 - 维护最新的彩色/深度图像数据(线程安全) - 发射图像接收信号 - 深度图可视化处理

属性:

名称 类型 描述
color_image_received pyqtSignal

彩色图像接收信号,携带图像数据 (np.ndarray)。

depth_image_received pyqtSignal

深度图像接收信号,携带图像数据 (np.ndarray)。

connection_status_changed pyqtSignal

连接状态变更信号,携带 (connected, message)。

error_occurred pyqtSignal

错误信号,携带错误信息。

初始化摄像头领域服务。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def __init__(self):
    """初始化摄像头领域服务。"""
    super().__init__()

    # ROS节点和执行器
    self.color_subscriber = None
    self.depth_subscriber = None
    self.ros_executor = None
    self.ros_thread = None

    # cv_bridge
    self.bridge = CvBridge()

    # 图像数据存储(线程安全)
    self.latest_color_image = None
    self.latest_depth_image = None
    self.color_image_lock = threading.Lock()
    self.depth_image_lock = threading.Lock()

    # 连接状态
    self.is_connected = False
    self.color_connected = False  # 彩色流是否有数据
    self.depth_connected = False  # 深度流是否有数据

connect

connect() -> bool

连接摄像头(启动 ROS 订阅)。

返回:

名称 类型 描述
bool bool

连接成功返回 True,失败返回 False。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def connect(self) -> bool:
    """连接摄像头(启动 ROS 订阅)。

    Returns:
        bool: 连接成功返回 True,失败返回 False。
    """
    try:
        if self.is_connected:
            return True

        # 检查并初始化ROS(灵活策略,参考旧版)
        if not rclpy.ok():
            try:
                rclpy.init()
            except RuntimeError as e:
                # ROS已经初始化但状态异常
                self.error_occurred.emit(f"ROS状态异常: {str(e)}")
                return False

        # 创建订阅节点
        self.color_subscriber = ColorImageSubscriber(self._on_color_image_received)
        self.depth_subscriber = DepthImageSubscriber(self._on_depth_image_received)

        # 创建executor
        self.ros_executor = MultiThreadedExecutor()
        self.ros_executor.add_node(self.color_subscriber)
        self.ros_executor.add_node(self.depth_subscriber)

        # 在独立线程运行executor
        self.ros_thread = threading.Thread(target=self._ros_spin_worker, daemon=True)
        self.ros_thread.start()

        self.is_connected = True
        self.connection_status_changed.emit(True, "摄像头连接成功")
        return True

    except Exception as e:
        self.error_occurred.emit(f"连接摄像头失败: {str(e)}")
        return False

disconnect

disconnect() -> bool

断开摄像头(关闭 ROS 订阅)。

返回:

名称 类型 描述
bool bool

断开成功返回 True,失败返回 False。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def disconnect(self) -> bool:
    """断开摄像头(关闭 ROS 订阅)。

    Returns:
        bool: 断开成功返回 True,失败返回 False。
    """
    try:
        if not self.is_connected:
            return True

        self.is_connected = False
        self.color_connected = False
        self.depth_connected = False

        # 停止executor(只影响摄像头节点)
        if self.ros_executor:
            self.ros_executor.shutdown()

        # 销毁节点
        if self.color_subscriber:
            self.color_subscriber.destroy_node()
            self.color_subscriber = None
        if self.depth_subscriber:
            self.depth_subscriber.destroy_node()
            self.depth_subscriber = None

        # 等待线程结束
        if self.ros_thread and self.ros_thread.is_alive():
            self.ros_thread.join(timeout=1.0)

        self.ros_executor = None
        self.ros_thread = None

        # 清空图像数据
        with self.color_image_lock:
            self.latest_color_image = None
        with self.depth_image_lock:
            self.latest_depth_image = None

        self.connection_status_changed.emit(False, "摄像头已断开")
        return True

    except Exception as e:
        self.error_occurred.emit(f"断开摄像头失败: {str(e)}")
        return False

get_latest_color_image

get_latest_color_image() -> Optional[np.ndarray]

获取最新彩色图像(线程安全)。

返回:

类型 描述
Optional[ndarray]

Optional[np.ndarray]: 图像数据,无数据返回 None。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
249
250
251
252
253
254
255
256
def get_latest_color_image(self) -> Optional[np.ndarray]:
    """获取最新彩色图像(线程安全)。

    Returns:
        Optional[np.ndarray]: 图像数据,无数据返回 None。
    """
    with self.color_image_lock:
        return self.latest_color_image.copy() if self.latest_color_image is not None else None

get_latest_depth_image

get_latest_depth_image() -> Optional[np.ndarray]

获取最新深度图像(线程安全)。

返回:

类型 描述
Optional[ndarray]

Optional[np.ndarray]: 图像数据,无数据返回 None。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
258
259
260
261
262
263
264
265
def get_latest_depth_image(self) -> Optional[np.ndarray]:
    """获取最新深度图像(线程安全)。

    Returns:
        Optional[np.ndarray]: 图像数据,无数据返回 None。
    """
    with self.depth_image_lock:
        return self.latest_depth_image.copy() if self.latest_depth_image is not None else None

is_color_available

is_color_available() -> bool

检查彩色图像是否可用。

返回:

名称 类型 描述
bool bool

是否可用。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
267
268
269
270
271
272
273
def is_color_available(self) -> bool:
    """检查彩色图像是否可用。

    Returns:
        bool: 是否可用。
    """
    return self.color_connected and self.latest_color_image is not None

is_depth_available

is_depth_available() -> bool

检查深度图像是否可用。

返回:

名称 类型 描述
bool bool

是否可用。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
275
276
277
278
279
280
281
def is_depth_available(self) -> bool:
    """检查深度图像是否可用。

    Returns:
        bool: 是否可用。
    """
    return self.depth_connected and self.latest_depth_image is not None

visualize_depth_image

visualize_depth_image(depth_image: ndarray) -> np.ndarray

深度图可视化为伪彩色图。

参数:

名称 类型 描述 默认
depth_image ndarray

原始深度图(16位或32位浮点)。

必需

返回:

类型 描述
ndarray

np.ndarray: 伪彩色深度图(BGR, uint8)。

源代码位于: src/controller/controller/domain/services/vision/camera_domain_service.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def visualize_depth_image(self, depth_image: np.ndarray) -> np.ndarray:
    """深度图可视化为伪彩色图。

    Args:
        depth_image (np.ndarray): 原始深度图(16位或32位浮点)。

    Returns:
        np.ndarray: 伪彩色深度图(BGR, uint8)。
    """
    try:
        # 过滤无效值(depth <= 0 或 nan)
        valid_mask = (depth_image > 0) & np.isfinite(depth_image)

        if not np.any(valid_mask):
            # 如果没有有效数据,返回黑色图像
            depth_display = np.zeros_like(depth_image, dtype=np.uint8)
        else:
            # 归一化到 0-255
            min_val = np.min(depth_image[valid_mask])
            max_val = np.max(depth_image[valid_mask])

            if max_val - min_val < 1e-6:
                depth_display = np.zeros_like(depth_image, dtype=np.uint8)
            else:
                norm = (depth_image - min_val) / (max_val - min_val)
                norm[~valid_mask] = 0
                depth_display = (norm * 255).astype(np.uint8)

        # 应用伪彩色映射
        depth_colormap = cv2.applyColorMap(depth_display, cv2.COLORMAP_JET)
        return depth_colormap

    except Exception as e:
        # 如果可视化失败,返回黑色图像
        height, width = depth_image.shape[:2]
        return np.zeros((height, width, 3), dtype=np.uint8)

controller.domain.services.vision.recognition_domain_service

零件识别领域服务 - Domain层 管理ROS2检测结果订阅节点

DetectionSubscriberNode

DetectionSubscriberNode(callback)

Bases: Node

检测结果订阅节点。

属性:

名称 类型 描述
callback callable

检测结果回调函数。

subscription Subscription

ROS 订阅对象。

初始化检测结果订阅节点。

参数:

名称 类型 描述 默认
callback callable

检测结果回调函数。

必需
源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, callback):
    """初始化检测结果订阅节点。

    Args:
        callback (callable): 检测结果回调函数。
    """
    super().__init__('detection_subscriber')
    self.callback = callback

    # 订阅检测结果话题
    self.subscription = self.create_subscription(
        DetectionResult,
        'recognition_result',
        self.detection_callback,
        10
    )

detection_callback

detection_callback(msg)

检测结果回调函数。

参数:

名称 类型 描述 默认
msg DetectionResult

ROS 检测结果消息。

必需
源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def detection_callback(self, msg):
    """检测结果回调函数。

    Args:
        msg (DetectionResult): ROS 检测结果消息。
    """
    try:
        # 将ROS消息转换为字典格式
        detection_result = {
            'head_center': tuple(msg.head_center),
            'central_center': tuple(msg.central_center),
            'real_center': tuple(msg.real_center),
            'angle': msg.angle,
            'depth': msg.depth,
            'real_depth': msg.real_depth
        }

        # 调用回调函数
        if self.callback:
            self.callback(detection_result)

    except Exception as e:
        self.get_logger().error(f'处理检测结果失败: {e}')

RecognitionDomainService

RecognitionDomainService()

Bases: QObject

零件识别领域服务。

职责: - 订阅 recognition_result ROS 话题 - 维护最新检测结果(线程安全) - 独立于摄像头服务运行 - 发射检测结果信号

属性:

名称 类型 描述
detection_result_received pyqtSignal

检测结果接收信号,携带结果字典。

detection_status_changed pyqtSignal

检测状态变更信号,携带 (is_running, message)。

error_occurred pyqtSignal

错误信号,携带错误信息。

初始化零件识别领域服务。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(self):
    """初始化零件识别领域服务。"""
    super().__init__()

    # ROS节点和执行器
    self.detection_subscriber = None
    self.ros_executor = None
    self.ros_thread = None

    # 检测结果存储(线程安全)
    self.latest_detection_result = {}
    self.detection_lock = threading.Lock()

    # 运行状态
    self.is_running = False

start_detection

start_detection() -> bool

开始检测(启动 ROS 订阅)。

返回:

名称 类型 描述
bool bool

启动成功返回 True,失败返回 False。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def start_detection(self) -> bool:
    """开始检测(启动 ROS 订阅)。

    Returns:
        bool: 启动成功返回 True,失败返回 False。
    """
    try:
        if self.is_running:
            return True

        # 检查并初始化ROS(灵活策略,参考旧版)
        if not rclpy.ok():
            try:
                rclpy.init()
            except RuntimeError as e:
                # ROS已经初始化但状态异常
                self.error_occurred.emit(f"ROS状态异常: {str(e)}")
                return False
            except Exception as e:
                self.error_occurred.emit(f"ROS初始化失败: {str(e)}")
                return False

        # 创建订阅节点
        self.detection_subscriber = DetectionSubscriberNode(self._on_detection_received)

        # 创建executor
        self.ros_executor = SingleThreadedExecutor()
        self.ros_executor.add_node(self.detection_subscriber)

        # 在独立线程运行executor
        self.ros_thread = threading.Thread(target=self._ros_spin_worker, daemon=True)
        self.ros_thread.start()

        self.is_running = True
        self.detection_status_changed.emit(True, "开始零件识别")
        return True

    except Exception as e:
        self.error_occurred.emit(f"启动检测失败: {str(e)}")
        return False

stop_detection

stop_detection() -> bool

停止检测(关闭 ROS 订阅)。

返回:

名称 类型 描述
bool bool

停止成功返回 True,失败返回 False。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def stop_detection(self) -> bool:
    """停止检测(关闭 ROS 订阅)。

    Returns:
        bool: 停止成功返回 True,失败返回 False。
    """
    try:
        if not self.is_running:
            return True

        self.is_running = False

        # 停止executor(只影响检测节点)
        if self.ros_executor:
            self.ros_executor.shutdown()

        # 销毁节点
        if self.detection_subscriber:
            self.detection_subscriber.destroy_node()
            self.detection_subscriber = None

        # 等待线程结束
        if self.ros_thread and self.ros_thread.is_alive():
            self.ros_thread.join(timeout=1.0)

        self.ros_executor = None
        self.ros_thread = None

        # 清空检测结果
        with self.detection_lock:
            self.latest_detection_result = {}

        self.detection_status_changed.emit(False, "停止零件识别")
        return True

    except Exception as e:
        self.error_occurred.emit(f"停止检测失败: {str(e)}")
        return False

get_latest_result

get_latest_result() -> Optional[Dict]

获取最新检测结果(线程安全)。

返回:

类型 描述
Optional[Dict]

Optional[Dict]: 检测结果字典,无结果返回 None。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
183
184
185
186
187
188
189
190
191
192
def get_latest_result(self) -> Optional[Dict]:
    """获取最新检测结果(线程安全)。

    Returns:
        Optional[Dict]: 检测结果字典,无结果返回 None。
    """
    with self.detection_lock:
        if self.latest_detection_result:
            return copy.deepcopy(self.latest_detection_result)
        return None

is_detection_running

is_detection_running() -> bool

检查检测是否正在运行。

返回:

名称 类型 描述
bool bool

是否正在运行。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
194
195
196
197
198
199
200
def is_detection_running(self) -> bool:
    """检查检测是否正在运行。

    Returns:
        bool: 是否正在运行。
    """
    return self.is_running

clear_result

clear_result()

清除检测结果。

源代码位于: src/controller/controller/domain/services/vision/recognition_domain_service.py
202
203
204
205
def clear_result(self):
    """清除检测结果。"""
    with self.detection_lock:
        self.latest_detection_result = {}