-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathResourceManager.py
More file actions
160 lines (133 loc) · 5.82 KB
/
ResourceManager.py
File metadata and controls
160 lines (133 loc) · 5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import json
class ResourceManager:
    """Tracks CPU cores and NPU devices and manages model deployments onto them.

    Hardware resources and model definitions are loaded from JSON config files.
    A deployed model occupies a fixed set of same-type NPUs plus a number of
    CPU cores, becomes usable once its deployment delay elapses (see
    ``update_deployments``), and may then run at most one prefill task and up
    to ``MAX_CONCURRENT_DECODING`` decoding tasks concurrently.
    """

    # Upper bound on decoding tasks a single deployed model may run at once.
    MAX_CONCURRENT_DECODING = 4

    def __init__(self, resources_file="config/resources.json", models_file="config/models.json"):
        """Load hardware resources and model configurations.

        Args:
            resources_file: JSON file with "cpu_cores" (int) and "npus" (list
                of groups, each with "type", "count",
                "decoding_scaling_factor", "prefill_scaling_factor").
            models_file: JSON file with a "models" list; each entry must carry
                at least "model_name", "required_cpus", "deployment_time".
        """
        with open(resources_file) as f:
            resources = json.load(f)
        self.total_cpu_cores = resources["cpu_cores"]
        self.available_cpu_cores = resources["cpu_cores"]
        # Expand each NPU group into individually addressable devices,
        # e.g. {"type": "A", "count": 2} becomes ids "A-0" and "A-1".
        self.npus = {}
        for npu_group in resources["npus"]:
            for i in range(npu_group["count"]):
                npu_id = f"{npu_group['type']}-{i}"
                self.npus[npu_id] = {
                    "id": npu_id,
                    "type": npu_group["type"],
                    "decoding_scaling_factor": npu_group["decoding_scaling_factor"],
                    "prefill_scaling_factor": npu_group["prefill_scaling_factor"],
                    "available": True
                }
        self.deployed_models = {}  # model_id -> model_info dict (see deploy_model)
        self.next_model_id = 1     # monotonically increasing id counter
        with open(models_file) as f:
            self.model_configs = {model["model_name"]: model for model in json.load(f)["models"]}

    def deploy_model(self, timestamp, model_name, npu_ids):
        """Deploy ``model_name`` onto the given collection of NPU ids.

        All validation happens before any resource is allocated, so a failed
        call leaves the manager's state untouched.

        Args:
            timestamp: current simulation time; deployment completes
                ``deployment_time`` after this.
            model_name: key into the loaded model configs.
            npu_ids: iterable of NPU ids the model should occupy.

        Returns:
            The new model id string ("model-N") on success, or None if the
            model is unknown, ``npu_ids`` is empty or contains an unknown or
            busy NPU, the NPUs are of mixed types, or CPU cores are short.
        """
        if model_name not in self.model_configs:
            return None
        # Defensive copy: don't alias the caller's container, and pin order.
        npu_ids = list(npu_ids)
        # Unknown ids previously raised KeyError below; empty selections
        # previously "succeeded" while occupying nothing. Reject both.
        if not npu_ids or any(npu_id not in self.npus for npu_id in npu_ids):
            return None
        config = self.model_configs[model_name]
        required_cpus = config["required_cpus"]
        # Every requested NPU must currently be free.
        if not all(self.npus[npu_id]["available"] for npu_id in npu_ids):
            return None
        # A single model may not span heterogeneous NPU types.
        npu_types = {self.npus[npu_id]["type"] for npu_id in npu_ids}
        if len(npu_types) > 1:
            print(f"[Simulator] Error: Cannot deploy model {model_name} on mixed NPU types: {npu_types}")
            return None
        if self.available_cpu_cores < required_cpus:
            return None
        # All checks passed: allocate resources and register the model.
        model_id = f"model-{self.next_model_id}"
        self.next_model_id += 1
        for npu_id in npu_ids:
            self.npus[npu_id]["available"] = False
        self.available_cpu_cores -= required_cpus
        self.deployed_models[model_id] = {
            "id": model_id,
            "model_name": model_name,
            "npu_ids": npu_ids,
            "required_cpus": required_cpus,
            "deploy_time": config["deployment_time"],
            "deploy_start": timestamp,
            "deployed": False,          # flipped by update_deployments()
            "current_prefill": None,    # id of the single running prefill task
            "current_decoding": set(),  # ids of running decoding tasks
            "decoding_tokens": 0
        }
        return model_id

    def unload_model(self, model_id):
        """Unload a model, freeing its NPUs and CPU cores.

        Returns:
            True on success.

        Raises:
            KeyError: if ``model_id`` is not deployed. (Previously a bare
            ``assert``, which is silently stripped under ``python -O``.)
        """
        if model_id not in self.deployed_models:
            raise KeyError(f"Model {model_id} is not found.")
        model = self.deployed_models.pop(model_id)
        for npu_id in model["npu_ids"]:
            self.npus[npu_id]["available"] = True
        self.available_cpu_cores += model["required_cpus"]
        return True

    def allocate_cpu(self, cpus):
        """Reserve ``cpus`` cores; return True on success, False if short."""
        if self.available_cpu_cores >= cpus:
            self.available_cpu_cores -= cpus
            return True
        return False

    def release_cpu(self, cpus):
        """Return ``cpus`` cores to the pool, capped at the physical total."""
        self.available_cpu_cores = min(self.total_cpu_cores, self.available_cpu_cores + cpus)

    def update_deployments(self, current_timestamp):
        """Mark models as deployed once their deployment delay has elapsed."""
        # Nothing is added or removed during iteration, so no copy is needed.
        for model in self.deployed_models.values():
            if not model["deployed"] and current_timestamp - model["deploy_start"] >= model["deploy_time"]:
                model["deployed"] = True

    def can_assign_prefill(self, model_id):
        """Return True if the model is deployed and has no running prefill."""
        if model_id not in self.deployed_models:
            return False
        model = self.deployed_models[model_id]
        return model["deployed"] and model["current_prefill"] is None

    def can_assign_decoding(self, model_id):
        """Return True if the model is deployed and has a free decoding slot."""
        if model_id not in self.deployed_models:
            return False
        model = self.deployed_models[model_id]
        return model["deployed"] and len(model["current_decoding"]) < self.MAX_CONCURRENT_DECODING

    def assign_prefill(self, model_id, task_id):
        """Assign a prefill task to the model; True on success.

        NOTE(review): unlike ``can_assign_prefill``, this does not check the
        "deployed" flag — callers are expected to gate on ``can_assign_*``
        first. Preserved as-is to avoid changing scheduler behavior.
        """
        if model_id not in self.deployed_models:
            return False
        model = self.deployed_models[model_id]
        if model["current_prefill"] is not None:
            return False
        model["current_prefill"] = task_id
        return True

    def assign_decoding(self, model_id, task_id):
        """Assign a decoding task to the model; True on success.

        NOTE(review): like ``assign_prefill``, does not re-check "deployed".
        """
        if model_id not in self.deployed_models:
            return False
        model = self.deployed_models[model_id]
        if len(model["current_decoding"]) >= self.MAX_CONCURRENT_DECODING:
            return False
        model["current_decoding"].add(task_id)
        return True

    def complete_task(self, task_id, model_id):
        """Release the model slot held by ``task_id`` (prefill or decoding).

        Silently ignores unknown models/tasks, matching original behavior.
        """
        if model_id not in self.deployed_models:
            return
        model = self.deployed_models[model_id]
        if model["current_prefill"] == task_id:
            model["current_prefill"] = None
        # discard() is a no-op when task_id is not a decoding task.
        model["current_decoding"].discard(task_id)

    def get_cpu_info(self):
        """Return (total_cpu_cores, available_cpu_cores)."""
        return self.total_cpu_cores, self.available_cpu_cores

    def get_model_info(self):
        """Return the live model_id -> model_info mapping."""
        return self.deployed_models

    def get_npu_info(self):
        """Return the live npu_id -> npu_info mapping."""
        return self.npus

    def get_model_config(self):
        """Return the model_name -> config mapping loaded at init."""
        return self.model_configs