@@ -594,27 +594,33 @@ class WorkflowGraph(ApiClass):
594594 nodes (List[WorkflowGraphNode]): A list of nodes in the workflow graph.
595595 edges (List[WorkflowGraphEdge]): A list of edges in the workflow graph, where each edge is a tuple of source, target, and details.
596596 primary_start_node (Union[str, WorkflowGraphNode]): The primary node to start the workflow from.
597+ common_source_code (str): Common source code that can be used across all nodes.
597598 """
598599 nodes : List [WorkflowGraphNode ] = dataclasses .field (default_factory = list )
599600 edges : List [Union [WorkflowGraphEdge , Tuple [WorkflowGraphNode , WorkflowGraphNode , dict ], Tuple [str , str , dict ]]] = dataclasses .field (default_factory = list )
600601 primary_start_node : Union [str , WorkflowGraphNode ] = dataclasses .field (default = None )
601602 common_source_code : str = dataclasses .field (default = None )
603+ specification_type : str = dataclasses .field (default = 'data_flow' )
602604
603605 def __post_init__ (self ):
604606 if self .edges :
605- for index , edge in enumerate (self .edges ):
606- if isinstance (edge , Tuple ):
607- source = edge [0 ] if isinstance (edge [0 ], str ) else edge [0 ].name
608- target = edge [1 ] if isinstance (edge [1 ], str ) else edge [1 ].name
609- details = edge [2 ] if len (edge ) > 2 and isinstance (edge [2 ], dict ) else None
610- self .edges [index ] = WorkflowGraphEdge (source = source , target = target , details = details )
607+ if self .specification_type == 'execution_flow' :
608+ for index , edge in enumerate (self .edges ):
609+ if isinstance (edge , Tuple ):
610+ source = edge [0 ] if isinstance (edge [0 ], str ) else edge [0 ].name
611+ target = edge [1 ] if isinstance (edge [1 ], str ) else edge [1 ].name
612+ details = edge [2 ] if len (edge ) > 2 and isinstance (edge [2 ], dict ) else None
613+ self .edges [index ] = WorkflowGraphEdge (source = source , target = target , details = details )
614+ else :
615+ raise ValueError ('workflow_graph' , 'Workflow Graph no longer supports explicit edges. They are inferred from data flow dependencies.' )
611616
612617 def to_dict (self ):
613618 return {
614619 'nodes' : [node .to_dict () for node in self .nodes ],
615- 'edges' : [edge .to_dict () for edge in self .edges ],
620+ ** ({ 'edges' : [edge .to_dict () for edge in self .edges ]} if self . specification_type == 'execution_flow' else {}) ,
616621 'primary_start_node' : self .primary_start_node .name if isinstance (self .primary_start_node , WorkflowGraphNode ) else self .primary_start_node ,
617- 'common_source_code' : self .common_source_code
622+ 'common_source_code' : self .common_source_code ,
623+ 'specification_type' : self .specification_type
618624 }
619625
620626 @classmethod
@@ -625,19 +631,45 @@ def from_dict(cls, graph: dict):
625631 node ['__return_filter' ] = True
626632 nodes = [WorkflowGraphNode .from_dict (node ) for node in graph .get ('nodes' , [])]
627633 edges = [WorkflowGraphEdge .from_dict (edge ) for edge in graph .get ('edges' , [])]
628- if graph .get ('primary_start_node' ) not in [node .name for node in nodes ]:
629- non_primary_nodes = set ()
630- for edge in edges :
631- non_primary_nodes .add (edge .target )
632- primary_nodes = set ([node .name for node in nodes ]) - non_primary_nodes
633- graph ['primary_start_node' ] = primary_nodes .pop () if primary_nodes else None
634+ primary_start_node = graph .get ('primary_start_node' )
635+ non_primary_nodes = set ()
636+ specification_type = graph .get ('specification_type' , 'execution_flow' )
634637
635- return cls (
636- nodes = nodes ,
637- edges = edges ,
638- primary_start_node = graph .get ('primary_start_node' , None ),
639- common_source_code = graph .get ('common_source_code' , None )
640- )
638+ if specification_type == 'execution_flow' :
639+ if primary_start_node not in [node .name for node in nodes ]:
640+ for edge in edges :
641+ non_primary_nodes .add (edge .target )
642+ primary_nodes = set ([node .name for node in nodes ]) - non_primary_nodes
643+ graph ['primary_start_node' ] = primary_nodes .pop () if primary_nodes else None
644+
645+ return cls (
646+ nodes = nodes ,
647+ edges = edges ,
648+ primary_start_node = graph .get ('primary_start_node' , None ),
649+ common_source_code = graph .get ('common_source_code' , None ),
650+ specification_type = 'execution_flow'
651+ )
652+ else :
653+ if edges :
654+ raise ValueError ('workflow_graph' , 'Workflow Graph no longer supports explicit edges. They are inferred from data flow dependencies.' )
655+
656+ if primary_start_node not in [node .name for node in nodes ]:
657+ for node in nodes :
658+ is_primary_eligible = True
659+ for mapping in node .input_mappings :
660+ if mapping .variable_type == enums .WorkflowNodeInputType .WORKFLOW_VARIABLE :
661+ is_primary_eligible = False
662+ break
663+ if not is_primary_eligible :
664+ non_primary_nodes .add (node .name )
665+ primary_nodes = set ([node .name for node in nodes ]) - non_primary_nodes
666+ primary_start_node = primary_nodes .pop () if primary_nodes else None
667+ return cls (
668+ nodes = nodes ,
669+ primary_start_node = primary_start_node ,
670+ common_source_code = graph .get ('common_source_code' , None ),
671+ specification_type = 'data_flow'
672+ )
641673
642674
643675@dataclasses .dataclass
0 commit comments