Skip to content

Commit 604009e

Browse files
committed
feat(scheduler): implement mechanism type handling and improve plan discard logic
- Added MECHANISM_TYPE mapping for singleton vs multi-instance mechanisms - Implemented discard logic for conflicting mechanisms and inactive applications - Replaced TODOs in PlanScheduler with concrete logic - Added is_plan_app_active() helper in MLSState - Improved logging clarity for scheduled and discarded plans Signed-off-by: Pravin Kamble <iampbkamble@gmail.com>
1 parent 1bf435d commit 604009e

2 files changed

Lines changed: 22 additions & 14 deletions

File tree

agents/mlsysops/data/state.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,4 +317,7 @@ def update_plan_status(self,plan_uid:str, mechanism: str, status:str):
317317
updates['status'] = Status.COMPLETED.value
318318

319319
# Send updates to the task log
320-
return self.update_task_log(plan_uid, updates=updates)
320+
return self.update_task_log(plan_uid, updates=updates)
321+
322+
def is_plan_app_active(self, app_id: str) -> bool:
323+
return app_id in self.applications or app_id in self.active_mechanisms

agents/mlsysops/scheduler.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,24 @@
2323
from .data.plan import Plan
2424
from .data.task_log import Status
2525

26+
MECHANISM_TYPE = {
27+
"cpu_freq": "singleton",
28+
"component_placement": "multi"
29+
}
30+
31+
2632
class PlanScheduler:
2733
def __init__(self, state):
2834
self.state = state
2935
self.period = 1
3036
self.pending_plans = []
3137

38+
3239
async def update_pending_plans(self):
33-
for pending_plan in self.pending_plans:
34-
task_log = self.state.get_task_log(pending_plan.uuid)
35-
if task_log.status != Status.PENDING:
36-
self.pending_plans.remove(pending_plan) # remove it
40+
self.pending_plans = [
41+
plan for plan in self.pending_plans
42+
if self.state.get_task_log(plan.uuid)['status'] == Status.PENDING
43+
]
3744

3845
async def run(self):
3946
logger.debug("Plan Scheduler started")
@@ -62,7 +69,6 @@ async def run(self):
6269
for plan in current_plan_list:
6370

6471
# Use FIFO logic - execute the first plan, and save the mechanisms touched.
65-
# TODO declare mechanisms as singletons or multi-instanced.
6672
# Singletons (e.g. CPU Freq): Can be configured once per Planning/Execution cycle, as they have
6773
# global effect
6874
# Multi-instance (e.g. component placement): Configure different parts of the system, that do not
@@ -73,9 +79,10 @@ async def run(self):
7379
logger.info(f"Processing {str(plan.uuid)} plan for mechanism {asset} for application {plan.application_id}")
7480

7581
should_discard = False
82+
mechanism_type = MECHANISM_TYPE.get(asset, "multi")
7683

7784
# if was executed a plan earlier, then discard it.
78-
if asset in mechanisms_touched:
85+
if mechanism_type == "singleton" and asset in mechanisms_touched:
7986
should_discard = True
8087

8188
task_log = self.state.get_task_log(plan.uuid)
@@ -87,20 +94,18 @@ async def run(self):
8794
should_discard = True
8895

8996
# check if the application has been removed for this application scoped plan
90-
if (plan.application_id not in self.state.applications and
91-
plan.application_id not in self.state.active_mechanisms): # TODO easy way to do for now. different mechanism scope
97+
if not self.state.is_plan_app_active(plan.application_id):
9298
should_discard = True
9399

94-
# TODO: check for fluidity debug
95100
# Check if it is core, should override the discard mechanism
96101
if not plan.core and should_discard:
97-
logger.test(f"|1| Plan planuid:{str(plan.uuid)} status:Discarded")
98-
self.state.update_task_log(plan.uuid,updates={"status": "Discarded"})
102+
logger.debug(f"Plan {plan.uuid} discarded.")
103+
self.state.update_task_log(plan.uuid, updates={"status": Status.DISCARDED.value})
99104
continue
100105

101106

102-
self.state.update_task_log(plan.uuid,updates={"status": "Scheduled"})
103-
logger.test(f"|1| Plan with planuid:{plan.uuid} scheduled for execution status:Scheduled")
107+
self.state.update_task_log(plan.uuid, updates={"status": Status.SCHEDULED.value})
108+
logger.debug(f"Plan {plan.uuid} scheduled for execution.")
104109
# mark mechanism touched only for non-core
105110
if not plan.core:
106111
mechanisms_touched[asset] = {

0 commit comments

Comments
 (0)