-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdata_models.py
More file actions
492 lines (357 loc) · 21.6 KB
/
data_models.py
File metadata and controls
492 lines (357 loc) · 21.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
from __future__ import annotations
import operator
import uuid
from dataclasses import dataclass, field
from typing import Annotated, Literal
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field
# ────────────────────────────────────────────────
# Leaves
# ────────────────────────────────────────────────
class PhysicsModel(BaseModel):
    """Analytical / empirical model attached to a design element."""

    # Only `name` is mandatory; every other field falls back to an empty value
    # (or "draft" for `status`) when omitted.
    name: str = Field(..., description="<Unique model name, e.g. 'HeatExchanger1D'.")

    # Free-text payload describing and implementing the model.
    equations: str = Field(
        default="",
        description="LaTeX / plain-text governing equations, e.g., 'Q = m_dot * Cp * (T_in - T_out)'.",
    )
    coding_directives: str = Field(
        default="",
        description="Coding directives for the coder agent to generate the Python code to simulate the physics model.",
    )
    python_code: str = Field(default="", description="The Python code to simulate the physics model.")
    coder_notes: str = Field(default="", description="Notes from the coder agent about the code generation process.")

    # A new list is created per instance via default_factory.
    assumptions: list[str] = Field(
        default_factory=list,
        description="Assumptions, e.g., [one-dimensional, steady-state, no fouling].",
    )

    # Plain string; the allowed values listed in the description are not enforced here.
    status: str = Field(default="draft", description="'draft' | 'reviewed' | 'validated'.")
class Embodiment(BaseModel):
    """Concrete physical realisation of a (sub)function."""

    # `principle` has no default, so it must always be supplied.
    principle: str = Field(..., description="Technology keyword - e.g. 'reverse-osmosis', 'airfoil NACA0012'.")
    description: str = Field(default="", description="1-3 sentence narrative of how the embodiment works.")

    # Name -> numeric value mapping; a new dict is created per instance.
    design_parameters: dict[str, float] = Field(
        default_factory=dict,
        description="Key variables WITH units, e.g. {'area_m2': 2.5}.",
    )

    # -1.0 is the sentinel for "no estimate yet" on both figures below.
    cost_estimate: float = Field(default=-1.0, description="USD (-1.0 → not yet estimated).")
    mass_estimate: float = Field(default=-1.0, description="kg (-1.0 → not yet estimated).")

    # Plain string; the allowed values listed in the description are not enforced here.
    status: str = Field(default="draft", description="'draft' | 'reviewed' | 'validated'.")
# ────────────────────────────────────────────────
# Graph node
# ────────────────────────────────────────────────
class DesignNode(BaseModel):
    """
    Atomic element of the Design-State Graph (DSG).
    """

    # ── identity ──────────────────────────────────────────────────────────
    # A fresh UUID4 string is generated per instance when no id is supplied.
    node_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Node identifier.")
    # Defaults to an empty string. The previous default was a random UUID
    # (copied from node_id), which is never a meaningful node type.
    node_kind: str = Field("", description="Type of node.")
    # The only field without a default: callers must always name the node.
    name: str = Field(..., description="Short label shown in diagrams.")
    description: str = Field("", description="Long-form explanation (purpose, behaviour, interfaces).")

    # ── engineering payload ───────────────────────────────────────────────
    embodiment: Embodiment = Field(
        # Embodiment.principle is a required field, so using the bare class as
        # the factory (Embodiment()) fails validation whenever a node is built
        # without an explicit embodiment. Build an empty placeholder instead.
        default_factory=lambda: Embodiment(principle=""),
        description="Current physical embodiment of the node, meaning the physical realisation of the node, a system.",
    )
    physics_models: list[PhysicsModel] = Field(
        default_factory=list,
        description="The physics models that are used to describe the node with the high-fidelity numerical model.",
    )

    # ── traceability & maturity ───────────────────────────────────────────
    linked_reqs: list[str] = Field(default_factory=list, description="IDs of requirement nodes this element satisfies.")
    verification_plan: str = Field(
        "", description="How compliance will be verified (Inspection / Analysis / Test / Demo)."
    )
    # Plain string; the allowed values listed in the description are not enforced here.
    maturity: str = Field("draft", description="'draft' | 'reviewed' | 'validated'.")
    tags: list[str] = Field(default_factory=list, description="Free keywords.")
# ────────────────────────────────────────────────
# Whole DSG
# ────────────────────────────────────────────────
class DesignState(BaseModel):
    """Snapshot of the complete directed graph."""

    # Nodes keyed by their id; a new dict is created per instance.
    nodes: dict[str, DesignNode] = Field(
        default_factory=dict,
        description="Map node_id to node data.",
    )

    # Each edge is a list of strings, documented as [source_id, target_id].
    # NOTE(review): the description mentions edges_in / edges_out lists on nodes,
    # but DesignNode in this file declares no such fields - confirm or reword.
    edges: list[list[str]] = Field(
        default_factory=list,
        description="Single source of truth for graph connectivity. Each item = [source_id, target_id]. The edges_in and edges_out lists in nodes are derived from this list.",
    )

    # Completion flag, False unless explicitly set.
    workflow_complete: bool = Field(
        default=False,
        description="Indicates whether the workflow is complete. Leave it to False as only the Supervisor can set it to True.",
    )
# ────────────────────────────────────────────────
# Generation-agent output
# ────────────────────────────────────────────────
class DSGListOutput(BaseModel):
    # Required list of candidate graphs; there is no default, so it must be supplied.
    proposals: list[DesignState] = Field(
        ...,
        description="N alternative DSGs generated in one call.",
    )
# ────────────────────────────────────────────────
# Optional edit ops
# ────────────────────────────────────────────────
class NodeOp(BaseModel):
    # One edit operation on a node. `op` is the only required field and is
    # restricted to the three literal values below.
    op: Literal["add", "update", "delete"]

    # Optional operands. Which of them is meaningful for a given `op` is not
    # enforced by this model.
    node: DesignNode | None = None
    node_id: str | None = None
    updates: dict[str, str] = Field(default_factory=dict)

    # Free-text rationale, empty by default.
    justification: str = ""
class EdgeOp(BaseModel):
    # One edit operation on an edge; edges can only be added or deleted.
    op: Literal["add", "delete"]

    # Both endpoints are required strings (presumably node ids - confirm against callers).
    src: str
    dst: str

    # Free-text rationale, empty by default.
    justification: str = ""
class Proposal(BaseModel):
    """
    A container for an ephemeral design proposal, capturing the iterative
    contributions and evaluations of various agents across a single design step.
    """

    # Design graph produced by the Generation agent (the only required field).
    content: DesignState

    # Index of the design step during which the proposal was formulated.
    current_step_index: int = 0

    # Critique or suggestions from the Reflection agent.
    feedback: str | None = None

    # Numeric grade from the Ranking agent and the reasoning behind it.
    grade: float | None = None
    ranking_justification: str | None = None

    # Revised graph from the Evolution agent, with the explanation of the change.
    evolved_content: DesignState | None = None
    evolution_justification: str | None = None

    # Outcome after the whole iteration ("selected", "rejected", or other),
    # and the explanation for that outcome.
    status: str | None = None
    reason_for_status: str | None = None

    # Optional human-readable title.
    title: str | None = None

    # Per-agent iteration counters recording which loop iteration the proposal
    # belongs to; all start at 0.
    generation_iteration_index: int = 0
    reflection_iteration_index: int = 0
    ranking_iteration_index: int = 0
    evolution_iteration_index: int = 0
    meta_review_iteration_index: int = 0
    synthesizer_iteration_index: int = 0
# iteration_index: int = 0
#
class WorkerAnalysis(BaseModel):
    # Result record of one worker task; all four fields are required.
    content: str = Field(
        ...,
        description="The result from the worker task",
    )
    from_task: str = Field(
        ...,
        description="Short description of the task",
    )
    step_index: int = Field(
        ...,
        description="Design step in which this was created",
    )
    called_by_agent: str = Field(
        ...,
        description="Which agent requested this analysis",
    )
@dataclass
class State:
    """Graph state for the full engineering design workflow."""

    # Every list field below is annotated with `operator.add` and gets a fresh
    # empty list per instance. The annotation is presumably read by the graph
    # framework as a merge rule - confirm against the graph construction code.

    # --- General messages (human & LLM interactions) ---
    messages: Annotated[list[BaseMessage], operator.add] = field(default_factory=list)

    # --- Key engineering artifacts ---
    # Structured requirements document, absent until one is provided.
    cahier_des_charges: CahierDesCharges | None = None
    # Step-wise instructions.
    supervisor_instructions: Annotated[list[str], operator.add] = field(default_factory=list)
    # Index of the current design step.
    current_step_index: int = 0

    # --- Supervisor agent tracking ---
    # Last decision made by the Supervisor.
    supervisor_decision: dict | None = None
    # One of "in_progress", "complete", "redo" (not enforced by the type).
    supervisor_status: str = "in_progress"
    # Why the step is being redone, when it is.
    redo_reason: str | None = None
    # Step-specific objectives.
    supervisor_current_objectives: Annotated[list[str], operator.add] = field(default_factory=list)
    # Number of supervisor visits.
    supervisor_visit_counter: int = 0
    # Folder name for saving DSGs.
    dsg_save_folder: str | None = None
    # Thread ID from the pipeline.
    thread_id: str | None = None

    # --- Step execution & control ---
    # Currently active agent.
    active_agent: str = "human"
    # Number of worker tasks dispatched.
    current_tasks_count: int = 0

    # --- Flags for workflow iteration ---
    # True when the supervisor requests another iteration.
    redo_work: bool = False
    # True once the design step is complete.
    task_complete: bool = False
    # Agent that should proceed next.
    next_agent: str = ""

    # --- Proposal tracking ---
    proposals: Annotated[list[Proposal], operator.add] = field(default_factory=list)
    selected_proposal_index: int | None = None
    pending_design_states: Annotated[list[DesignState], operator.add] = field(default_factory=list)

    # --- Proposal ranking ---
    ranking_justification: str | None = None

    # --- Proposal evolution ---
    evolution_justification: str | None = None

    # --- Final design graph ---
    design_graph_history: Annotated[list[DesignState], operator.add] = field(default_factory=list)
    pending_node_ops: Annotated[list[NodeOp], operator.add] = field(default_factory=list)
    pending_edge_ops: Annotated[list[EdgeOp], operator.add] = field(default_factory=list)

    # --- Handover logs & iteration counters, one pair per agent ---
    generation_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    generation_iteration: int = 0

    reflection_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    reflection_iteration: int = 0

    coder_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    coder_iteration: int = 0

    ranking_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    ranking_iteration: int = 0

    evolution_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    evolution_iteration: int = 0

    meta_review_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    meta_review_iteration: int = 0

    synthesizer_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    synthesizer_iteration: int = 0

    graph_designer_notes: Annotated[list[str], operator.add] = field(default_factory=list)
    graph_designer_iteration: int = 0

    # Notes only; there is no matching iteration counter for proximity.
    proximity_notes: Annotated[list[str], operator.add] = field(default_factory=list)

    # --- Orchestrator & worker interactions ---
    analyses: Annotated[list[WorkerAnalysis], operator.add] = field(default_factory=list)
    orchestrator_orders: Annotated[list[str], operator.add] = field(default_factory=list)
    # Agent that called the orchestrator.
    current_requesting_agent: str = ""
    # To be increased to account for each new round sent by the planner.
    max_iterations: int = 0
class EngineeringTask(BaseModel):
    """An engineering task dispatched to Worker Agents."""

    # Three descriptive strings, none of which declares a default.
    topic: str = Field(
        description="Concise title of the engineering task.",
    )
    description: str = Field(
        description="Detailed explanation of the task, its objectives, and expected outputs.",
    )
    return_to_agent: str = Field(
        description="Name of the specialized agent to return the analysis to.",
    )
class OrchestratorDecision(BaseModel):
    """Decision output from the Orchestrator, breaking down the problem into tasks."""

    # Free-text rationale for the decomposition.
    response: str = Field(
        description="Rationale behind the task decomposition, including any assumptions, design strategies, or prioritization logic."
    )

    # The type allows None and no explicit default is declared. Whether the
    # field is required therefore depends on the pydantic version in use, so the
    # declaration is deliberately left in its original form.
    research_tasks: list[EngineeringTask] | None = Field(
        description="List of engineering tasks to assign to Worker Agents."
    )
class FunctionalRequirement(BaseModel):
    # Integer id plus a description; both are required.
    id: int = Field(
        ...,
        description="Unique ID for the functional requirement.",
    )
    description: str = Field(
        ...,
        description="Detailed explanation of what the system should do.",
    )
class NonFunctionalRequirement(BaseModel):
    # Same shape as FunctionalRequirement, with an additional free-text category.
    # All three fields are required.
    id: int = Field(
        ...,
        description="Unique ID for the non-functional requirement.",
    )
    category: str = Field(
        ...,
        description="Category such as Performance, Usability, Safety, Compliance, etc.",
    )
    description: str = Field(
        ...,
        description="Description of the non-functional constraint.",
    )
# ── lowest-level atoms ────────────────────────────────────────────────
class StakeholderNeed(BaseModel):
    """Voice-of-customer item (SN-xx)."""

    # Short code and statement text; both are required.
    code: str = Field(
        ...,
        description="e.g. SN-1",
    )
    text: str = Field(
        ...,
        description="Need statement",
    )
class Verification(BaseModel):
    """Verification approach for one requirement."""

    # Single-letter method code: I = Inspection, A = Analysis, T = Test, D = Demo.
    method: Literal["I", "A", "T", "D"]

    # Optional extra detail, empty by default.
    description: str = ""
class SystemRequirement(BaseModel):
    """Engineering requirement (SR-xx)."""

    # Required identification and statement.
    code: str = Field(
        ...,
        description="e.g. SR-03",
    )
    text: str = Field(
        ...,
        description="Requirement statement",
    )

    # Why this requirement exists (optional).
    rationale: str | None = None

    # Codes of the stakeholder needs this requirement traces to.
    verifies: list[str] = Field(
        default_factory=list,
        description="List of SN codes traced to",
    )

    # Single primary verification method; required even though it follows
    # fields that have defaults.
    verification: Verification
class Constraint(BaseModel):
    """Non-negotiable design constraint or external interface."""

    # Both strings are required.
    name: str
    text: str
class Deliverable(BaseModel):
    """Design artefact to be produced."""

    # Both strings are required.
    name: str
    description: str
# ── top-level Cahier-des-Charges ─────────────────────────────────────
class CahierDesCharges(BaseModel):
    """Full requirements specification (INCOSE-style skeleton)."""

    # Section 1 - project overview: title plus a single headline objective.
    project_title: str
    objective: str

    # Section 2 - stakeholder needs.
    stakeholder_needs: list[StakeholderNeed]

    # Section 3 - system-level requirements.
    system_requirements: list[SystemRequirement]

    # Section 4 - constraints and interfaces.
    constraints: list[Constraint]

    # Section 5 - verification strategy. The strategy itself is carried by each
    # SystemRequirement.verification; this free-text field is for extra notes.
    verification_notes: str | None = None

    # Section 6 - expected deliverables. Required, even though it is declared
    # after an optional field.
    deliverables: list[Deliverable]

    # Optional free-form closing note.
    final_note: str | None = None
class SupervisorDecision(BaseModel):
    # Required verdict on the current step.
    step_completed: bool = Field(
        ...,
        description="Indicates if the current step is complete or needs iteration.",
    )
    # TODO: decide whether the agent should also choose the next step index
    # (a `next_step_index: int` field is intentionally left out for now).

    # Required instructions handed to the next agent.
    instructions: str = Field(
        ...,
        description="Instructions for the next agent.",
    )

    # Optional explanation, None unless supplied.
    reason_for_iteration: str | None = Field(
        default=None,
        description="If iterating, explain why rework is needed.",
    )

    # Whole-workflow completion flag, False unless supplied.
    workflow_complete: bool = Field(
        default=False,
        description="Indicates whether the entire workflow is completed.",
    )
class SingleProposal(BaseModel):
    # A titled design graph; both fields are required.
    title: str = Field(
        ...,
        description="Concise human-readable summary",
    )
    content: DesignState = Field(
        ...,
        description="A complete Design-State Graph (DSG) proposal",
    )
class ProposalsOutput(BaseModel):
    # Required list of titled proposals; no default is declared.
    proposals: list[SingleProposal]
class CoderOutput(BaseModel):
    """Output from the coder agent for a single physics model."""

    # Required source text. The "11-point specification" named in the
    # description is defined outside this file.
    python_code: str = Field(
        ...,
        description="Complete Python implementation following the 11-point specification.",
    )
class SingleReflection(BaseModel):
    # Feedback tied to one proposal by index; both fields are required.
    proposal_index: int = Field(
        ...,
        description="Index of the proposal to which this reflection applies",
    )
    feedback: str = Field(
        ...,
        description="Critical review or suggestions about the proposal",
    )
class ReflectionOutput(BaseModel):
    # Required list of per-proposal reflections.
    reflections: list[SingleReflection] = Field(
        ...,
        description="List of reflection items for each proposal",
    )
class SingleReflectionPair(BaseModel):
    # Feedback plus a final decision for one proposal; all four fields are required.
    proposal_index: int = Field(
        ...,
        description="Index of the proposal to which this reflection applies",
    )
    feedback: str = Field(
        ...,
        description="Critical review or suggestions about the proposal",
    )
    # Plain string; the values named in the description are not enforced.
    final_status: str = Field(
        ...,
        description="Final decision (selected, rejected, or needs more iteration)",
    )
    reason: str = Field(
        ...,
        description="Rationale for the decision",
    )
class ReflectionPairOutput(BaseModel):
    # Every field has a default, so an empty payload is accepted.
    workflow_complete: bool = Field(
        default=False,
        description="Indicates whether the workflow is complete. Only trigger True when the workflow is complete.",
    )

    # -1 is the sentinel for "no proposal chosen".
    selected_proposal_index: int = Field(
        default=-1,
        description="Index of the chosen proposal (-1 if none are valid)",
    )
    detailed_summary_for_graph: str = Field(
        default="",
        description="Instructions for improving the selected design graph",
    )

    # A new list is created per instance.
    reflections: list[SingleReflectionPair] = Field(
        default_factory=list,
        description="List of reflection items for each proposal",
    )
class SingleRanking(BaseModel):
    proposal_index: int = Field(
        ...,
        description="Index of the proposal being ranked",
    )
    # A `previous_score` field is not part of the model (it was left commented out).
    grade: float = Field(
        ...,
        description="Adjusted ranking score for this proposal",
    )

    # Declared without a default although the type admits None.
    ranking_justification: str | None = Field(
        ...,
        description="Clear explanation for ranking decision",
    )

    # Kept for readability. TODO: to use in the future.
    title: str | None = None
class RankingOutput(BaseModel):
    # Required list of per-proposal rankings.
    rankings: list[SingleRanking] = Field(
        ...,
        description="List of ranking decisions for each proposal",
    )
class SingleEvolution(BaseModel):
    proposal_index: int = Field(
        ...,
        description="Index of the proposal being evolved",
    )

    # Optional; None when no earlier score is known.
    original_score: float | None = Field(
        default=None,
        description="Previous ranking score, if available",
    )

    # Required replacement graph.
    new_content: DesignState = Field(
        ...,
        description="Refined or improved version of the DSG",
    )

    # Declared without a default although the type admits None.
    evolution_justification: str | None = Field(
        ...,
        description="Explanation of what was changed and why",
    )

    # Kept for readability.
    title: str | None = None
class EvolutionOutput(BaseModel):
    # Required list of evolved proposals.
    evolutions: list[SingleEvolution] = Field(
        ...,
        description="List of evolved proposals",
    )
class SingleMetaDecision(BaseModel):
    # Final verdict for one proposal; all three fields are required.
    proposal_index: int = Field(
        ...,
        description="Index of the proposal being finalized",
    )
    # Plain string; the values named in the description are not enforced.
    final_status: str = Field(
        ...,
        description="Final decision (selected, rejected, or needs more iteration)",
    )
    reason: str = Field(
        ...,
        description="Rationale for the decision",
    )
class MetaReviewOutput(BaseModel):
    # All three fields are required here, unlike ReflectionPairOutput, which
    # gives its similarly named fields defaults.
    selected_proposal_index: int = Field(
        ...,
        description="Index of the chosen proposal (-1 if none are valid)",
    )
    detailed_summary_for_graph: str = Field(
        ...,
        description="Instructions for improving the selected design graph",
    )
    decisions: list[SingleMetaDecision] = Field(
        ...,
        description="Final statuses for all proposals",
    )
class SynthesizerOutput(BaseModel):
    """
    Represents the structured output from the Synthesizer Agent.
    """

    # Free-text explanation followed by the node and edge operations.
    # All three fields are required.
    summary_explanation: str
    nodes: list[NodeOp]
    edges: list[EdgeOp]
class GraphDesignerPlan(BaseModel):
    """
    Represents the structured plan for modifying the design graph.
    """

    # Required rationale for the plan.
    summary_reasoning: str = Field(
        ...,
        description="A structured explanation of why these graph modifications are needed.",
    )

    # Required node operations (add / delete / update).
    nodes: list[NodeOp] = Field(
        ...,
        description="A list of modifications (add, delete, update) to apply to nodes in the graph.",
    )

    # Required edge operations (add / delete).
    edges: list[EdgeOp] = Field(
        ...,
        description="A list of edge modifications (add, delete) to apply relationships between nodes.",
    )
class PairState(BaseModel):
    """State for the 2-Agent System (Generation + Reflection loop)."""

    # Every list default below is declared with pydantic's `Field`. Three of them
    # (proposals, design_graph_history, reflection_notes) previously used the
    # stdlib `dataclasses.field`, which is not the declaration form the rest of
    # this model uses and whose handling inside a BaseModel is version-dependent.
    #
    # The `operator.add` metadata on list fields is presumably read by the graph
    # framework as a merge rule - confirm against the graph construction code.

    messages: Annotated[list[BaseMessage], operator.add] = Field(default_factory=list)
    first_pass: bool = True
    user_request: str = ""
    proposals: Annotated[list[Proposal], operator.add] = Field(default_factory=list)
    feedback: Annotated[list[str], operator.add] = Field(default_factory=list)

    # Loop counters and their upper bound (default max is 5).
    generation_iteration: int = 0
    reflection_iteration: int = 0
    max_iterations: int = 5

    # The CdC as a plain string (the State dataclass holds a structured object instead).
    cahier_des_charges: str = ""

    design_graph_history: Annotated[list[DesignState], operator.add] = Field(default_factory=list)
    detailed_summary_for_graph: Annotated[list[str], operator.add] = Field(default_factory=list)
    reflection_notes: Annotated[list[str], operator.add] = Field(default_factory=list)

    # -1 means no proposal has been selected.
    selected_proposal_index: int = -1
    workflow_complete: bool = False

    # Folder for saving DSGs.
    dsg_save_folder: str | None = None
    supervisor_visit_counter: int = 0
    # Thread ID from the pipeline.
    thread_id: str | None = None