Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions example_workflows/nested/get_jobflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Load the nested example workflow and execute it locally with jobflow."""

from jobflow import run_locally

from python_workflow_definition.jobflow import load_workflow_json

# Build a jobflow Flow from the (nested) workflow definition file.
flow = load_workflow_json("main2.pwd.json")

# Visualize the dependency graph before running.
flow.draw_graph(figsize=(3, 3)).show()
print(flow.as_dict())

# Execute the flow locally and print the resulting task outputs.
print(run_locally(flow))
97 changes: 97 additions & 0 deletions example_workflows/nested/jobflow.ipynb

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions example_workflows/nested/main.pwd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "type": "workflow", "value": "prod_div.json" },
{ "id": 1, "value": 4, "type": "input", "name": "a" },
{ "id": 2, "value": 3, "type": "input", "name": "b" },
{ "id": 3, "type": "output", "name": "final_result" }
],
"edges": [
{ "target": 0, "targetPort": "x", "source": 1, "sourcePort": null },
{ "target": 0, "targetPort": "y", "source": 2, "sourcePort": null },
    { "target": 3, "targetPort": null, "source": 0, "sourcePort": "result" }
]
}
21 changes: 21 additions & 0 deletions example_workflows/nested/main2.pwd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "value": 3, "type": "input", "name": "a" },
{ "id": 1, "value": 2, "type": "input", "name": "b" },
{ "id": 2, "value": 4, "type": "input", "name": "c" },
{ "id": 3, "type": "function", "value": "workflow.get_prod_and_div" },
{ "id": 4, "type": "workflow", "value": "prod_div.json" },
{ "id": 5, "type": "function", "value": "workflow.get_sum" },
{ "id": 6, "type": "output", "name": "final_result" }
],
"edges": [
{ "target": 3, "targetPort": "x", "source": 0, "sourcePort": null },
{ "target": 3, "targetPort": "y", "source": 2, "sourcePort": null },
{ "target": 4, "targetPort": "x", "source": 3, "sourcePort": "prod" },
{ "target": 4, "targetPort": "y", "source": 3, "sourcePort": "div" },
{ "target": 5, "targetPort": "x", "source": 4, "sourcePort": "result" },
{ "target": 5, "targetPort": "y", "source": 1, "sourcePort": null },
{ "target": 6, "targetPort": null, "source": 5, "sourcePort": null }
]
}
16 changes: 16 additions & 0 deletions example_workflows/nested/main_two_wf.pwd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "type": "workflow", "value": "prod_div.json" },
{ "id": 1, "value": 4, "type": "input", "name": "a" },
{ "id": 2, "value": 3, "type": "input", "name": "b" },
{ "id": 3, "type": "workflow", "value": "prod.json" },
{ "id": 4, "type": "output", "name": "final_result" }
],
"edges": [
{ "target": 0, "targetPort": "x", "source": 1, "sourcePort": null },
{ "target": 0, "targetPort": "y", "source": 2, "sourcePort": null },
{ "target": 3, "targetPort": "x", "source": 0, "sourcePort": "result" },
    { "target": 4, "targetPort": null, "source": 3, "sourcePort": "result" }
]
}
12 changes: 12 additions & 0 deletions example_workflows/nested/prod.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "type": "function", "value": "workflow.get_square" },
{ "id": 1, "type": "input", "value": 1, "name": "x" },
{ "id": 2, "type": "output", "name": "result" }
],
"edges": [
{ "target": 0, "targetPort": "x", "source": 1, "sourcePort": null },
    { "target": 2, "targetPort": null, "source": 0, "sourcePort": null }
]
}
19 changes: 19 additions & 0 deletions example_workflows/nested/prod_div.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "type": "function", "value": "workflow.get_prod_and_div" },
{ "id": 1, "type": "function", "value": "workflow.get_sum" },
{ "id": 2, "type": "function", "value": "workflow.get_square" },
{ "id": 3, "type": "input", "value": 1, "name": "x" },
{ "id": 4, "type": "input", "value": 2, "name": "y" },
{ "id": 5, "type": "output", "name": "result" }
],
"edges": [
{ "target": 0, "targetPort": "x", "source": 3, "sourcePort": null },
{ "target": 0, "targetPort": "y", "source": 4, "sourcePort": null },
{ "target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod" },
{ "target": 1, "targetPort": "y", "source": 0, "sourcePort": "div" },
{ "target": 2, "targetPort": "x", "source": 1, "sourcePort": null },
{ "target": 5, "targetPort": null, "source": 2, "sourcePort": null }
]
}
119 changes: 99 additions & 20 deletions src/python_workflow_definition/jobflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,34 +235,101 @@ def _get_workflow(
nodes_dict: dict, input_dict: dict, total_dict: dict, source_handles_dict: dict
) -> list:
def get_attr_helper(obj, source_handle):
if source_handle is None:
return getattr(obj, "output")
print("here2")
print(obj)
if type(obj) is not list:
if source_handle is None:
return getattr(obj, "output")
else:
return getattr(getattr(obj, "output"), source_handle)
else:
return getattr(getattr(obj, "output"), source_handle)
if source_handle is None:
return getattr(obj[-1], "output")
else:
return getattr(getattr(obj[-1], "output"), source_handle)

memory_dict = {}

output_reference=None
for k in total_dict.keys():
v = nodes_dict[k]
if type(v) is list:
# i need to export the uuid and output references
# from workflows and jobs
fn=v
job1_dict=v[0].as_dict()
uuid=job1_dict["uuid"]
mod = import_module(job1_dict["function"]["@module"])
method=getattr(mod, job1_dict["function"]["@callable"])

if k in source_handles_dict.keys():
new_job1 = job(
method=method,
data=[el for el in source_handles_dict[k] if el is not None],
uuid=uuid,
)
else:
new_job1 = job(method=method, uuid=uuid)
kwargs = {}

for kw, vw in total_dict[k].items():
if output_reference is not None:
# not sure this works?
kwargs[kw] = output_reference
output_reference = None
else:
if vw[SOURCE_LABEL] in input_dict:
kwargs[kw] = input_dict[vw[SOURCE_LABEL]]
else:
kwargs[kw] = get_attr_helper(
obj=memory_dict[vw[SOURCE_LABEL]],
source_handle=vw[SOURCE_PORT_LABEL],
)
output_reference=v[-1].output


memory_stuff = new_job1(**kwargs)

fn = [memory_stuff]+v[1:]

memory_dict[k] =Flow(fn)

if isfunction(v):
#if output_reference is None:

# output_reference=None

if k in source_handles_dict.keys():
fn = job(
method=v,
data=[el for el in source_handles_dict[k] if el is not None],
)
else:
fn = job(method=v)
kwargs = {
kw: (
input_dict[vw[SOURCE_LABEL]]
if vw[SOURCE_LABEL] in input_dict
else get_attr_helper(

kwargs={}

for kw, vw in total_dict[k].items():
if output_reference is not None:
# not sure this works?
kwargs[kw] = output_reference
output_reference=None
else:
if vw[SOURCE_LABEL] in input_dict:
kwargs[kw]=input_dict[vw[SOURCE_LABEL]]
else:
kwargs[kw]= get_attr_helper(
obj=memory_dict[vw[SOURCE_LABEL]],
source_handle=vw[SOURCE_PORT_LABEL],
)
)
for kw, vw in total_dict[k].items()
}
)

print("here")
print(kwargs)



memory_dict[k] = fn(**kwargs)

return list(memory_dict.values())


Expand All @@ -274,6 +341,18 @@ def _get_item_from_tuple(input_obj, index, index_lst):


def load_workflow_json(file_name: str) -> Flow:
nodes_new_dict, input_dict, new_total_dict, source_handles_dict = recursive_load_workflow_json(file_name)
task_lst = _get_workflow(
nodes_dict=nodes_new_dict,
input_dict=input_dict,
total_dict=new_total_dict,
source_handles_dict=source_handles_dict,
)
print(task_lst)
return Flow(task_lst, output=task_lst[-1].output)


def recursive_load_workflow_json(file_name: str) -> list:
content = remove_result(
workflow_dict=PythonWorkflowDefinitionWorkflow.load_json_file(
file_name=file_name
Expand All @@ -296,7 +375,10 @@ def load_workflow_json(file_name: str) -> Flow:

nodes_new_dict = {}
for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items():
if isinstance(v, str) and "." in v:
if isinstance(v, str) and ".json" in v:
nodes_new_dict_here, input_dict_here, new_total_dict_here, sources_handles_dict_here = recursive_load_workflow_json(file_name=v)
nodes_new_dict[int(k)] = _get_workflow(nodes_new_dict_here, input_dict_here, new_total_dict_here, sources_handles_dict_here)
elif isinstance(v, str) and "." in v:
p, m = v.rsplit(".", 1)
mod = import_module(p)
nodes_new_dict[int(k)] = getattr(mod, m)
Expand All @@ -307,13 +389,10 @@ def load_workflow_json(file_name: str) -> Flow:
total_dict = _group_edges(edges_lst=edges_new_lst)
input_dict = _get_input_dict(nodes_dict=nodes_new_dict)
new_total_dict = _resort_total_lst(total_dict=total_dict, nodes_dict=nodes_new_dict)
task_lst = _get_workflow(
nodes_dict=nodes_new_dict,
input_dict=input_dict,
total_dict=new_total_dict,
source_handles_dict=source_handles_dict,
)
return Flow(task_lst)

# go through the whole list again and update the dicts with the inputs from the outer workflows

return nodes_new_dict, input_dict, new_total_dict, source_handles_dict


def write_workflow_json(flow: Flow, file_name: str = "workflow.json"):
Expand Down
21 changes: 21 additions & 0 deletions src/python_workflow_definition/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,34 @@ def check_value_format(cls, v: str):
raise ValueError(msg)
return v

class PythonWorkflowDefinitionWorklowNode(PythonWorkflowDefinitionBaseNode):
    """
    Model for nested-workflow nodes.

    A workflow node embeds another workflow, referenced by the workflow
    JSON file name stored in 'value' (e.g. "prod_div.json").
    """

    type: Literal["workflow"]
    value: str  # Reference to a nested workflow JSON file, e.g. 'prod_div.json'

    @field_validator("value")
    @classmethod
    def check_value_format(cls, v: str):
        # The reference must be non-empty and contain an interior period
        # (e.g. the file-extension separator); it must not start or end
        # with one.
        if not v or "." not in v or v.startswith(".") or v.endswith("."):
            # NOTE: no trailing commas here — adjacent string literals are
            # concatenated so ValueError receives a single string, not a tuple.
            msg = (
                "WorkflowNode 'value' must be a non-empty string "
                "referencing a workflow JSON file, with at least one "
                "interior period (e.g. 'prod_div.json')."
            )
            raise ValueError(msg)
        return v


# Discriminated Union for Nodes
PythonWorkflowDefinitionNode = Annotated[
Union[
PythonWorkflowDefinitionInputNode,
PythonWorkflowDefinitionOutputNode,
PythonWorkflowDefinitionFunctionNode,
PythonWorkflowDefinitionWorklowNode
],
Field(discriminator="type"),
]
Expand Down
Loading