跳转至

Trajectory Repository

controller.infrastructure.persistence.trajectory_repository

轨迹数据仓库 - Infrastructure层

TrajectoryRepository

TrajectoryRepository(plans_dir: str = './plans')

轨迹数据仓库 - Infrastructure层。

职责: 1. 保存轨迹数据到文件系统 2. 加载轨迹数据 3. 管理存储目录

属性:

名称 类型 描述
plans_dir Path

轨迹存储目录。

初始化轨迹仓库。

参数:

名称 类型 描述 默认
plans_dir str

存储目录. Defaults to "./plans".

'./plans'
源代码位于: src/controller/controller/infrastructure/persistence/trajectory_repository.py
22
23
24
25
26
27
28
29
def __init__(self, plans_dir: str = "./plans"):
    """初始化轨迹仓库。

    Args:
        plans_dir (str, optional): 存储目录. Defaults to "./plans".
    """
    self.plans_dir = Path(plans_dir)
    self.plans_dir.mkdir(exist_ok=True)

save_trajectory

save_trajectory(filename: str, positions: List[List[float]]) -> None

保存轨迹数据。

参数:

名称 类型 描述 默认
filename str

文件名(不含扩展名)。

必需
positions List[List[float]]

轨迹点序列 [[q1,...,q6], ...]。

必需
源代码位于: src/controller/controller/infrastructure/persistence/trajectory_repository.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def save_trajectory(
    self, 
    filename: str, 
    positions: List[List[float]]
) -> None:
    """保存轨迹数据。

    Args:
        filename (str): 文件名(不含扩展名)。
        positions (List[List[float]]): 轨迹点序列 [[q1,...,q6], ...]。
    """
    clean_name = self._sanitize_filename(filename)
    filepath = self.plans_dir / f"{clean_name}.json"

    # 将numpy数组转换为Python列表
    positions_serializable = self._convert_to_serializable(positions)

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(positions_serializable, f, indent=2, ensure_ascii=False)

load_trajectory

load_trajectory(filename: str) -> List[List[float]]

加载轨迹数据。

参数:

名称 类型 描述 默认
filename str

文件名(不含扩展名)。

必需

返回:

类型 描述
List[List[float]]

List[List[float]]: 轨迹点序列。

源代码位于: src/controller/controller/infrastructure/persistence/trajectory_repository.py
51
52
53
54
55
56
57
58
59
60
61
62
def load_trajectory(self, filename: str) -> List[List[float]]:
    """加载轨迹数据。

    Args:
        filename (str): 文件名(不含扩展名)。

    Returns:
        List[List[float]]: 轨迹点序列。
    """
    filepath = self.plans_dir / f"{filename}.json"
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)

list_trajectory_files

list_trajectory_files() -> List[str]

列出所有可用的轨迹文件。

返回:

名称 类型 描述
List[str]

List[str]: 文件名列表(不带扩展名)。

例如 List[str]

["默认方案-0", "默认方案-1", "test_trajectory"]

源代码位于: src/controller/controller/infrastructure/persistence/trajectory_repository.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def list_trajectory_files(self) -> List[str]:
    """列出所有可用的轨迹文件。

    Returns:
        List[str]: 文件名列表(不带扩展名)。
        例如: ["默认方案-0", "默认方案-1", "test_trajectory"]
    """
    if not self.plans_dir.exists():
        return []

    files = []
    for file in self.plans_dir.glob("*.json"):
        files.append(file.stem)  # 文件名不带扩展名

    return sorted(files)

trajectory_exists

trajectory_exists(filename: str) -> bool

检查轨迹文件是否存在。

参数:

名称 类型 描述 默认
filename str

文件名(不含扩展名)。

必需

返回:

名称 类型 描述
bool bool

True=文件存在, False=文件不存在。

源代码位于: src/controller/controller/infrastructure/persistence/trajectory_repository.py
80
81
82
83
84
85
86
87
88
89
90
def trajectory_exists(self, filename: str) -> bool:
    """检查轨迹文件是否存在。

    Args:
        filename (str): 文件名(不含扩展名)。

    Returns:
        bool: True=文件存在, False=文件不存在。
    """
    filepath = self.plans_dir / f"{filename}.json"
    return filepath.exists()