From 20b49befc385deba3d90ac8fbfa0e741effe8fe6 Mon Sep 17 00:00:00 2001 From: Tan Chian Fern Date: Sun, 1 Mar 2026 23:03:51 +0800 Subject: [PATCH 1/3] Add test. Signed-off-by: Tan Chian Fern --- centralized/test/test_cbs.py | 41 ++++++++++++++++++++++++++++++++++++ requirements.txt | 4 ++++ 2 files changed, 45 insertions(+) create mode 100644 centralized/test/test_cbs.py diff --git a/centralized/test/test_cbs.py b/centralized/test/test_cbs.py new file mode 100644 index 00000000..cac14baf --- /dev/null +++ b/centralized/test/test_cbs.py @@ -0,0 +1,41 @@ +import os +import sys + +import pytest +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from cbs.cbs import Environment, CBS + +@pytest.mark.timeout(5) # Timeout after 5 seconds +def test_cbs_two_agents(): + + """ + X X O X X + a b O B A + + a, b: agent starts + A, B: agent goals + X: obstacle + O: space + """ + dimension = [5, 2] + obstacles = [(0, 1), (1,1), (3,1), (4,1)] + + agents = [ + {"start": [0, 0], "goal": [4, 0], "name": "A"}, + {"start": [1, 0], "goal": [3, 0], "name": "B"}, + ] + + + env = Environment(dimension, agents, obstacles) + cbs = CBS(env) + + solution = cbs.search() + + assert solution is not None + + assert solution["A"][-1]["x"] == 4 + assert solution["A"][-1]["y"] == 0 + + assert solution["B"][-1]["x"] == 3 + assert solution["B"][-1]["y"] == 0 diff --git a/requirements.txt b/requirements.txt index b09d01b5..0fdb6433 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,8 @@ +lark numpy matplotlib scipy ffmpeg-python +pyyaml +pytest +pytest-timeout From 35841c9ceb6fba7a832b6745a9f89a49bd328770 Mon Sep 17 00:00:00 2001 From: Tan Chian Fern Date: Sun, 1 Mar 2026 23:59:41 +0800 Subject: [PATCH 2/3] Pass the Environment object to Astar. Signed-off-by: Tan Chian Fern --- centralized/cbs/a_star.py | 26 ++++++++------- centralized/cbs/cbs.py | 39 ++-------------------- centralized/cbs/constraints.py | 59 ++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 48 deletions(-) create mode 100644 centralized/cbs/constraints.py diff --git a/centralized/cbs/a_star.py b/centralized/cbs/a_star.py index f73f9c9e..b27c046b 100644 --- a/centralized/cbs/a_star.py +++ b/centralized/cbs/a_star.py @@ -6,12 +6,16 @@ """ -class AStar(): - def __init__(self, env): - self.agent_dict = env.agent_dict - self.admissible_heuristic = env.admissible_heuristic - self.is_at_goal = env.is_at_goal - self.get_neighbors = env.get_neighbors +from typing import TYPE_CHECKING + +from cbs.constraints import Constraints, VertexConstraint + +if TYPE_CHECKING: + from cbs.cbs import Environment + +class AStar: + def __init__(self, env: 'Environment'): + self.env = env def reconstruct_path(self, came_from, current): total_path = [current] @@ -24,7 +28,7 @@ def search(self, agent_name): """ low level search """ - initial_state = self.agent_dict[agent_name]["start"] + initial_state = self.env.agent_dict[agent_name]["start"] step_cost = 1 closed_set = set() @@ -37,19 +41,19 @@ def search(self, agent_name): f_score = {} - f_score[initial_state] = self.admissible_heuristic(initial_state, agent_name) + f_score[initial_state] = self.env.admissible_heuristic(initial_state, agent_name) while open_set: temp_dict = {open_item:f_score.setdefault(open_item, float("inf")) for open_item in open_set} current = min(temp_dict, key=temp_dict.get) - if self.is_at_goal(current, agent_name): + if self.env.is_at_goal(current, agent_name): return self.reconstruct_path(came_from, current) open_set -= {current} closed_set |= {current} - neighbor_list = self.get_neighbors(current) + neighbor_list = self.env.get_neighbors(current) for neighbor in neighbor_list: if neighbor in closed_set: @@ -65,6 +69,6 @@ def search(self, agent_name): came_from[neighbor] = current g_score[neighbor] = tentative_g_score - f_score[neighbor] = g_score[neighbor] + self.admissible_heuristic(neighbor, agent_name) + f_score[neighbor] = g_score[neighbor] + self.env.admissible_heuristic(neighbor, agent_name) return False diff --git a/centralized/cbs/cbs.py b/centralized/cbs/cbs.py index c0a4de67..f62a1869 100644 --- a/centralized/cbs/cbs.py +++ b/centralized/cbs/cbs.py @@ -14,6 +14,8 @@ from copy import deepcopy from cbs.a_star import AStar +from cbs.constraints import Constraints, EdgeConstraint, VertexConstraint + class Location(object): def __init__(self, x=-1, y=-1): @@ -54,43 +56,6 @@ def __str__(self): return '(' + str(self.time) + ', ' + self.agent_1 + ', ' + self.agent_2 + \ ', '+ str(self.location_1) + ', ' + str(self.location_2) + ')' -class VertexConstraint(object): - def __init__(self, time, location): - self.time = time - self.location = location - - def __eq__(self, other): - return self.time == other.time and self.location == other.location - def __hash__(self): - return hash(str(self.time)+str(self.location)) - def __str__(self): - return '(' + str(self.time) + ', '+ str(self.location) + ')' - -class EdgeConstraint(object): - def __init__(self, time, location_1, location_2): - self.time = time - self.location_1 = location_1 - self.location_2 = location_2 - def __eq__(self, other): - return self.time == other.time and self.location_1 == other.location_1 \ - and self.location_2 == other.location_2 - def __hash__(self): - return hash(str(self.time) + str(self.location_1) + str(self.location_2)) - def __str__(self): - return '(' + str(self.time) + ', '+ str(self.location_1) +', '+ str(self.location_2) + ')' - -class Constraints(object): - def __init__(self): - self.vertex_constraints = set() - self.edge_constraints = set() - - def add_constraint(self, other): - self.vertex_constraints |= other.vertex_constraints - self.edge_constraints |= other.edge_constraints - - def __str__(self): - return "VC: " + str([str(vc) for vc in self.vertex_constraints]) + \ - "EC: " + str([str(ec) for ec in self.edge_constraints]) class Environment(object): def __init__(self, dimension, agents, obstacles): diff --git a/centralized/cbs/constraints.py b/centralized/cbs/constraints.py new file mode 100644 index 00000000..99571548 --- /dev/null +++ b/centralized/cbs/constraints.py @@ -0,0 +1,59 @@ +class Constraints(object): + def __init__(self): + self.vertex_constraints = set() + self.edge_constraints = set() + + def add_constraint(self, other): + self.vertex_constraints |= other.vertex_constraints + self.edge_constraints |= other.edge_constraints + + def __str__(self): + return ( + "VC: " + + str([str(vc) for vc in self.vertex_constraints]) + + "EC: " + + str([str(ec) for ec in self.edge_constraints]) + ) + + +class VertexConstraint(object): + def __init__(self, time, location): + self.time = time + self.location = location + + def __eq__(self, other): + return self.time == other.time and self.location == other.location + + def __hash__(self): + return hash(str(self.time) + str(self.location)) + + def __str__(self): + return "(" + str(self.time) + ", " + str(self.location) + ")" + + +class EdgeConstraint(object): + def __init__(self, time, location_1, location_2): + self.time = time + self.location_1 = location_1 + self.location_2 = location_2 + + def __eq__(self, other): + return ( + self.time == other.time + and self.location_1 == other.location_1 + and self.location_2 == other.location_2 + ) + + def __hash__(self): + return hash(str(self.time) + str(self.location_1) + str(self.location_2)) + + def __str__(self): + return ( + "(" + + str(self.time) + + ", " + + str(self.location_1) + + ", " + + str(self.location_2) + + ")" + ) \ No newline at end of file From 2de034fa7d808e306a5eaad185fde36922c946c2 Mon Sep 17 00:00:00 2001 From: Tan Chian Fern Date: Mon, 2 Mar 2026 00:05:10 +0800 Subject: [PATCH 3/3] Check vertex constraints when an agent has reached its goal. Signed-off-by: Tan Chian Fern --- centralized/cbs/a_star.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/centralized/cbs/a_star.py b/centralized/cbs/a_star.py index b27c046b..9be55664 100644 --- a/centralized/cbs/a_star.py +++ b/centralized/cbs/a_star.py @@ -48,7 +48,12 @@ def search(self, agent_name): current = min(temp_dict, key=temp_dict.get) if self.env.is_at_goal(current, agent_name): - return self.reconstruct_path(came_from, current) + conflict = any( + vc.time >= current.time and vc.location == current.location + for vc in self.env.constraints.vertex_constraints + ) + if not conflict: + return self.reconstruct_path(came_from, current) open_set -= {current} closed_set |= {current}