-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerator.py
More file actions
297 lines (254 loc) · 12.2 KB
/
generator.py
File metadata and controls
297 lines (254 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
from workflow import WorkflowDAG
from collections import defaultdict
from visualize import visualize_graph
import random
import os
import zlib, random
# Derive a deterministic RNG seed from a fixed string so repeated runs
# generate an identical set of workflows (reproducibility).
# NOTE: on Python 3, zlib.crc32 already returns an unsigned value; the
# 0xffffffff mask is kept for defensive clarity.
seed = zlib.crc32("捡破烂".encode('utf-8')) & 0xffffffff
print(f"seed: {seed}")
random.seed(seed)
# Centralised configuration for distribution-based ("proportional") generation.
NUM_WORKFLOWS = 100
OUTPUT_DIR = f"multi_workflows_{NUM_WORKFLOWS}"
GENERATION_CONFIG = {
    # Fixed parameters applied to the whole batch.
    "fixed": {
        "num_workflows": NUM_WORKFLOWS,
        "output_dir": OUTPUT_DIR,
        "merge_final": True
    },
    # Sampling distributions: each is a {value: weight} map; weights need
    # not sum to 1 (they are normalised at sampling time).
    "distributions": {
        # Number of layers per workflow.
        "layers": {1: 0.10, 2: 0.40, 3: 0.35, 4: 0.15},
        # Tool-call count drawn independently for each layer (0 = no tools).
        "tool_call_counts_per_layer": {0: 0.10, 1: 0.15, 2: 0.30, 3: 0.25, 4: 0.15, 5: 0.05},
        # Whether a layer's tool calls merge into a single prefill node.
        "merge_after_tool": {True: 0.60, False: 0.40},
        # Upper bound (seconds) of the uniform arrival-time window.
        "time_range": {10: 0.50, 60: 0.30, 300: 0.15, 1200: 0.05},
        # Max fraction of workflows allowed to have zero tool calls everywhere.
        "max_all_zero_ratio": 0.10
    }
}
class WorkflowGenerator:
    """Builds one synthetic workflow DAG of alternating LLM stages
    (prefill -> decoding) and parallel tool-call stages.

    One instance generates exactly one :class:`WorkflowDAG`.
    """

    def __init__(self, wf_id):
        self.dag = WorkflowDAG()
        self.dag.wf_id = wf_id
        # Per-task-type counters used to mint unique node ids.
        self.node_counter = defaultdict(int)
        self.available_models = [
            "gpt-4", "llama-3-70b", "claude-3-opus",
            "mistral-large", "gemini-1.5-pro"
        ]

    def generate_id(self, task_type):
        """Return a unique id of the form ``workflow_<wf>_<type>_<n>``."""
        self.node_counter[task_type] += 1
        return f"workflow_{self.dag.wf_id}_{task_type}_{self.node_counter[task_type]}"

    def create_prefill_decoding_pair(self, prefix=None):
        """Create a linked prefill -> decoding node pair with sampled
        model/token attributes.

        :param prefix: optional id prefix; when given, ids are
            ``<prefix>_prefill`` / ``<prefix>_decoding`` instead of counters
        :return: ``(prefill_node, decoding_node)``
        """
        prefill_id = self.generate_id("prefill") if not prefix else f"{prefix}_prefill"
        decoding_id = self.generate_id("decoding") if not prefix else f"{prefix}_decoding"
        prefill_node = self.dag.add_node(prefill_id, "prefill")
        decoding_node = self.dag.add_node(decoding_id, "decoding")
        self.dag.add_edge(prefill_id, decoding_id)
        prefill_info = self.dag.get_info(prefill_id)
        decoding_info = self.dag.get_info(decoding_id)
        # Both halves of the pair run on the same model.
        model_name = random.choice(self.available_models)
        # Prefill token counts are assumed exactly predictable.
        prefill_tokens = random.randint(2000, 3000)
        prefill_info.model_name = model_name
        prefill_info.predicted_tokens = prefill_tokens
        prefill_info.actual_tokens = prefill_tokens
        # Decoding actuals deviate up to +/-20% from the prediction.
        decoding_tokens = random.randint(2500, 4500)
        decoding_info.model_name = model_name
        decoding_info.predicted_tokens = decoding_tokens
        deviation = random.uniform(-0.2, 0.2) * decoding_tokens
        decoding_info.actual_tokens = max(10, int(decoding_tokens + deviation))
        return prefill_node, decoding_node

    def create_tool_calls(self, parent_node, count=3):
        """Create ``count`` parallel tool_call children of ``parent_node``.

        :param parent_node: node whose task_id becomes each tool's parent edge
        :param count: number of parallel tool_call nodes to create
        :return: list of the created tool_call nodes
        """
        tool_calls = []
        for _ in range(count):
            tool_id = self.generate_id("tool_call")
            tool_node = self.dag.add_node(tool_id, "tool_call")
            self.dag.add_edge(parent_node.task_id, tool_id)
            tool_info = self.dag.get_info(tool_id)
            tool_info.cpus = random.randint(1, 8)
            # Actual runtime deviates up to +/-20% from the prediction.
            predicted_time = random.uniform(0.5, 5.0)
            tool_info.predicted_time = predicted_time
            deviation = random.uniform(-0.2, 0.2) * predicted_time
            tool_info.actual_time = max(0.01, predicted_time + deviation)
            tool_calls.append(tool_node)
        return tool_calls

    def build_multi_layer_workflow(self, layers=2, tool_call_counts=[3, 2],
                                   merge_final=True, merge_after_tool=True, arrival_time=0):
        """Build a multi-layer workflow; supports tool_call_count=0 and
        branch/merge options.

        :param layers: number of workflow layers (excluding the final summary layer)
        :param tool_call_counts: tool_call count per layer; 0 means skip tool
            calls and go straight to the next layer
        :param merge_final: merge all branches into a final summary node
        :param merge_after_tool: merge a layer's tool calls into one prefill node
        :param arrival_time: workflow arrival time (seconds)
        :return: the populated WorkflowDAG
        """
        if len(tool_call_counts) < layers:
            # Pad with a default of 2 tool calls per missing layer.
            # BUG FIX: build a NEW list instead of `+=` — the original
            # mutated the shared mutable default argument (and any list the
            # caller passed in), so the padding leaked across calls.
            tool_call_counts = tool_call_counts + [2] * (layers - len(tool_call_counts))
        self.dag.arrival_time = arrival_time
        start_prefill, start_decoding = self.create_prefill_decoding_pair(f"workflow_{self.dag.wf_id}_start")
        self.dag.root = start_prefill
        current_decoding_nodes = [start_decoding]
        for layer in range(layers):
            next_prefill_nodes = []
            count = tool_call_counts[layer]
            for decoding_node in current_decoding_nodes:
                if count > 0:
                    tool_calls = self.create_tool_calls(decoding_node, count)
                    if merge_after_tool:
                        # Fan all of this layer's tools into one prefill.
                        prefill, _ = self.create_prefill_decoding_pair(
                            f"workflow_{self.dag.wf_id}_layer{layer+1}_merged"
                        )
                        for tool in tool_calls:
                            self.dag.add_edge(tool.task_id, prefill.task_id)
                        next_prefill_nodes.append(prefill)
                    else:
                        # One independent prefill branch per tool call.
                        for tool in tool_calls:
                            prefill, _ = self.create_prefill_decoding_pair(
                                f"{tool.task_id}_layer{layer+1}"
                            )
                            self.dag.add_edge(tool.task_id, prefill.task_id)
                            next_prefill_nodes.append(prefill)
                else:
                    # count == 0: chain directly into the next layer's prefill.
                    prefill, _ = self.create_prefill_decoding_pair(
                        f"workflow_{self.dag.wf_id}_direct_layer{layer+1}"
                    )
                    self.dag.add_edge(decoding_node.task_id, prefill.task_id)
                    next_prefill_nodes.append(prefill)
            current_decoding_nodes = []
            for prefill in next_prefill_nodes:
                # A prefill's only outgoing edge so far is to its decoding
                # node (added in create_prefill_decoding_pair), so the first
                # child is that decoding node.
                decoding = self.dag.get_children(prefill.task_id)[0]
                current_decoding_nodes.append(decoding)
        if merge_final:
            final_prefill, final_decoding = self.create_prefill_decoding_pair(f"workflow_{self.dag.wf_id}_final")
            self.dag.final_node = final_decoding
            for decoding_node in current_decoding_nodes:
                self.dag.add_edge(decoding_node.task_id, final_prefill.task_id)
        else:
            # No single final node: each branch gets its own terminal pair.
            self.dag.final_node = None
            for decoding_node in current_decoding_nodes:
                branch_final_prefill, branch_final_decoding = self.create_prefill_decoding_pair(
                    f"{decoding_node.task_id}_branch_final"
                )
                self.dag.add_edge(decoding_node.task_id, branch_final_prefill.task_id)
        return self.dag
def generate_multiple_workflows(num_workflows, output_dir="workflows", layers=2,
                                tool_call_counts=[3, 2], merge_final=True,
                                merge_after_tool=True, time_range=10):
    """Generate several fixed-shape workflows and save each one as JSON.

    :param num_workflows: number of workflows to generate
    :param output_dir: output directory (created if missing)
    :param layers: number of layers per workflow
    :param tool_call_counts: tool_call count per layer
    :param merge_final: merge all branches into the final node
    :param merge_after_tool: merge a layer's tool calls into one prefill
    :param time_range: upper bound (seconds) of the arrival-time window
    """
    os.makedirs(output_dir, exist_ok=True)
    print(f"Generating {num_workflows} workflows...")
    for i in range(num_workflows):
        arrival_time = random.randint(0, time_range)
        generator = WorkflowGenerator(f"wf_{i}")
        dag = generator.build_multi_layer_workflow(
            layers=layers,
            # Pass a copy so the callee can never mutate our shared
            # mutable-default list.
            tool_call_counts=list(tool_call_counts),
            merge_final=merge_final,
            merge_after_tool=merge_after_tool,
            arrival_time=arrival_time
        )
        filename = os.path.join(output_dir, f"workflow_{i+1:03d}.json")
        dag.save_to_json(filename)
        # BUG FIX: the log line contained an "(unknown)" placeholder where
        # the saved path belongs; report the actual file written.
        print(f"  Saved workflow {i+1} to {filename} (Arrival Time: {arrival_time} s)")
    print(f"Successfully generated {num_workflows} workflows in '{output_dir}' directory.")
# Helper for sampling a value according to a weight distribution.
def sample_from_distribution(dist):
    """Draw one key from ``dist`` with probability proportional to its weight.

    :param dist: mapping of key -> non-negative numeric weight
        (weights need not sum to 1)
    :return: one of the keys of ``dist``
    :raises ValueError: if ``dist`` is empty, contains a negative weight,
        or the total weight is not positive
    """
    items = list(dist.items())
    if not items:
        raise ValueError("Empty distribution")
    weights = [float(w) for _, w in items]
    # BUG FIX: a negative weight silently corrupts the cumulative scan
    # below (the running total can decrease, skewing every later key's
    # probability) — reject it explicitly instead.
    if any(w < 0.0 for w in weights):
        raise ValueError("Negative weight in distribution")
    total = sum(weights)
    if total <= 0:
        raise ValueError("Non-positive distribution total weight")
    # Inverse-CDF sampling: walk cumulative weights until they pass r.
    r = random.random() * total
    acc = 0.0
    for (key, _), weight in zip(items, weights):
        acc += weight
        if r <= acc:
            return key
    # Float round-off can leave r marginally above the final cumulative
    # sum; fall back to the last key.
    return items[-1][0]
def sample_tool_calls_for_layers(num_layers, dist):
    """Independently draw one tool-call count per layer from ``dist``."""
    counts = []
    for _ in range(num_layers):
        counts.append(sample_from_distribution(dist))
    return counts
def generate_multiple_workflows_from_config(config):
    """Generate a batch of workflows whose shape parameters are sampled
    from the distributions in ``config`` and save each one as JSON.

    ``config`` layout (see GENERATION_CONFIG): a ``"fixed"`` section with
    ``num_workflows`` / ``output_dir`` / ``merge_final``, and a
    ``"distributions"`` section with {value: weight} maps per parameter plus
    ``max_all_zero_ratio`` — the maximum fraction of workflows allowed to
    have zero tool calls in every layer.
    """
    fixed = config.get("fixed", {})
    dists = config.get("distributions", {})
    num_workflows = int(fixed.get("num_workflows", 2048))
    output_dir = fixed.get("output_dir", "multi_workflows")
    merge_final = bool(fixed.get("merge_final", True))
    layers_dist = dists.get("layers", {2: 1.0})
    tool_dist = dists.get("tool_call_counts_per_layer", {2: 1.0})
    merge_after_tool_dist = dists.get("merge_after_tool", {True: 1.0})
    time_range_dist = dists.get("time_range", {10: 1.0})
    max_all_zero_ratio = float(dists.get("max_all_zero_ratio", 0.10))
    os.makedirs(output_dir, exist_ok=True)
    allowed_all_zero = int(num_workflows * max_all_zero_ratio)
    count_all_zero = 0
    print(f"Generating {num_workflows} workflows from config...")
    for i in range(num_workflows):
        layers = int(sample_from_distribution(layers_dist))
        tool_call_counts = sample_tool_calls_for_layers(layers, tool_dist)
        is_all_zero = all(int(c) == 0 for c in tool_call_counts)
        if is_all_zero and count_all_zero >= allowed_all_zero:
            # Cap the share of all-zero workflows: resample a few times,
            # then force at least one layer to a non-zero count.
            for _ in range(3):
                tool_call_counts = sample_tool_calls_for_layers(layers, tool_dist)
                if not all(int(c) == 0 for c in tool_call_counts):
                    is_all_zero = False
                    break
            if is_all_zero and layers > 0:
                idx = random.randrange(layers)
                for _ in range(5):
                    v = int(sample_from_distribution(tool_dist))
                    if v > 0:
                        tool_call_counts[idx] = v
                        is_all_zero = False
                        break
        if is_all_zero:
            count_all_zero += 1
        merge_after_tool = bool(sample_from_distribution(merge_after_tool_dist))
        time_range = int(sample_from_distribution(time_range_dist))
        arrival_time = random.randint(0, time_range)
        generator = WorkflowGenerator(f"wf_{i}")
        dag = generator.build_multi_layer_workflow(
            layers=layers,
            tool_call_counts=[int(x) for x in tool_call_counts],
            merge_final=merge_final,
            merge_after_tool=merge_after_tool,
            arrival_time=arrival_time
        )
        filename = os.path.join(output_dir, f"workflow_{i+1:03d}.json")
        dag.save_to_json(filename)
        # BUG FIX: the log line contained an "(unknown)" placeholder where
        # the saved path belongs; report the actual file written.
        print(f"  Saved workflow {i+1} to {filename} (Arrival Time: {arrival_time} s)")
    print(f"Successfully generated {num_workflows} workflows in '{output_dir}' directory (config mode).")
def load_workflows_from_directory(directory):
    """Load every ``*.json`` workflow file found in ``directory``.

    Files that fail to load are skipped with an error message instead of
    aborting the whole batch.

    :param directory: directory containing workflow JSON files
    :return: list of loaded WorkflowDAG objects, in sorted-filename order
    """
    workflows = []
    # BUG FIX: sort the filenames — os.listdir order is platform-dependent,
    # so the load (and downstream processing) order was non-deterministic.
    json_files = sorted(f for f in os.listdir(directory) if f.endswith('.json'))
    print(f"Loading workflows from '{directory}'...")
    for filename in json_files:
        filepath = os.path.join(directory, filename)
        try:
            dag = WorkflowDAG.load_from_json(filepath)
            workflows.append(dag)
            # BUG FIX: both log lines contained "(unknown)" placeholders
            # where the filename belongs; report the actual file.
            print(f"  Loaded workflow from {filename} (Arrival Time: {dag.arrival_time} s)")
        except Exception as e:
            print(f"  Error loading {filename}: {str(e)}")
    print(f"Successfully loaded {len(workflows)} workflows.")
    return workflows
if __name__ == "__main__":
# 保留原有演示入口(小样本)
# generate_multiple_workflows(
# num_workflows=5,
# output_dir="multi_workflows",
# layers=2,
# tool_call_counts=[2, 2],
# merge_after_tool=False,
# time_range=10
# )
# 新增:基于比例配置的批量生成入口
generate_multiple_workflows_from_config(GENERATION_CONFIG)