跳转至

Serial Adapter

controller.infrastructure.communication.serial_adapter

串口适配器 - Infrastructure层 负责管理串口连接和断开

SerialAdapter

SerialAdapter()

串口连接适配器。

属性:

名称 类型 描述
serial_port Optional[Serial]

PySerial 实例。

current_config Optional[Dict[str, Any]]

当前配置字典。

初始化串口适配器。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
17
18
19
20
def __init__(self):
    """初始化串口适配器。"""
    self.serial_port: Optional[serial.Serial] = None
    self.current_config: Optional[Dict[str, Any]] = None

connect

connect(port: str, config: Dict[str, Any]) -> bool

连接串口。

参数:

名称 类型 描述 默认
port str

串口名称。

必需
config Dict[str, Any]

串口配置参数。

必需

返回:

名称 类型 描述
bool bool

连接是否成功。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def connect(self, port: str, config: Dict[str, Any]) -> bool:
    """连接串口。

    Args:
        port (str): 串口名称。
        config (Dict[str, Any]): 串口配置参数。

    Returns:
        bool: 连接是否成功。
    """
    try:
        # 如果已经连接,先断开
        if self.is_connected():
            self.disconnect()

        # 创建串口连接
        self.serial_port = serial.Serial(
            port=port,
            baudrate=config.get('baudrate', 115200),
            bytesize=config.get('bytesize', 8),
            parity=config.get('parity', 'N'),
            stopbits=config.get('stopbits', 1),
            timeout=config.get('timeout', 1),
            xonxoff=config.get('xonxoff', False),
            rtscts=config.get('rtscts', False),
            dsrdtr=config.get('dsrdtr', False)
        )

        # 保存配置
        self.current_config = config.copy()
        self.current_config['port'] = port

        return True

    except Exception:
        self.serial_port = None
        self.current_config = None
        return False

disconnect

disconnect() -> bool

断开串口连接。

返回:

名称 类型 描述
bool bool

断开是否成功。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def disconnect(self) -> bool:
    """断开串口连接。

    Returns:
        bool: 断开是否成功。
    """
    try:
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
        return True
    except Exception:
        return False
    finally:
        self.serial_port = None
        self.current_config = None

is_connected

is_connected() -> bool

检查串口是否已连接。

返回:

名称 类型 描述
bool bool

是否已连接。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
77
78
79
80
81
82
83
84
def is_connected(self) -> bool:
    """检查串口是否已连接。

    Returns:
        bool: 是否已连接。
    """
    return (self.serial_port is not None and 
            self.serial_port.is_open)

get_serial_port

get_serial_port() -> Optional[serial.Serial]

获取串口对象。

返回:

类型 描述
Optional[Serial]

Optional[serial.Serial]: 串口对象,未连接时返回None。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
86
87
88
89
90
91
92
93
94
def get_serial_port(self) -> Optional[serial.Serial]:
    """获取串口对象。

    Returns:
        Optional[serial.Serial]: 串口对象,未连接时返回None。
    """
    if self.is_connected():
        return self.serial_port
    return None

get_current_config

get_current_config() -> Optional[Dict[str, Any]]

获取当前连接配置。

返回:

类型 描述
Optional[Dict[str, Any]]

Optional[Dict[str, Any]]: 当前配置,未连接时返回 None。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
 96
 97
 98
 99
100
101
102
def get_current_config(self) -> Optional[Dict[str, Any]]:
    """获取当前连接配置。

    Returns:
        Optional[Dict[str, Any]]: 当前配置,未连接时返回 None。
    """
    return self.current_config.copy() if self.current_config else None

get_port_name

get_port_name() -> Optional[str]

获取当前连接的端口名称。

返回:

类型 描述
Optional[str]

Optional[str]: 端口名称,未连接时返回 None。

源代码位于: src/controller/controller/infrastructure/communication/serial_adapter.py
104
105
106
107
108
109
110
111
112
def get_port_name(self) -> Optional[str]:
    """获取当前连接的端口名称。

    Returns:
        Optional[str]: 端口名称,未连接时返回 None。
    """
    if self.current_config:
        return self.current_config.get('port')
    return None