跳转至

Port Scanner

controller.infrastructure.communication.port_scanner

串口扫描器 - Infrastructure层 负责扫描可用的串口端口

PortScanner

串口端口扫描器。

负责扫描可用的串口端口,包含真实串口和虚拟串口。

scan_ports staticmethod

scan_ports() -> List[str]

扫描可用的串口端口(包含真实串口和虚拟串口)。

返回:

类型 描述
List[str]

List[str]: 可用端口名称列表。

源代码位于: src/controller/controller/infrastructure/communication/port_scanner.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@staticmethod
def scan_ports() -> List[str]:
    """扫描可用的串口端口(包含真实串口和虚拟串口)。

    Returns:
        List[str]: 可用端口名称列表。
    """
    all_ports = []

    try:
        # 1. 扫描真实硬件串口
        ports = serial.tools.list_ports.comports()
        all_ports.extend([port.device for port in ports])

        # 2. 扫描虚拟串口(伪终端)
        virtual_port_patterns = [
            '/dev/pts/*',      # 伪终端
            '/tmp/tty*',       # 临时虚拟串口(socat创建的)
            '/dev/ttyV*',      # 虚拟串口设备
        ]

        for pattern in virtual_port_patterns:
            for device in glob.glob(pattern):
                # 检查是否是字符设备且可访问
                if os.path.exists(device) and PortScanner._is_accessible(device):
                    if device not in all_ports:
                        all_ports.append(device)

        # 排序:真实串口在前,虚拟串口在后
        all_ports.sort(key=lambda x: (not x.startswith('/dev/ttyUSB'), 
                                     not x.startswith('/dev/ttyACM'), 
                                     x))

        return all_ports

    except Exception:
        return all_ports

get_port_info staticmethod

get_port_info(port_name: str) -> Optional[Dict[str, str]]

获取指定端口的详细信息(支持真实串口和虚拟串口)。

参数:

名称 类型 描述 默认
port_name str

端口名称。

必需

返回:

类型 描述
Optional[Dict[str, str]]

Optional[Dict[str, str]]: 端口信息字典,包含 description 和 hwid。如果端口不可用则返回 None。

源代码位于: src/controller/controller/infrastructure/communication/port_scanner.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@staticmethod
def get_port_info(port_name: str) -> Optional[Dict[str, str]]:
    """获取指定端口的详细信息(支持真实串口和虚拟串口)。

    Args:
        port_name (str): 端口名称。

    Returns:
        Optional[Dict[str, str]]: 端口信息字典,包含 description 和 hwid。如果端口不可用则返回 None。
    """
    try:
        # 1. 先尝试从真实硬件串口获取信息
        ports = serial.tools.list_ports.comports()
        for port in ports:
            if port.device == port_name:
                return {
                    'device': port.device,
                    'description': port.description or '',
                    'hwid': port.hwid or ''
                }

        # 2. 如果是虚拟串口,返回自定义信息
        if os.path.exists(port_name):
            description = "虚拟串口"

            # 根据路径判断类型
            if port_name.startswith('/tmp/'):
                description = "临时虚拟串口 (socat)"
            elif port_name.startswith('/dev/pts/'):
                description = "伪终端 (pty)"
            elif port_name.startswith('/dev/ttyV'):
                description = "虚拟串口设备"

            return {
                'device': port_name,
                'description': description,
                'hwid': 'VIRTUAL'
            }

        return None
    except Exception:
        return None

get_all_port_info staticmethod

get_all_port_info() -> List[Dict[str, str]]

获取所有可用端口的详细信息(包含真实串口和虚拟串口)。

返回:

类型 描述
List[Dict[str, str]]

List[Dict[str, str]]: 所有端口信息列表。

源代码位于: src/controller/controller/infrastructure/communication/port_scanner.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@staticmethod
def get_all_port_info() -> List[Dict[str, str]]:
    """获取所有可用端口的详细信息(包含真实串口和虚拟串口)。

    Returns:
        List[Dict[str, str]]: 所有端口信息列表。
    """
    try:
        all_port_info = []

        # 获取所有端口
        all_ports = PortScanner.scan_ports()

        # 为每个端口获取信息
        for port_name in all_ports:
            port_info = PortScanner.get_port_info(port_name)
            if port_info:
                all_port_info.append(port_info)

        return all_port_info
    except Exception:
        return []