-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGraphFileHandler.py
More file actions
129 lines (113 loc) · 4.69 KB
/
GraphFileHandler.py
File metadata and controls
129 lines (113 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from dataclasses import dataclass
from Graph import Graph
from Graph import Node
import sys
import threading
from tqdm import tqdm
class GraphFileHandler:
    """Static helpers for reading graph files and landmark pre-processing files.

    The class holds no state; it only groups the related functions.
    """

    @staticmethod
    def _progress(iterable, debug):
        """Return ``iterable`` wrapped in a tqdm progress bar when ``debug`` is truthy."""
        return tqdm(iterable) if debug else iterable

    @staticmethod
    def make_csv(predecessors: "list[Node]", file_name: str) -> None:
        """Write the coordinates of ``predecessors`` to ``<file_name>.csv``.

        Args:
            predecessors: Objects exposing ``lon`` and ``lat`` attributes; one
                row is written per element, in iteration order.
            file_name: Output path without the ``.csv`` suffix.
        """
        with open(file_name + ".csv", "w", encoding="UTF-8") as file:
            # NOTE(review): the header names three columns but every row only
            # carries two values (lon, lat). The output is kept unchanged
            # because the intended "value" column cannot be determined here;
            # confirm against whatever consumes this CSV.
            file.write("value,lon,lat\n")
            file.writelines(f"{node.lon},{node.lat}\n" for node in predecessors)

    @staticmethod
    def graph_from_files(
        file_path_edges: str, file_path_nodes: str, file_path_interest, debug=False
    ) -> "Graph":
        """Build a ``Graph`` from a node file, an edge file and an interest file.

        All three files are whitespace-separated text whose first line is a
        header. Only the first token of the node file's header is used: it is
        passed to ``Graph`` as an ``int``. The headers of the other two files
        are skipped.

        Args:
            file_path_edges: Path of the edge file. The first three tokens of
                each line are passed as ints to ``graph.add_connection``.
            file_path_nodes: Path of the node file. Tokens 1 and 2 of each
                line are stored as ``lat`` and ``lon`` of the node at the
                line's position (0-based, header excluded).
            file_path_interest: Path of the interest file. Token 0 of each
                line selects a node by index and token 1 is stored as its
                ``type``.
            debug: When truthy, print the file being read and show a tqdm
                progress bar.

        Returns:
            The populated graph.
        """
        if debug:
            print(f"Reading {file_path_nodes}...")
        with open(file_path_nodes, "r", encoding="UTF-8") as file_nodes:
            header = file_nodes.readline().split()
            graph = Graph(int(header[0]))
            node_lines = GraphFileHandler._progress(enumerate(file_nodes), debug)
            for index, line in node_lines:
                values = line.split()
                node = graph.graph[index]
                node.lat = float(values[1])
                node.lon = float(values[2])

        if debug:
            print(f"Reading {file_path_edges}...")
        with open(file_path_edges, "r", encoding="UTF-8") as file_edges:
            file_edges.readline()  # skip header
            for line in GraphFileHandler._progress(file_edges, debug):
                values = line.split()
                graph.add_connection(int(values[0]), int(values[1]), int(values[2]))

        if debug:
            print(f"Reading {file_path_interest}...")
        with open(file_path_interest, "r", encoding="UTF-8") as file_interest:
            file_interest.readline()  # skip header
            for line in GraphFileHandler._progress(file_interest, debug):
                values = line.split()
                graph.graph[int(values[0])].type = int(values[1])

        return graph

    @staticmethod
    def pre_process(
        graph: "Graph", landmarks: list[int], directory: str, debug=True
    ) -> None:
        """Write landmark distance files for ``graph`` and for its reverse.

        Distances computed on ``graph`` go to
        ``<directory>/preprocess.alt.to.<i>`` and distances computed on
        ``graph.reverse()`` go to ``<directory>/preprocess.alt.from.<i>``,
        where ``i`` is the landmark's position in ``landmarks``.

        Args:
            graph: Graph to pre-process.
            landmarks: Landmark node ids.
            directory: Existing directory that receives the output files.
            debug: Forwarded, negated, as ``silent`` to the distance
                computation.
        """
        GraphFileHandler._pre_process_graph(
            graph, landmarks, directory + "/preprocess.alt.to", silent=not debug
        )
        reversed_graph = graph.reverse()
        GraphFileHandler._pre_process_graph(
            reversed_graph,
            landmarks,
            directory + "/preprocess.alt.from",
            silent=not debug,
        )

    @staticmethod
    def pre_process_multithreaded(
        graph: "Graph", landmarks: list[int], directory: str
    ) -> None:
        """Threaded variant of ``pre_process``: one thread per landmark and direction.

        Produces the same file names as ``pre_process``. The call returns only
        after every worker thread has finished, so the files are complete when
        control returns to the caller.

        Args:
            graph: Graph to pre-process.
            landmarks: Landmark node ids.
            directory: Existing directory that receives the output files.
        """
        workers = []

        def launch(target_graph, file_name):
            # Start one worker per landmark and remember it for joining.
            for index, landmark in enumerate(landmarks):
                worker = threading.Thread(
                    target=GraphFileHandler._pre_process_graph_multithreaded,
                    args=(target_graph, index, landmark, file_name),
                )
                worker.start()
                workers.append(worker)

        launch(graph, directory + "/preprocess.alt.to")
        # NOTE(review): reverse() runs while the first batch of workers is
        # still using ``graph``; this assumes reverse() does not mutate the
        # original graph - TODO confirm.
        reversed_graph = graph.reverse()
        launch(reversed_graph, directory + "/preprocess.alt.from")

        # Without joining, the function would return while files are still
        # being written.
        for worker in workers:
            worker.join()

    @staticmethod
    def _pre_process_graph(
        graph: "Graph", landmarks: list[int], file_name: str, silent
    ) -> None:
        """Compute and store the distances of every landmark, one after another.

        The result for the landmark at position ``i`` is written to
        ``<file_name>.<i>``.
        """
        for index, landmark in enumerate(landmarks):
            distances = graph.dijikstra_pre_process(landmark, silent=silent)
            GraphFileHandler._write_pre_process(f"{file_name}.{index}", distances)
            # Drop the reference before the next computation allocates again.
            del distances

    @staticmethod
    def _pre_process_graph_multithreaded(
        graph: "Graph", index: int, landmark: int, file_name: str
    ) -> None:
        """Thread target: compute one landmark's distances and write ``<file_name>.<index>``."""
        distances = graph.dijikstra_pre_process(landmark)
        GraphFileHandler._write_pre_process(f"{file_name}.{index}", distances)

    @staticmethod
    def _write_pre_process(file_path: str, distances: list[int]) -> None:
        """Write ``distances`` to ``file_path``, one value per line.

        Values equal to ``float("inf")`` are stored as ``sys.maxsize`` so that
        every line can be parsed back with ``int``.
        """
        infinity = float("inf")
        with open(file_path, "w", encoding="UTF-8") as file:
            file.writelines(
                f"{sys.maxsize if distance == infinity else distance}\n"
                for distance in distances
            )

    @staticmethod
    def read_pre_process(
        file_path: str, landmarks: int, nodes: int, debug: bool = False
    ) -> list[list[int]]:
        """Load the per-landmark files ``<file_path>.0`` .. ``<file_path>.<landmarks-1>``.

        Args:
            file_path: Common prefix of the files, without the ``.<i>`` suffix.
            landmarks: Number of landmark files to read.
            nodes: Number of rows to allocate in the result.
            debug: When truthy, print each file name and show a tqdm progress
                bar.

        Returns:
            A ``nodes`` x ``landmarks`` table where ``table[n][i]`` is the
            integer on line ``n`` of file ``i``. Entries for lines missing
            from a file stay ``None``.
        """
        table = [[None] * landmarks for _ in range(nodes)]
        for landmark_index in range(landmarks):
            full_path = f"{file_path}.{landmark_index}"
            if debug:
                print(f"Reading {full_path}...")
            with open(full_path, "r", encoding="UTF-8") as file:
                rows = GraphFileHandler._progress(enumerate(file), debug)
                for node_index, line in rows:
                    table[node_index][landmark_index] = int(line)
        return table