-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgbfs_lazy.py
More file actions
219 lines (160 loc) · 7.17 KB
/
gbfs_lazy.py
File metadata and controls
219 lines (160 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""
Custom lazy Greedy Best-First Search (GBFS) with customization points explained.
Tyr also provides an off-the-shelf implementation, which is almost identical
to the astar_eager.py example. Please familiarize yourself with astar_eager.py
before exploring this example.
Example usage (run from the repository root):
python3 python/examples/planning/gbfs_lazy.py \
-d data/tests/classical/gripper/domain.pddl \
-p data/tests/classical/gripper/test-1.pddl
Author: Dominik Drexler (dominik.drexler@liu.se)
"""
import argparse
from pathlib import Path
from queue import PriorityQueue
from dataclasses import dataclass
from enum import Enum
from pytyr.common import (
ExecutionContext
)
from pytyr.formalism.planning import (
ParserOptions,
Parser
)
from pytyr.planning import (
SearchStatus
)
from pytyr.planning.lifted import (
StateIndex,
SearchResult,
SuccessorGenerator,
Heuristic,
FFRPGHeuristic,
PruningStrategy,
GoalStrategy,
TaskGoalStrategy,
State,
Node,
LabeledNode,
Plan,
Task
)
class SearchNodeStatus(Enum):
    """Lifecycle marker stored in the `status` field of a `SearchNode`."""

    # Value given to a search node at the moment it is created.
    NEW = 0
    # Set on a successor right before its state index is pushed onto the queue.
    OPEN = 1
    # Set on a state once it has been popped from the queue.
    CLOSED = 2
    # Set on a popped state whose heuristic value is infinite.
    DEADEND = 3
    # Not assigned by the code visible in this file.
    GOAL = 4
@dataclass
class SearchNode:
    """Bookkeeping record the search keeps for one state, keyed by its state index."""

    # Metric value of the node through which the state was first generated.
    g_value: float
    # State index of the predecessor; None marks the root of the search.
    parent: StateIndex | None
    # Current lifecycle stage of the state within the search.
    status: SearchNodeStatus
def backtrack_plan(goal_node : Node, goal_search_node : SearchNode, search_nodes : dict[StateIndex, SearchNode], successor_generator : SuccessorGenerator) -> Plan:
    """
    Reconstruct the plan that ends in `goal_node`.

    First follows the `parent` entries in `search_nodes` from the goal state
    back to the root (the search node whose parent is None) to obtain the
    state trajectory. Then replays the trajectory forward, picking for every
    pair of consecutive states the first labeled successor that leads from
    one to the other.

    Args:
        goal_node: Node whose state terminates the trajectory.
        goal_search_node: Search node of the goal state. Not read by this
            function; kept so that existing callers remain valid.
        search_nodes: Mapping from state index to search node; supplies the
            parent pointers.
        successor_generator: Used to fetch the root node and to regenerate
            the labeled successors along the trajectory.

    Returns:
        The `Plan` built from the replayed labeled successor nodes.

    Raises:
        KeyError: If a state on the parent chain has no entry in `search_nodes`.
        RuntimeError: If no successor connects two consecutive trajectory states.
    """
    # Collect the state indices goal -> root by following parent pointers.
    backward_state_trajectory = []
    cur_state_index = goal_node.get_state().get_index()
    while cur_state_index is not None:
        backward_state_trajectory.append(cur_state_index)
        # Index directly so a broken parent chain fails with a KeyError
        # naming the state, instead of an AttributeError on None.
        cur_state_index = search_nodes[cur_state_index].parent

    forward_state_trajectory = backward_state_trajectory[::-1]
    assert forward_state_trajectory

    node = successor_generator.get_node(forward_state_trajectory[0])
    labeled_succ_nodes : list[LabeledNode] = []
    for next_state_index in forward_state_trajectory[1:]:
        for labeled_succ_node in successor_generator.get_labeled_successor_nodes(node):
            if labeled_succ_node.node.get_state().get_index() == next_state_index:
                labeled_succ_nodes.append(labeled_succ_node)
                node = labeled_succ_node.node
                # Exactly one step per transition: without this, several
                # actions reaching the same state would all be appended.
                break
        else:
            # Continuing from a stale node would yield an inconsistent plan.
            raise RuntimeError(
                f"No successor of the current node leads to state {next_state_index}."
            )

    # NOTE(review): `node` is the last node of the trajectory at this point
    # (the root node if the trajectory has a single state) -- confirm this is
    # the node `Plan` expects as its first argument.
    return Plan(node, labeled_succ_nodes)
def find_solution(task : Task, successor_generator : SuccessorGenerator, heuristic : Heuristic) -> SearchResult:
    """
    Run a lazy greedy best-first search and report the outcome.

    "Lazy" here means that a state is evaluated by `heuristic` only when it is
    popped from the queue; its successors are queued with the h-value of the
    popped (parent) state.

    Args:
        task: Task whose goal is checked through a `TaskGoalStrategy`.
        successor_generator: Provides the initial node, the state repository
            and the labeled successors of a node.
        heuristic: Evaluated on every popped state; also queried for its
            preferred actions.

    Returns:
        The `SearchResult` with status
        - UNSOLVABLE if the static goal is not satisfied,
        - SOLVED (with goal node and plan) once a state satisfying the
          dynamic goal is found,
        - EXHAUSTED if the queue runs empty first.
    """
    state_repository = successor_generator.get_state_repository()
    goal_strategy = TaskGoalStrategy(task)
    search_result = SearchResult()

    def finish(status, goal_node=None, plan=None) -> SearchResult:
        # Fill in the shared result object and hand it back.
        search_result.goal_node = goal_node
        search_result.plan = plan
        search_result.status = status
        return search_result

    if not goal_strategy.is_static_goal_satisfied():
        return finish(SearchStatus.UNSOLVABLE)

    initial_node = successor_generator.get_initial_node()
    initial_state = initial_node.get_state()
    if goal_strategy.is_dynamic_goal_satisfied(initial_state):
        return finish(SearchStatus.SOLVED, initial_node, Plan(initial_node))

    root_h_value = heuristic.evaluate(initial_state)
    root_g_value = initial_node.get_metric()
    root_state_index = initial_state.get_index()

    search_nodes : dict[StateIndex, SearchNode] = {
        root_state_index: SearchNode(root_g_value, None, SearchNodeStatus.NEW),
    }

    # Note: the queue holds cheap state indices rather than states so that
    # memory can be returned to the pool. A state is always retrievable via
    # state_repository.get_registered_state(i) for a StateIndex i.
    queue = PriorityQueue()
    queue.put((root_h_value, root_g_value, root_state_index))

    while not queue.empty():
        # The queued h-value only orders the queue; it is not needed after the pop.
        _, g_value, state_index = queue.get()
        state = state_repository.get_registered_state(state_index)
        node = Node(state, g_value)
        search_node = search_nodes[state_index]

        # Lazy evaluation: the heuristic is computed at expansion time.
        state_h_value = heuristic.evaluate(state)
        preferred_actions = heuristic.get_preferred_actions()

        if state_h_value == float("inf"):
            search_node.status = SearchNodeStatus.DEADEND
            continue
        search_node.status = SearchNodeStatus.CLOSED

        for labeled_succ_node in successor_generator.get_labeled_successor_nodes(node):
            succ_node = labeled_succ_node.node
            succ_state = succ_node.get_state()
            succ_g_value = succ_node.get_metric()
            succ_state_index = succ_state.get_index()

            succ_search_node = search_nodes.get(succ_state_index)
            if succ_search_node is None:
                succ_search_node = SearchNode(succ_g_value, state_index, SearchNodeStatus.NEW)
                search_nodes[succ_state_index] = succ_search_node

            # Only states that were never queued before are processed further.
            if succ_search_node.status != SearchNodeStatus.NEW:
                continue

            succ_search_node.parent = state_index

            if labeled_succ_node.label in preferred_actions:
                pass  # the heuristic marked this labeled successor node as preferred

            if goal_strategy.is_dynamic_goal_satisfied(succ_state):
                plan = backtrack_plan(succ_node, succ_search_node, search_nodes, successor_generator)
                return finish(SearchStatus.SOLVED, succ_node, plan)

            succ_search_node.status = SearchNodeStatus.OPEN
            # The successor inherits the h-value of the state it was generated from.
            queue.put((state_h_value, succ_g_value, succ_state_index))

    return finish(SearchStatus.EXHAUSTED)
def main():
    """Parse the command line, run the lazy GBFS on the given task, and print the outcome."""
    cli = argparse.ArgumentParser(description="GBFS Lazy Search.")
    cli.add_argument("-d", "--domain-filepath", type=Path, required=True, help="Path to a PDDL domain file.")
    cli.add_argument("-p", "--task-filepath", type=Path, required=True, help="Path to PDDL task file.")
    args = cli.parse_args()

    domain_filepath : Path = args.domain_filepath
    task_filepath : Path = args.task_filepath

    # Shared by the heuristic and the successor generator.
    execution_context = ExecutionContext(2)

    # The same parser options are used for the domain and for the task file.
    parser_options = ParserOptions()
    parser = Parser(domain_filepath, parser_options)
    lifted_task = Task(parser.parse_task(task_filepath, parser_options))

    heuristic = FFRPGHeuristic(lifted_task, execution_context)
    successor_generator = SuccessorGenerator(lifted_task, execution_context)

    search_result = find_solution(lifted_task, successor_generator, heuristic)
    print("Search status:", search_result.status)

    plan = search_result.plan
    if plan is None:
        print("No solution was found.")
        return

    print(f"Found plan with length {plan.get_length()} and cost {plan.get_cost()}")
    print(plan)


if __name__ == "__main__":
    main()