From 3bbda19814ae5abcd8bbee8d7af13dad370e10e9 Mon Sep 17 00:00:00 2001 From: JaGeo Date: Thu, 23 Oct 2025 14:59:43 +0200 Subject: [PATCH 1/5] add the nested examples --- example_workflows/nested/jobflow.ipynb | 35 +++++++++++++++++++ example_workflows/nested/main.pwd.json | 14 ++++++++ example_workflows/nested/prod_div.json | 19 ++++++++++ .../{arithmetic => nested}/workflow.py | 0 4 files changed, 68 insertions(+) create mode 100644 example_workflows/nested/jobflow.ipynb create mode 100644 example_workflows/nested/main.pwd.json create mode 100644 example_workflows/nested/prod_div.json rename example_workflows/{arithmetic => nested}/workflow.py (100%) diff --git a/example_workflows/nested/jobflow.ipynb b/example_workflows/nested/jobflow.ipynb new file mode 100644 index 0000000..d4672cd --- /dev/null +++ b/example_workflows/nested/jobflow.ipynb @@ -0,0 +1,35 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "initial_id", + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": "from python_workflow_definition.aiida import load_workflow_json" + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/example_workflows/nested/main.pwd.json b/example_workflows/nested/main.pwd.json new file mode 100644 index 0000000..64737fb --- /dev/null +++ b/example_workflows/nested/main.pwd.json @@ -0,0 +1,14 @@ +{ + "version": "0.1.1", + "nodes": [ + { "id": 0, "type": "workflow", "value": "prod_div.json" }, + { "id": 1, "value": 1, "type": "input", "name": "a" }, + { "id": 2, "value": 2, "type": "input", "name": "b" }, + { "id": 3, "type": "output", "name": "final_result" } + ], + "edges": [ + { "target": 0, "targetPort": "x", "source": 1, "sourcePort": null }, + { "target": 0, "targetPort": "y", "source": 2, "sourcePort": null }, + { "target": 3, "targetPort": "null", "source": 0, "sourcePort": "result" } + ] +} \ No newline at end of file diff --git a/example_workflows/nested/prod_div.json b/example_workflows/nested/prod_div.json new file mode 100644 index 0000000..60c4221 --- /dev/null +++ b/example_workflows/nested/prod_div.json @@ -0,0 +1,19 @@ +{ + "version": "0.1.1", + "nodes": [ + { "id": 0, "type": "function", "value": "workflow.get_prod_and_div" }, + { "id": 1, "type": "function", "value": "workflow.get_sum" }, + { "id": 2, "type": "function", "value": "workflow.get_square" }, + { "id": 3, "type": "input", "value": 1, "name": "x" }, + { "id": 4, "type": "input", "value": 2, "name": "y" }, + { "id": 5, "type": "output", "name": "result" } + ], + "edges": [ + { "target": 0, "targetPort": "x", "source": 3, "sourcePort": null }, + { "target": 0, "targetPort": "y", "source": 4, "sourcePort": null }, + { "target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod" }, + { "target": 1, "targetPort": "y", "source": 0, "sourcePort": "div" }, + { "target": 2, "targetPort": "x", "source": 1, "sourcePort": null }, + { "target": 5, "targetPort": null, "source": 2, "sourcePort": null } + ] +} \ No newline at end of file diff --git a/example_workflows/arithmetic/workflow.py b/example_workflows/nested/workflow.py similarity index 100% rename from example_workflows/arithmetic/workflow.py rename to example_workflows/nested/workflow.py From e4fa3ded46b5ed1bc69af280ad8f3acbe9dd7f32 Mon Sep 17 00:00:00 2001 From: JaGeo Date: Thu, 23 Oct 2025 15:08:52 +0200 Subject: [PATCH 2/5] add pydantic model for workflow --- example_workflows/nested/jobflow.ipynb | 57 ++++++++++++++++++++++-- src/python_workflow_definition/models.py | 21 +++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/example_workflows/nested/jobflow.ipynb b/example_workflows/nested/jobflow.ipynb index d4672cd..dbf60c1 100644 --- a/example_workflows/nested/jobflow.ipynb +++ b/example_workflows/nested/jobflow.ipynb @@ -2,13 +2,64 @@ "cells": [ { "cell_type": "code", - "execution_count": null, "id": "initial_id", "metadata": { - "collapsed": true + "collapsed": true, + "ExecuteTime": { + "end_time": "2025-10-23T13:08:02.617292Z", + "start_time": "2025-10-23T13:08:02.205177Z" + } + }, + "source": "from python_workflow_definition.jobflow import load_workflow_json", + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/jgeorge/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "execution_count": 1 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2025-10-23T13:08:03.884311Z", + "start_time": "2025-10-23T13:08:03.635773Z" + } }, + "cell_type": "code", + "source": "load_workflow_json(\"main.pwd.json\")", + "id": "7e1707c47e14fbcc", + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'prod_div'", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[2]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[43mload_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[33;43m\"\u001B[39;49m\u001B[33;43mmain.pwd.json\u001B[39;49m\u001B[33;43m\"\u001B[39;49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:301\u001B[39m, in \u001B[36mload_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 299\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n\u001B[32m 300\u001B[39m p, m = v.rsplit(\u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m, \u001B[32m1\u001B[39m)\n\u001B[32m--> \u001B[39m\u001B[32m301\u001B[39m mod = \u001B[43mimport_module\u001B[49m\u001B[43m(\u001B[49m\u001B[43mp\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 302\u001B[39m nodes_new_dict[\u001B[38;5;28mint\u001B[39m(k)] = \u001B[38;5;28mgetattr\u001B[39m(mod, m)\n\u001B[32m 303\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/importlib/__init__.py:126\u001B[39m, in \u001B[36mimport_module\u001B[39m\u001B[34m(name, package)\u001B[39m\n\u001B[32m 124\u001B[39m \u001B[38;5;28;01mbreak\u001B[39;00m\n\u001B[32m 125\u001B[39m level += \u001B[32m1\u001B[39m\n\u001B[32m--> \u001B[39m\u001B[32m126\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43m_bootstrap\u001B[49m\u001B[43m.\u001B[49m\u001B[43m_gcd_import\u001B[49m\u001B[43m(\u001B[49m\u001B[43mname\u001B[49m\u001B[43m[\u001B[49m\u001B[43mlevel\u001B[49m\u001B[43m:\u001B[49m\u001B[43m]\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mpackage\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mlevel\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m:1204\u001B[39m, in \u001B[36m_gcd_import\u001B[39m\u001B[34m(name, package, level)\u001B[39m\n", + "\u001B[36mFile \u001B[39m\u001B[32m:1176\u001B[39m, in \u001B[36m_find_and_load\u001B[39m\u001B[34m(name, import_)\u001B[39m\n", + "\u001B[36mFile \u001B[39m\u001B[32m:1140\u001B[39m, in \u001B[36m_find_and_load_unlocked\u001B[39m\u001B[34m(name, import_)\u001B[39m\n", + "\u001B[31mModuleNotFoundError\u001B[39m: No module named 'prod_div'" + ] + } + ], + "execution_count": 2 + }, + { + "metadata": {}, + "cell_type": "code", "outputs": [], - "source": "from python_workflow_definition.aiida import load_workflow_json" + "execution_count": null, + "source": "", + "id": "ae57f67080d3a26c" } ], "metadata": { diff --git a/src/python_workflow_definition/models.py b/src/python_workflow_definition/models.py index 4980cfa..78afd16 100644 --- a/src/python_workflow_definition/models.py +++ b/src/python_workflow_definition/models.py @@ -63,6 +63,26 @@ def check_value_format(cls, v: str): raise ValueError(msg) return v +class PythonWorkflowDefinitionWorklowNode(PythonWorkflowDefinitionBaseNode): + """ + Model for function execution nodes. + The 'name' attribute is computed automatically from 'value'. + """ + + type: Literal["workflow"] + value: str # Expected format: 'module.function' + + @field_validator("value") + @classmethod + def check_value_format(cls, v: str): + if not v or "." not in v or v.startswith(".") or v.endswith("."): + msg = ( + "FunctionNode 'value' must be a non-empty string ", + "in 'module.function' format with at least one period.", + ) + raise ValueError(msg) + return v + # Discriminated Union for Nodes PythonWorkflowDefinitionNode = Annotated[ @@ -70,6 +90,7 @@ def check_value_format(cls, v: str): PythonWorkflowDefinitionInputNode, PythonWorkflowDefinitionOutputNode, PythonWorkflowDefinitionFunctionNode, + PythonWorkflowDefinitionWorklowNode ], Field(discriminator="type"), ] From f09755cf7b30aebdcadf756b4b663c7c103cdd35 Mon Sep 17 00:00:00 2001 From: JaGeo Date: Thu, 23 Oct 2025 17:35:26 +0200 Subject: [PATCH 3/5] submit the first working version --- example_workflows/nested/get_jobflow.py | 8 +++ example_workflows/nested/jobflow.ipynb | 83 +++++++++++++++++------ src/python_workflow_definition/jobflow.py | 59 +++++++++++++--- 3 files changed, 122 insertions(+), 28 deletions(-) create mode 100644 example_workflows/nested/get_jobflow.py diff --git a/example_workflows/nested/get_jobflow.py b/example_workflows/nested/get_jobflow.py new file mode 100644 index 0000000..67d107f --- /dev/null +++ b/example_workflows/nested/get_jobflow.py @@ -0,0 +1,8 @@ +from python_workflow_definition.jobflow import load_workflow_json + +flow=load_workflow_json("main.pwd.json") + +from jobflow import run_locally +flow.draw_graph(figsize=(3, 3)).show() +print(flow.as_dict()) +run_locally(flow[0]) diff --git a/example_workflows/nested/jobflow.ipynb b/example_workflows/nested/jobflow.ipynb index dbf60c1..f2f1a8b 100644 --- a/example_workflows/nested/jobflow.ipynb +++ b/example_workflows/nested/jobflow.ipynb @@ -6,8 +6,8 @@ "metadata": { "collapsed": true, "ExecuteTime": { - "end_time": "2025-10-23T13:08:02.617292Z", - "start_time": "2025-10-23T13:08:02.205177Z" + "end_time": "2025-10-23T13:36:38.417376Z", + "start_time": "2025-10-23T13:36:38.034604Z" } }, "source": "from python_workflow_definition.jobflow import load_workflow_json", @@ -26,40 +26,83 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-10-23T13:08:03.884311Z", - "start_time": "2025-10-23T13:08:03.635773Z" + "end_time": "2025-10-23T13:36:38.677677Z", + "start_time": "2025-10-23T13:36:38.425149Z" } }, "cell_type": "code", - "source": "load_workflow_json(\"main.pwd.json\")", + "source": [ + "flow=load_workflow_json(\"main.pwd.json\")\n", + "flow.as_dict()" + ], "id": "7e1707c47e14fbcc", "outputs": [ { - "ename": "ModuleNotFoundError", - "evalue": "No module named 'prod_div'", + "name": "stdout", + "output_type": "stream", + "text": [ + "{0: , 1: , 2: , 3: 1, 4: 2}\n" + ] + }, + { + "ename": "UnboundLocalError", + "evalue": "cannot access local variable 'sources_handles_dict' where it is not associated with a value", "output_type": "error", "traceback": [ "\u001B[31m---------------------------------------------------------------------------\u001B[39m", - "\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)", - "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[2]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[43mload_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[33;43m\"\u001B[39;49m\u001B[33;43mmain.pwd.json\u001B[39;49m\u001B[33;43m\"\u001B[39;49m\u001B[43m)\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:301\u001B[39m, in \u001B[36mload_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 299\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n\u001B[32m 300\u001B[39m p, m = v.rsplit(\u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m, \u001B[32m1\u001B[39m)\n\u001B[32m--> \u001B[39m\u001B[32m301\u001B[39m mod = \u001B[43mimport_module\u001B[49m\u001B[43m(\u001B[49m\u001B[43mp\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 302\u001B[39m nodes_new_dict[\u001B[38;5;28mint\u001B[39m(k)] = \u001B[38;5;28mgetattr\u001B[39m(mod, m)\n\u001B[32m 303\u001B[39m \u001B[38;5;28;01melse\u001B[39;00m:\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/importlib/__init__.py:126\u001B[39m, in \u001B[36mimport_module\u001B[39m\u001B[34m(name, package)\u001B[39m\n\u001B[32m 124\u001B[39m \u001B[38;5;28;01mbreak\u001B[39;00m\n\u001B[32m 125\u001B[39m level += \u001B[32m1\u001B[39m\n\u001B[32m--> \u001B[39m\u001B[32m126\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43m_bootstrap\u001B[49m\u001B[43m.\u001B[49m\u001B[43m_gcd_import\u001B[49m\u001B[43m(\u001B[49m\u001B[43mname\u001B[49m\u001B[43m[\u001B[49m\u001B[43mlevel\u001B[49m\u001B[43m:\u001B[49m\u001B[43m]\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mpackage\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mlevel\u001B[49m\u001B[43m)\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m:1204\u001B[39m, in \u001B[36m_gcd_import\u001B[39m\u001B[34m(name, package, level)\u001B[39m\n", - "\u001B[36mFile \u001B[39m\u001B[32m:1176\u001B[39m, in \u001B[36m_find_and_load\u001B[39m\u001B[34m(name, import_)\u001B[39m\n", - "\u001B[36mFile \u001B[39m\u001B[32m:1140\u001B[39m, in \u001B[36m_find_and_load_unlocked\u001B[39m\u001B[34m(name, import_)\u001B[39m\n", - "\u001B[31mModuleNotFoundError\u001B[39m: No module named 'prod_div'" + "\u001B[31mUnboundLocalError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[2]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m flow=\u001B[43mload_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[33;43m\"\u001B[39;49m\u001B[33;43mmain.pwd.json\u001B[39;49m\u001B[33;43m\"\u001B[39;49m\u001B[43m)\u001B[49m\n\u001B[32m 2\u001B[39m flow.as_dict()\n", + "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:279\u001B[39m, in \u001B[36mload_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 278\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mload_workflow_json\u001B[39m(file_name: \u001B[38;5;28mstr\u001B[39m) -> Flow:\n\u001B[32m--> \u001B[39m\u001B[32m279\u001B[39m nodes_new_dict, input_dict, new_total_dict, sources_handles_dict = \u001B[43mrecursive_load_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[43mfile_name\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 280\u001B[39m task_lst = _get_workflow(\n\u001B[32m 281\u001B[39m nodes_dict=nodes_new_dict,\n\u001B[32m 282\u001B[39m input_dict=input_dict,\n\u001B[32m 283\u001B[39m total_dict=new_total_dict,\n\u001B[32m 284\u001B[39m source_handles_dict=source_handles_dict,\n\u001B[32m 285\u001B[39m )\n\u001B[32m 286\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m Flow(task_lst)\n", + "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:313\u001B[39m, in \u001B[36mrecursive_load_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 311\u001B[39m \u001B[38;5;28;01mfor\u001B[39;00m k, v \u001B[38;5;129;01min\u001B[39;00m convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items():\n\u001B[32m 312\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.json\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n\u001B[32m--> \u001B[39m\u001B[32m313\u001B[39m nodes_new_dict, input_dict, new_total_dict, sources_handles_dict = \u001B[43mrecursive_load_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[43mfile_name\u001B[49m\u001B[43m=\u001B[49m\u001B[43mv\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 314\u001B[39m nodes_new_dict[\u001B[38;5;28mint\u001B[39m(k)] = _get_workflow(nodes_new_dict, input_dict, new_total_dict, sources_handles_dict)\n\u001B[32m 315\u001B[39m \u001B[38;5;28;01melif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n", + "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:331\u001B[39m, in \u001B[36mrecursive_load_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 328\u001B[39m input_dict = _get_input_dict(nodes_dict=nodes_new_dict)\n\u001B[32m 329\u001B[39m new_total_dict = _resort_total_lst(total_dict=total_dict, nodes_dict=nodes_new_dict)\n\u001B[32m--> \u001B[39m\u001B[32m331\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m nodes_new_dict, input_dict, new_total_dict, \u001B[43msources_handles_dict\u001B[49m\n", + "\u001B[31mUnboundLocalError\u001B[39m: cannot access local variable 'sources_handles_dict' where it is not associated with a value" ] } ], "execution_count": 2 }, { - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2025-10-23T13:36:38.681890549Z", + "start_time": "2025-10-23T13:23:37.712387Z" + } + }, "cell_type": "code", - "outputs": [], - "execution_count": null, - "source": "", - "id": "ae57f67080d3a26c" + "source": [ + "from jobflow import run_locally\n", + "run_locally(flow)" + ], + "id": "ae57f67080d3a26c", + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2025-10-23 15:23:37,721 INFO Started executing jobs locally\n" + ] + }, + { + "ename": "ValueError", + "evalue": "cannot apply compose_all to an empty list", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mValueError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[4]\u001B[39m\u001B[32m, line 2\u001B[39m\n\u001B[32m 1\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mjobflow\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m run_locally\n\u001B[32m----> \u001B[39m\u001B[32m2\u001B[39m \u001B[43mrun_locally\u001B[49m\u001B[43m(\u001B[49m\u001B[43mflow\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/managers/local.py:180\u001B[39m, in \u001B[36mrun_locally\u001B[39m\u001B[34m(flow, log, store, create_folders, root_dir, ensure_success, allow_external_references, raise_immediately)\u001B[39m\n\u001B[32m 177\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m encountered_bad_response\n\u001B[32m 179\u001B[39m logger.info(\u001B[33m\"\u001B[39m\u001B[33mStarted executing jobs locally\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m--> \u001B[39m\u001B[32m180\u001B[39m finished_successfully = \u001B[43m_run\u001B[49m\u001B[43m(\u001B[49m\u001B[43mflow\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 181\u001B[39m logger.info(\u001B[33m\"\u001B[39m\u001B[33mFinished executing jobs locally\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m 183\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m ensure_success \u001B[38;5;129;01mand\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m finished_successfully:\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/managers/local.py:166\u001B[39m, in \u001B[36mrun_locally.._run\u001B[39m\u001B[34m(root_flow)\u001B[39m\n\u001B[32m 164\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34m_run\u001B[39m(root_flow):\n\u001B[32m 165\u001B[39m encountered_bad_response = \u001B[38;5;28;01mFalse\u001B[39;00m\n\u001B[32m--> \u001B[39m\u001B[32m166\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mjob\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mparents\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mroot_flow\u001B[49m\u001B[43m.\u001B[49m\u001B[43miterflow\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 167\u001B[39m \u001B[43m \u001B[49m\u001B[43mjob_dir\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43m_get_job_dir\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 168\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mwith\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mcd\u001B[49m\u001B[43m(\u001B[49m\u001B[43mjob_dir\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/core/flow.py:423\u001B[39m, in \u001B[36mFlow.iterflow\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 419\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnetworkx\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m is_directed_acyclic_graph\n\u001B[32m 421\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mjobflow\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mutils\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mgraph\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m itergraph\n\u001B[32m--> \u001B[39m\u001B[32m423\u001B[39m graph = \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43mgraph\u001B[49m\n\u001B[32m 425\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m is_directed_acyclic_graph(graph):\n\u001B[32m 426\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\n\u001B[32m 427\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mJob connectivity contains cycles therefore job execution order \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 428\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mcannot be determined.\u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 429\u001B[39m )\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/core/flow.py:348\u001B[39m, in \u001B[36mFlow.graph\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 344\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mitertools\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m product\n\u001B[32m 346\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnetworkx\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mas\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnx\u001B[39;00m\n\u001B[32m--> \u001B[39m\u001B[32m348\u001B[39m graph = \u001B[43mnx\u001B[49m\u001B[43m.\u001B[49m\u001B[43mcompose_all\u001B[49m\u001B[43m(\u001B[49m\u001B[43m[\u001B[49m\u001B[43mjob\u001B[49m\u001B[43m.\u001B[49m\u001B[43mgraph\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mjob\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m]\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 350\u001B[39m \u001B[38;5;28;01mfor\u001B[39;00m node \u001B[38;5;129;01min\u001B[39;00m graph:\n\u001B[32m 351\u001B[39m node_props = graph.nodes[node]\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/utils/decorators.py:784\u001B[39m, in \u001B[36margmap.__call__..func\u001B[39m\u001B[34m(_argmap__wrapper, *args, **kwargs)\u001B[39m\n\u001B[32m 783\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mfunc\u001B[39m(*args, __wrapper=\u001B[38;5;28;01mNone\u001B[39;00m, **kwargs):\n\u001B[32m--> \u001B[39m\u001B[32m784\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43margmap\u001B[49m\u001B[43m.\u001B[49m\u001B[43m_lazy_compile\u001B[49m\u001B[43m(\u001B[49m\u001B[43m__wrapper\u001B[49m\u001B[43m)\u001B[49m\u001B[43m(\u001B[49m\u001B[43m*\u001B[49m\u001B[43margs\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43m*\u001B[49m\u001B[43m*\u001B[49m\u001B[43mkwargs\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m compilation 4:3\u001B[39m, in \u001B[36margmap_compose_all_1\u001B[39m\u001B[34m(graphs, backend, **backend_kwargs)\u001B[39m\n\u001B[32m 1\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mbz2\u001B[39;00m\n\u001B[32m 2\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mcollections\u001B[39;00m\n\u001B[32m----> \u001B[39m\u001B[32m3\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mgzip\u001B[39;00m\n\u001B[32m 4\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01minspect\u001B[39;00m\n\u001B[32m 5\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mitertools\u001B[39;00m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/utils/backends.py:535\u001B[39m, in \u001B[36m_dispatchable._call_if_no_backends_installed\u001B[39m\u001B[34m(self, backend, *args, **kwargs)\u001B[39m\n\u001B[32m 529\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33mnetworkx\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;129;01min\u001B[39;00m \u001B[38;5;28mself\u001B[39m.backends:\n\u001B[32m 530\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mNotImplementedError\u001B[39;00m(\n\u001B[32m 531\u001B[39m \u001B[33mf\u001B[39m\u001B[33m\"\u001B[39m\u001B[33m'\u001B[39m\u001B[38;5;132;01m{\u001B[39;00m\u001B[38;5;28mself\u001B[39m.name\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m'\u001B[39m\u001B[33m is not implemented by \u001B[39m\u001B[33m'\u001B[39m\u001B[33mnetworkx\u001B[39m\u001B[33m'\u001B[39m\u001B[33m backend. \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 532\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mThis function is included in NetworkX as an API to dispatch to \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 533\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mother backends.\u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 534\u001B[39m )\n\u001B[32m--> \u001B[39m\u001B[32m535\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43morig_func\u001B[49m\u001B[43m(\u001B[49m\u001B[43m*\u001B[49m\u001B[43margs\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43m*\u001B[49m\u001B[43m*\u001B[49m\u001B[43mkwargs\u001B[49m\u001B[43m)\u001B[49m\n", + "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/algorithms/operators/all.py:228\u001B[39m, in \u001B[36mcompose_all\u001B[39m\u001B[34m(graphs)\u001B[39m\n\u001B[32m 223\u001B[39m R.add_edges_from(\n\u001B[32m 224\u001B[39m G.edges(keys=\u001B[38;5;28;01mTrue\u001B[39;00m, data=\u001B[38;5;28;01mTrue\u001B[39;00m) \u001B[38;5;28;01mif\u001B[39;00m G.is_multigraph() \u001B[38;5;28;01melse\u001B[39;00m G.edges(data=\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[32m 225\u001B[39m )\n\u001B[32m 227\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m R \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m228\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\u001B[33m\"\u001B[39m\u001B[33mcannot apply compose_all to an empty list\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m 230\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m R\n", + "\u001B[31mValueError\u001B[39m: cannot apply compose_all to an empty list" + ] + } + ], + "execution_count": 4 } ], "metadata": { diff --git a/src/python_workflow_definition/jobflow.py b/src/python_workflow_definition/jobflow.py index 969251c..442ee38 100644 --- a/src/python_workflow_definition/jobflow.py +++ b/src/python_workflow_definition/jobflow.py @@ -241,8 +241,37 @@ def get_attr_helper(obj, source_handle): return getattr(getattr(obj, "output"), source_handle) memory_dict = {} + print(total_dict) for k in total_dict.keys(): v = nodes_dict[k] + if type(v) is list: + fn=v + job1_dict=v[0].as_dict() + uuid=job1_dict["uuid"] + mod = import_module(job1_dict["function"]["@module"]) + method=getattr(mod, job1_dict["function"]["@callable"]) + if k in source_handles_dict.keys(): + new_job1 = job( + method=method, + data=[el for el in source_handles_dict[k] if el is not None], + uuid=uuid, + ) + else: + new_job1 = job(method=method, uuid=uuid) + kwargs = { + kw: ( + input_dict[vw[SOURCE_LABEL]] + if vw[SOURCE_LABEL] in input_dict + else get_attr_helper( + obj=memory_dict[vw[SOURCE_LABEL]], + source_handle=vw[SOURCE_PORT_LABEL], + ) + ) + for kw, vw in total_dict[k].items() + } + memory_stuff = new_job1(**kwargs) + fn = [memory_stuff]+v[1:] + memory_dict[k] =Flow(fn) if isfunction(v): if k in source_handles_dict.keys(): fn = job( @@ -263,6 +292,8 @@ def get_attr_helper(obj, source_handle): for kw, vw in total_dict[k].items() } memory_dict[k] = fn(**kwargs) + print(memory_dict) + return list(memory_dict.values()) @@ -274,6 +305,18 @@ def _get_item_from_tuple(input_obj, index, index_lst): def load_workflow_json(file_name: str) -> Flow: + nodes_new_dict, input_dict, new_total_dict, source_handles_dict = recursive_load_workflow_json(file_name) + task_lst = _get_workflow( + nodes_dict=nodes_new_dict, + input_dict=input_dict, + total_dict=new_total_dict, + source_handles_dict=source_handles_dict, + ) + print(task_lst) + return Flow(task_lst) + + +def recursive_load_workflow_json(file_name: str) -> list: content = remove_result( workflow_dict=PythonWorkflowDefinitionWorkflow.load_json_file( file_name=file_name @@ -296,7 +339,10 @@ def load_workflow_json(file_name: str) -> Flow: nodes_new_dict = {} for k, v in convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items(): - if isinstance(v, str) and "." in v: + if isinstance(v, str) and ".json" in v: + nodes_new_dict_here, input_dict_here, new_total_dict_here, sources_handles_dict_here = recursive_load_workflow_json(file_name=v) + nodes_new_dict[int(k)] = _get_workflow(nodes_new_dict_here, input_dict_here, new_total_dict_here, sources_handles_dict_here) + elif isinstance(v, str) and "." in v: p, m = v.rsplit(".", 1) mod = import_module(p) nodes_new_dict[int(k)] = getattr(mod, m) @@ -307,13 +353,10 @@ def load_workflow_json(file_name: str) -> Flow: total_dict = _group_edges(edges_lst=edges_new_lst) input_dict = _get_input_dict(nodes_dict=nodes_new_dict) new_total_dict = _resort_total_lst(total_dict=total_dict, nodes_dict=nodes_new_dict) - task_lst = _get_workflow( - nodes_dict=nodes_new_dict, - input_dict=input_dict, - total_dict=new_total_dict, - source_handles_dict=source_handles_dict, - ) - return Flow(task_lst) + + # go through the whole list again and update the dicts with the inputs from the outer workflwos + + return nodes_new_dict, input_dict, new_total_dict, source_handles_dict def write_workflow_json(flow: Flow, file_name: str = "workflow.json"): From 64de8b6b4af2a23ab5588fbbb9916e93f1fac9ea Mon Sep 17 00:00:00 2001 From: JaGeo Date: Thu, 23 Oct 2025 17:45:22 +0200 Subject: [PATCH 4/5] fix the jupyternotebook --- example_workflows/nested/get_jobflow.py | 2 +- example_workflows/nested/jobflow.ipynb | 107 ++-------------------- example_workflows/nested/main.pwd.json | 4 +- src/python_workflow_definition/jobflow.py | 1 - 4 files changed, 13 insertions(+), 101 deletions(-) diff --git a/example_workflows/nested/get_jobflow.py b/example_workflows/nested/get_jobflow.py index 67d107f..ba623fd 100644 --- a/example_workflows/nested/get_jobflow.py +++ b/example_workflows/nested/get_jobflow.py @@ -5,4 +5,4 @@ from jobflow import run_locally flow.draw_graph(figsize=(3, 3)).show() print(flow.as_dict()) -run_locally(flow[0]) +print(run_locally(flow)) diff --git a/example_workflows/nested/jobflow.ipynb b/example_workflows/nested/jobflow.ipynb index f2f1a8b..1554140 100644 --- a/example_workflows/nested/jobflow.ipynb +++ b/example_workflows/nested/jobflow.ipynb @@ -1,108 +1,21 @@ { "cells": [ { + "metadata": {}, "cell_type": "code", - "id": "initial_id", - "metadata": { - "collapsed": true, - "ExecuteTime": { - "end_time": "2025-10-23T13:36:38.417376Z", - "start_time": "2025-10-23T13:36:38.034604Z" - } - }, - "source": "from python_workflow_definition.jobflow import load_workflow_json", - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/home/jgeorge/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", - " from .autonotebook import tqdm as notebook_tqdm\n" - ] - } - ], - "execution_count": 1 - }, - { - "metadata": { - "ExecuteTime": { - "end_time": "2025-10-23T13:36:38.677677Z", - "start_time": "2025-10-23T13:36:38.425149Z" - } - }, - "cell_type": "code", + "outputs": [], + "execution_count": null, "source": [ + "from python_workflow_definition.jobflow import load_workflow_json\n", + "\n", "flow=load_workflow_json(\"main.pwd.json\")\n", - "flow.as_dict()" - ], - "id": "7e1707c47e14fbcc", - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "{0: , 1: , 2: , 3: 1, 4: 2}\n" - ] - }, - { - "ename": "UnboundLocalError", - "evalue": "cannot access local variable 'sources_handles_dict' where it is not associated with a value", - "output_type": "error", - "traceback": [ - "\u001B[31m---------------------------------------------------------------------------\u001B[39m", - "\u001B[31mUnboundLocalError\u001B[39m Traceback (most recent call last)", - "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[2]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m flow=\u001B[43mload_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[33;43m\"\u001B[39;49m\u001B[33;43mmain.pwd.json\u001B[39;49m\u001B[33;43m\"\u001B[39;49m\u001B[43m)\u001B[49m\n\u001B[32m 2\u001B[39m flow.as_dict()\n", - "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:279\u001B[39m, in \u001B[36mload_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 278\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mload_workflow_json\u001B[39m(file_name: \u001B[38;5;28mstr\u001B[39m) -> Flow:\n\u001B[32m--> \u001B[39m\u001B[32m279\u001B[39m nodes_new_dict, input_dict, new_total_dict, sources_handles_dict = \u001B[43mrecursive_load_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[43mfile_name\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 280\u001B[39m task_lst = _get_workflow(\n\u001B[32m 281\u001B[39m nodes_dict=nodes_new_dict,\n\u001B[32m 282\u001B[39m input_dict=input_dict,\n\u001B[32m 283\u001B[39m total_dict=new_total_dict,\n\u001B[32m 284\u001B[39m source_handles_dict=source_handles_dict,\n\u001B[32m 285\u001B[39m )\n\u001B[32m 286\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m Flow(task_lst)\n", - "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:313\u001B[39m, in \u001B[36mrecursive_load_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 311\u001B[39m \u001B[38;5;28;01mfor\u001B[39;00m k, v \u001B[38;5;129;01min\u001B[39;00m convert_nodes_list_to_dict(nodes_list=content[NODES_LABEL]).items():\n\u001B[32m 312\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.json\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n\u001B[32m--> \u001B[39m\u001B[32m313\u001B[39m nodes_new_dict, input_dict, new_total_dict, sources_handles_dict = \u001B[43mrecursive_load_workflow_json\u001B[49m\u001B[43m(\u001B[49m\u001B[43mfile_name\u001B[49m\u001B[43m=\u001B[49m\u001B[43mv\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 314\u001B[39m nodes_new_dict[\u001B[38;5;28mint\u001B[39m(k)] = _get_workflow(nodes_new_dict, input_dict, new_total_dict, sources_handles_dict)\n\u001B[32m 315\u001B[39m \u001B[38;5;28;01melif\u001B[39;00m \u001B[38;5;28misinstance\u001B[39m(v, \u001B[38;5;28mstr\u001B[39m) \u001B[38;5;129;01mand\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33m.\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01min\u001B[39;00m v:\n", - "\u001B[36mFile \u001B[39m\u001B[32m/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/src/python_workflow_definition/jobflow.py:331\u001B[39m, in \u001B[36mrecursive_load_workflow_json\u001B[39m\u001B[34m(file_name)\u001B[39m\n\u001B[32m 328\u001B[39m input_dict = _get_input_dict(nodes_dict=nodes_new_dict)\n\u001B[32m 329\u001B[39m new_total_dict = _resort_total_lst(total_dict=total_dict, nodes_dict=nodes_new_dict)\n\u001B[32m--> \u001B[39m\u001B[32m331\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m nodes_new_dict, input_dict, new_total_dict, \u001B[43msources_handles_dict\u001B[49m\n", - "\u001B[31mUnboundLocalError\u001B[39m: cannot access local variable 'sources_handles_dict' where it is not associated with a value" - ] - } - ], - "execution_count": 2 - }, - { - "metadata": { - "ExecuteTime": { - "end_time": "2025-10-23T13:36:38.681890549Z", - "start_time": "2025-10-23T13:23:37.712387Z" - } - }, - "cell_type": "code", - "source": [ + "\n", "from jobflow import run_locally\n", - "run_locally(flow)" - ], - "id": "ae57f67080d3a26c", - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "2025-10-23 15:23:37,721 INFO Started executing jobs locally\n" - ] - }, - { - "ename": "ValueError", - "evalue": "cannot apply compose_all to an empty list", - "output_type": "error", - "traceback": [ - "\u001B[31m---------------------------------------------------------------------------\u001B[39m", - "\u001B[31mValueError\u001B[39m Traceback (most recent call last)", - "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[4]\u001B[39m\u001B[32m, line 2\u001B[39m\n\u001B[32m 1\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mjobflow\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m run_locally\n\u001B[32m----> \u001B[39m\u001B[32m2\u001B[39m \u001B[43mrun_locally\u001B[49m\u001B[43m(\u001B[49m\u001B[43mflow\u001B[49m\u001B[43m)\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/managers/local.py:180\u001B[39m, in \u001B[36mrun_locally\u001B[39m\u001B[34m(flow, log, store, create_folders, root_dir, ensure_success, allow_external_references, raise_immediately)\u001B[39m\n\u001B[32m 177\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m encountered_bad_response\n\u001B[32m 179\u001B[39m logger.info(\u001B[33m\"\u001B[39m\u001B[33mStarted executing jobs locally\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m--> \u001B[39m\u001B[32m180\u001B[39m finished_successfully = \u001B[43m_run\u001B[49m\u001B[43m(\u001B[49m\u001B[43mflow\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 181\u001B[39m logger.info(\u001B[33m\"\u001B[39m\u001B[33mFinished executing jobs locally\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m 183\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m ensure_success \u001B[38;5;129;01mand\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m finished_successfully:\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/managers/local.py:166\u001B[39m, in \u001B[36mrun_locally.._run\u001B[39m\u001B[34m(root_flow)\u001B[39m\n\u001B[32m 164\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34m_run\u001B[39m(root_flow):\n\u001B[32m 165\u001B[39m encountered_bad_response = \u001B[38;5;28;01mFalse\u001B[39;00m\n\u001B[32m--> \u001B[39m\u001B[32m166\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mjob\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mparents\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mroot_flow\u001B[49m\u001B[43m.\u001B[49m\u001B[43miterflow\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n\u001B[32m 167\u001B[39m \u001B[43m \u001B[49m\u001B[43mjob_dir\u001B[49m\u001B[43m \u001B[49m\u001B[43m=\u001B[49m\u001B[43m \u001B[49m\u001B[43m_get_job_dir\u001B[49m\u001B[43m(\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 168\u001B[39m \u001B[43m \u001B[49m\u001B[38;5;28;43;01mwith\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mcd\u001B[49m\u001B[43m(\u001B[49m\u001B[43mjob_dir\u001B[49m\u001B[43m)\u001B[49m\u001B[43m:\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/core/flow.py:423\u001B[39m, in \u001B[36mFlow.iterflow\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 419\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnetworkx\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m is_directed_acyclic_graph\n\u001B[32m 421\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mjobflow\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mutils\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mgraph\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m itergraph\n\u001B[32m--> \u001B[39m\u001B[32m423\u001B[39m graph = \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43mgraph\u001B[49m\n\u001B[32m 425\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m is_directed_acyclic_graph(graph):\n\u001B[32m 426\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\n\u001B[32m 427\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mJob connectivity contains cycles therefore job execution order \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 428\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mcannot be determined.\u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 429\u001B[39m )\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/jobflow/core/flow.py:348\u001B[39m, in \u001B[36mFlow.graph\u001B[39m\u001B[34m(self)\u001B[39m\n\u001B[32m 344\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mitertools\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m product\n\u001B[32m 346\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnetworkx\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mas\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mnx\u001B[39;00m\n\u001B[32m--> \u001B[39m\u001B[32m348\u001B[39m graph = \u001B[43mnx\u001B[49m\u001B[43m.\u001B[49m\u001B[43mcompose_all\u001B[49m\u001B[43m(\u001B[49m\u001B[43m[\u001B[49m\u001B[43mjob\u001B[49m\u001B[43m.\u001B[49m\u001B[43mgraph\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;28;43;01mfor\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[43mjob\u001B[49m\u001B[43m \u001B[49m\u001B[38;5;129;43;01min\u001B[39;49;00m\u001B[43m \u001B[49m\u001B[38;5;28;43mself\u001B[39;49m\u001B[43m]\u001B[49m\u001B[43m)\u001B[49m\n\u001B[32m 350\u001B[39m \u001B[38;5;28;01mfor\u001B[39;00m node \u001B[38;5;129;01min\u001B[39;00m graph:\n\u001B[32m 351\u001B[39m node_props = graph.nodes[node]\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/utils/decorators.py:784\u001B[39m, in \u001B[36margmap.__call__..func\u001B[39m\u001B[34m(_argmap__wrapper, *args, **kwargs)\u001B[39m\n\u001B[32m 783\u001B[39m \u001B[38;5;28;01mdef\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34mfunc\u001B[39m(*args, __wrapper=\u001B[38;5;28;01mNone\u001B[39;00m, **kwargs):\n\u001B[32m--> \u001B[39m\u001B[32m784\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43margmap\u001B[49m\u001B[43m.\u001B[49m\u001B[43m_lazy_compile\u001B[49m\u001B[43m(\u001B[49m\u001B[43m__wrapper\u001B[49m\u001B[43m)\u001B[49m\u001B[43m(\u001B[49m\u001B[43m*\u001B[49m\u001B[43margs\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43m*\u001B[49m\u001B[43m*\u001B[49m\u001B[43mkwargs\u001B[49m\u001B[43m)\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m compilation 4:3\u001B[39m, in \u001B[36margmap_compose_all_1\u001B[39m\u001B[34m(graphs, backend, **backend_kwargs)\u001B[39m\n\u001B[32m 1\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mbz2\u001B[39;00m\n\u001B[32m 2\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mcollections\u001B[39;00m\n\u001B[32m----> \u001B[39m\u001B[32m3\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mgzip\u001B[39;00m\n\u001B[32m 4\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01minspect\u001B[39;00m\n\u001B[32m 5\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mitertools\u001B[39;00m\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/utils/backends.py:535\u001B[39m, in \u001B[36m_dispatchable._call_if_no_backends_installed\u001B[39m\u001B[34m(self, backend, *args, **kwargs)\u001B[39m\n\u001B[32m 529\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m \u001B[33m\"\u001B[39m\u001B[33mnetworkx\u001B[39m\u001B[33m\"\u001B[39m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;129;01min\u001B[39;00m \u001B[38;5;28mself\u001B[39m.backends:\n\u001B[32m 530\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mNotImplementedError\u001B[39;00m(\n\u001B[32m 531\u001B[39m \u001B[33mf\u001B[39m\u001B[33m\"\u001B[39m\u001B[33m'\u001B[39m\u001B[38;5;132;01m{\u001B[39;00m\u001B[38;5;28mself\u001B[39m.name\u001B[38;5;132;01m}\u001B[39;00m\u001B[33m'\u001B[39m\u001B[33m is not implemented by \u001B[39m\u001B[33m'\u001B[39m\u001B[33mnetworkx\u001B[39m\u001B[33m'\u001B[39m\u001B[33m backend. \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 532\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mThis function is included in NetworkX as an API to dispatch to \u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 533\u001B[39m \u001B[33m\"\u001B[39m\u001B[33mother backends.\u001B[39m\u001B[33m\"\u001B[39m\n\u001B[32m 534\u001B[39m )\n\u001B[32m--> \u001B[39m\u001B[32m535\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;43mself\u001B[39;49m\u001B[43m.\u001B[49m\u001B[43morig_func\u001B[49m\u001B[43m(\u001B[49m\u001B[43m*\u001B[49m\u001B[43margs\u001B[49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43m*\u001B[49m\u001B[43m*\u001B[49m\u001B[43mkwargs\u001B[49m\u001B[43m)\u001B[49m\n", - "\u001B[36mFile \u001B[39m\u001B[32m~/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/networkx/algorithms/operators/all.py:228\u001B[39m, in \u001B[36mcompose_all\u001B[39m\u001B[34m(graphs)\u001B[39m\n\u001B[32m 223\u001B[39m R.add_edges_from(\n\u001B[32m 224\u001B[39m G.edges(keys=\u001B[38;5;28;01mTrue\u001B[39;00m, data=\u001B[38;5;28;01mTrue\u001B[39;00m) \u001B[38;5;28;01mif\u001B[39;00m G.is_multigraph() \u001B[38;5;28;01melse\u001B[39;00m G.edges(data=\u001B[38;5;28;01mTrue\u001B[39;00m)\n\u001B[32m 225\u001B[39m )\n\u001B[32m 227\u001B[39m \u001B[38;5;28;01mif\u001B[39;00m R \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m:\n\u001B[32m--> \u001B[39m\u001B[32m228\u001B[39m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\u001B[33m\"\u001B[39m\u001B[33mcannot apply compose_all to an empty list\u001B[39m\u001B[33m\"\u001B[39m)\n\u001B[32m 230\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m R\n", - "\u001B[31mValueError\u001B[39m: cannot apply compose_all to an empty list" - ] - } + "flow.draw_graph(figsize=(3, 3)).show()\n", + "print(flow.as_dict())\n", + "print(run_locally(flow))" ], - "execution_count": 4 + "id": "d9504baf7f5e0362" } ], "metadata": { diff --git a/example_workflows/nested/main.pwd.json b/example_workflows/nested/main.pwd.json index 64737fb..7725b09 100644 --- a/example_workflows/nested/main.pwd.json +++ b/example_workflows/nested/main.pwd.json @@ -2,8 +2,8 @@ "version": "0.1.1", "nodes": [ { "id": 0, "type": "workflow", "value": "prod_div.json" }, - { "id": 1, "value": 1, "type": "input", "name": "a" }, - { "id": 2, "value": 2, "type": "input", "name": "b" }, + { "id": 1, "value": 4, "type": "input", "name": "a" }, + { "id": 2, "value": 3, "type": "input", "name": "b" }, { "id": 3, "type": "output", "name": "final_result" } ], "edges": [ diff --git a/src/python_workflow_definition/jobflow.py b/src/python_workflow_definition/jobflow.py index 442ee38..2e0351f 100644 --- a/src/python_workflow_definition/jobflow.py +++ b/src/python_workflow_definition/jobflow.py @@ -292,7 +292,6 @@ def get_attr_helper(obj, source_handle): for kw, vw in total_dict[k].items() } memory_dict[k] = fn(**kwargs) - print(memory_dict) return list(memory_dict.values()) From 42ae5959fab8c44fefd9c84c108af7051942543f Mon Sep 17 00:00:00 2001 From: JaGeo Date: Sun, 26 Oct 2025 17:48:14 +0100 Subject: [PATCH 5/5] add more complicated example --- example_workflows/nested/get_jobflow.py | 2 +- example_workflows/nested/jobflow.ipynb | 63 +++++++++++++- example_workflows/nested/main2.pwd.json | 21 +++++ example_workflows/nested/main_two_wf.pwd.json | 16 ++++ example_workflows/nested/prod.json | 12 +++ src/python_workflow_definition/jobflow.py | 87 +++++++++++++------ 6 files changed, 171 insertions(+), 30 deletions(-) create mode 100644 example_workflows/nested/main2.pwd.json create mode 100644 example_workflows/nested/main_two_wf.pwd.json create mode 100644 example_workflows/nested/prod.json diff --git a/example_workflows/nested/get_jobflow.py b/example_workflows/nested/get_jobflow.py index ba623fd..6f29aa9 100644 --- a/example_workflows/nested/get_jobflow.py +++ b/example_workflows/nested/get_jobflow.py @@ -1,6 +1,6 @@ from python_workflow_definition.jobflow import load_workflow_json -flow=load_workflow_json("main.pwd.json") +flow=load_workflow_json("main2.pwd.json") from jobflow import run_locally flow.draw_graph(figsize=(3, 3)).show() diff --git a/example_workflows/nested/jobflow.ipynb b/example_workflows/nested/jobflow.ipynb index 1554140..f19ef1f 100644 --- a/example_workflows/nested/jobflow.ipynb +++ b/example_workflows/nested/jobflow.ipynb @@ -1,10 +1,13 @@ { "cells": [ { - "metadata": {}, + "metadata": { + "ExecuteTime": { + "end_time": "2025-10-23T15:46:43.189518Z", + "start_time": "2025-10-23T15:46:42.391900Z" + } + }, "cell_type": "code", - "outputs": [], - "execution_count": null, "source": [ "from python_workflow_definition.jobflow import load_workflow_json\n", "\n", @@ -15,7 +18,59 @@ "print(flow.as_dict())\n", "print(run_locally(flow))" ], - "id": "d9504baf7f5e0362" + "id": "d9504baf7f5e0362", + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/jgeorge/miniconda3/envs/2025_PWD_Extension_nested_flows/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{0: {'x': {'source': 3, 'sourcePort': None}, 'y': {'source': 4, 'sourcePort': None}}, 1: {'x': {'source': 0, 'sourcePort': 'prod'}, 'y': {'source': 0, 'sourcePort': 'div'}}, 2: {'x': {'source': 1, 'sourcePort': None}}}\n", + "{0: {'x': {'source': 1, 'sourcePort': None}, 'y': {'source': 2, 'sourcePort': None}}}\n", + "[Flow(name='Flow', uuid='fb70ce2f-9bee-4fc6-998d-46dda59170b8')\n", + "1. Job(name='get_prod_and_div', uuid='3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6')\n", + "2. Job(name='get_sum', uuid='5c9f05a7-9a4a-454d-9671-ac914da3b10b')\n", + "3. Job(name='get_square', uuid='3fb091b0-5369-4be2-b915-f7b6faa47a89')]\n" + ] + }, + { + "data": { + "text/plain": [ + "
" + ], + "image/png": "" + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'@module': 'jobflow.core.flow', '@class': 'Flow', '@version': '0.2.0', 'jobs': [{'@module': 'jobflow.core.flow', '@class': 'Flow', '@version': '0.2.0', 'jobs': [{'@module': 'jobflow.core.job', '@class': 'Job', '@version': '0.2.0', 'function': {'@module': 'workflow', '@callable': 'get_prod_and_div', '@bound': None}, 'function_args': [], 'function_kwargs': {'x': 4, 'y': 3}, 'output_schema': None, 'uuid': '3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6', 'index': 1, 'name': 'get_prod_and_div', 'metadata': {}, 'config': {'@module': 'jobflow.core.job', '@class': 'JobConfig', '@version': '0.2.0', 'resolve_references': True, 'on_missing_references': 'error', 'manager_config': {}, 'expose_store': False, 'pass_manager_config': True, 'response_manager_config': {}}, 'hosts': ['fb70ce2f-9bee-4fc6-998d-46dda59170b8', 'd2c2153b-bd66-4333-aaac-668afed20362'], 'metadata_updates': [], 'config_updates': [], 'name_updates': []}, {'@module': 'jobflow.core.job', '@class': 'Job', '@version': '0.2.0', 'function': {'@module': 'workflow', '@callable': 'get_sum', '@bound': None}, 'function_args': [], 'function_kwargs': {'x': {'@module': 'jobflow.core.reference', '@class': 'OutputReference', '@version': None, 'uuid': '3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6', 'attributes': [['a', 'prod']], 'output_schema': None}, 'y': {'@module': 'jobflow.core.reference', '@class': 'OutputReference', '@version': None, 'uuid': '3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6', 'attributes': [['a', 'div']], 'output_schema': None}}, 'output_schema': None, 'uuid': '5c9f05a7-9a4a-454d-9671-ac914da3b10b', 'index': 1, 'name': 'get_sum', 'metadata': {}, 'config': {'@module': 'jobflow.core.job', '@class': 'JobConfig', '@version': '0.2.0', 'resolve_references': True, 'on_missing_references': 'error', 'manager_config': {}, 'expose_store': False, 'pass_manager_config': True, 'response_manager_config': {}}, 'hosts': ['fb70ce2f-9bee-4fc6-998d-46dda59170b8', 'd2c2153b-bd66-4333-aaac-668afed20362'], 'metadata_updates': [], 'config_updates': [], 'name_updates': [], 'data': []}, {'@module': 'jobflow.core.job', '@class': 'Job', '@version': '0.2.0', 'function': {'@module': 'workflow', '@callable': 'get_square', '@bound': None}, 'function_args': [], 'function_kwargs': {'x': {'@module': 'jobflow.core.reference', '@class': 'OutputReference', '@version': None, 'uuid': '5c9f05a7-9a4a-454d-9671-ac914da3b10b', 'attributes': [], 'output_schema': None}}, 'output_schema': None, 'uuid': '3fb091b0-5369-4be2-b915-f7b6faa47a89', 'index': 1, 'name': 'get_square', 'metadata': {}, 'config': {'@module': 'jobflow.core.job', '@class': 'JobConfig', '@version': '0.2.0', 'resolve_references': True, 'on_missing_references': 'error', 'manager_config': {}, 'expose_store': False, 'pass_manager_config': True, 'response_manager_config': {}}, 'hosts': ['fb70ce2f-9bee-4fc6-998d-46dda59170b8', 'd2c2153b-bd66-4333-aaac-668afed20362'], 'metadata_updates': [], 'config_updates': [], 'name_updates': []}], 'output': None, 'name': 'Flow', 'order': 'auto', 'uuid': 'fb70ce2f-9bee-4fc6-998d-46dda59170b8', 'hosts': ['d2c2153b-bd66-4333-aaac-668afed20362'], 'metadata': {}, 'metadata_updates': []}], 'output': None, 'name': 'Flow', 'order': 'auto', 'uuid': 'd2c2153b-bd66-4333-aaac-668afed20362', 'hosts': [], 'metadata': {}, 'metadata_updates': []}\n", + "2025-10-23 17:46:43,180 INFO Started executing jobs locally\n", + "2025-10-23 17:46:43,183 INFO Starting job - get_prod_and_div (3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6)\n", + "2025-10-23 17:46:43,184 INFO Finished job - get_prod_and_div (3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6)\n", + "2025-10-23 17:46:43,184 INFO Starting job - get_sum (5c9f05a7-9a4a-454d-9671-ac914da3b10b)\n", + "2025-10-23 17:46:43,186 INFO Finished job - get_sum (5c9f05a7-9a4a-454d-9671-ac914da3b10b)\n", + "2025-10-23 17:46:43,186 INFO Starting job - get_square (3fb091b0-5369-4be2-b915-f7b6faa47a89)\n", + "2025-10-23 17:46:43,187 INFO Finished job - get_square (3fb091b0-5369-4be2-b915-f7b6faa47a89)\n", + "2025-10-23 17:46:43,187 INFO Finished executing jobs locally\n", + "{'3652b7f9-c6a5-4c8e-b38e-bd6a83197ed6': {1: Response(output={'prod': 12, 'div': 1.3333333333333333}, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/example_workflows/nested'))}, '5c9f05a7-9a4a-454d-9671-ac914da3b10b': {1: Response(output=13.333333333333334, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/example_workflows/nested'))}, '3fb091b0-5369-4be2-b915-f7b6faa47a89': {1: Response(output=177.7777777777778, detour=None, addition=None, replace=None, stored_data=None, stop_children=False, stop_jobflow=False, job_dir=PosixPath('/smb/jgeorge/hpc-user/PycharmProjects/2025_PWD_Extension_nested_flows/python-workflow-definition/example_workflows/nested'))}}\n" + ] + } + ], + "execution_count": 1 } ], "metadata": { diff --git a/example_workflows/nested/main2.pwd.json b/example_workflows/nested/main2.pwd.json new file mode 100644 index 0000000..1761bdf --- /dev/null +++ b/example_workflows/nested/main2.pwd.json @@ -0,0 +1,21 @@ +{ + "version": "0.1.1", + "nodes": [ + { "id": 0, "value": 3, "type": "input", "name": "a" }, + { "id": 1, "value": 2, "type": "input", "name": "b" }, + { "id": 2, "value": 4, "type": "input", "name": "c" }, + { "id": 3, "type": "function", "value": "workflow.get_prod_and_div" }, + { "id": 4, "type": "workflow", "value": "prod_div.json" }, + { "id": 5, "type": "function", "value": "workflow.get_sum" }, + { "id": 6, "type": "output", "name": "final_result" } + ], + "edges": [ + { "target": 3, "targetPort": "x", "source": 0, "sourcePort": null }, + { "target": 3, "targetPort": "y", "source": 2, "sourcePort": null }, + { "target": 4, "targetPort": "x", "source": 3, "sourcePort": "prod" }, + { "target": 4, "targetPort": "y", "source": 3, "sourcePort": "div" }, + { "target": 5, "targetPort": "x", "source": 4, "sourcePort": "result" }, + { "target": 5, "targetPort": "y", "source": 1, "sourcePort": null }, + { "target": 6, "targetPort": null, "source": 5, "sourcePort": null } + ] +} \ No newline at end of file diff --git a/example_workflows/nested/main_two_wf.pwd.json b/example_workflows/nested/main_two_wf.pwd.json new file mode 100644 index 0000000..f67096c --- /dev/null +++ b/example_workflows/nested/main_two_wf.pwd.json @@ -0,0 +1,16 @@ +{ + "version": "0.1.1", + "nodes": [ + { "id": 0, "type": "workflow", "value": "prod_div.json" }, + { "id": 1, "value": 4, "type": "input", "name": "a" }, + { "id": 2, "value": 3, "type": "input", "name": "b" }, + { "id": 3, "type": "workflow", "value": "prod.json" }, + { "id": 4, "type": "output", "name": "final_result" } + ], + "edges": [ + { "target": 0, "targetPort": "x", "source": 1, "sourcePort": null }, + { "target": 0, "targetPort": "y", "source": 2, "sourcePort": null }, + { "target": 3, "targetPort": "x", "source": 0, "sourcePort": "result" }, + { "target": 4, "targetPort": "null", "source": 3, "sourcePort": "result" } + ] +} \ No newline at end of file diff --git a/example_workflows/nested/prod.json b/example_workflows/nested/prod.json new file mode 100644 index 0000000..947a2a0 --- /dev/null +++ b/example_workflows/nested/prod.json @@ -0,0 +1,12 @@ +{ + "version": "0.1.1", + "nodes": [ + { "id": 0, "type": "function", "value": "workflow.get_square" }, + { "id": 1, "type": "input", "value": 1, "name": "x" }, + { "id": 2, "type": "output", "name": "result" } + ], + "edges": [ + { "target": 0, "targetPort": "x", "source": 1, "sourcePort": null }, + { "target": 2, "targetPort": null, "source": 1, "sourcePort": null } + ] +} \ No newline at end of file diff --git a/src/python_workflow_definition/jobflow.py b/src/python_workflow_definition/jobflow.py index 2e0351f..2b37c33 100644 --- a/src/python_workflow_definition/jobflow.py +++ b/src/python_workflow_definition/jobflow.py @@ -235,21 +235,33 @@ def _get_workflow( nodes_dict: dict, input_dict: dict, total_dict: dict, source_handles_dict: dict ) -> list: def get_attr_helper(obj, source_handle): - if source_handle is None: - return getattr(obj, "output") + print("here2") + print(obj) + if type(obj) is not list: + if source_handle is None: + return getattr(obj, "output") + else: + return getattr(getattr(obj, "output"), source_handle) else: - return getattr(getattr(obj, "output"), source_handle) + if source_handle is None: + return getattr(obj[-1], "output") + else: + return getattr(getattr(obj[-1], "output"), source_handle) memory_dict = {} - print(total_dict) + + output_reference=None for k in total_dict.keys(): v = nodes_dict[k] if type(v) is list: + # i need to export the uuid and output references + # from workflows and jobs fn=v job1_dict=v[0].as_dict() uuid=job1_dict["uuid"] mod = import_module(job1_dict["function"]["@module"]) method=getattr(mod, job1_dict["function"]["@callable"]) + if k in source_handles_dict.keys(): new_job1 = job( method=method, @@ -258,21 +270,35 @@ def get_attr_helper(obj, source_handle): ) else: new_job1 = job(method=method, uuid=uuid) - kwargs = { - kw: ( - input_dict[vw[SOURCE_LABEL]] - if vw[SOURCE_LABEL] in input_dict - else get_attr_helper( - obj=memory_dict[vw[SOURCE_LABEL]], - source_handle=vw[SOURCE_PORT_LABEL], - ) - ) - for kw, vw in total_dict[k].items() - } + kwargs = {} + + for kw, vw in total_dict[k].items(): + if output_reference is not None: + # not sure this works? + kwargs[kw] = output_reference + output_reference = None + else: + if vw[SOURCE_LABEL] in input_dict: + kwargs[kw] = input_dict[vw[SOURCE_LABEL]] + else: + kwargs[kw] = get_attr_helper( + obj=memory_dict[vw[SOURCE_LABEL]], + source_handle=vw[SOURCE_PORT_LABEL], + ) + output_reference=v[-1].output + + memory_stuff = new_job1(**kwargs) + fn = [memory_stuff]+v[1:] + memory_dict[k] =Flow(fn) + if isfunction(v): + #if output_reference is None: + + # output_reference=None + if k in source_handles_dict.keys(): fn = job( method=v, @@ -280,17 +306,28 @@ def get_attr_helper(obj, source_handle): ) else: fn = job(method=v) - kwargs = { - kw: ( - input_dict[vw[SOURCE_LABEL]] - if vw[SOURCE_LABEL] in input_dict - else get_attr_helper( + + kwargs={} + + for kw, vw in total_dict[k].items(): + if output_reference is not None: + # not sure this works? + kwargs[kw] = output_reference + output_reference=None + else: + if vw[SOURCE_LABEL] in input_dict: + kwargs[kw]=input_dict[vw[SOURCE_LABEL]] + else: + kwargs[kw]= get_attr_helper( obj=memory_dict[vw[SOURCE_LABEL]], source_handle=vw[SOURCE_PORT_LABEL], - ) - ) - for kw, vw in total_dict[k].items() - } + ) + + print("here") + print(kwargs) + + + memory_dict[k] = fn(**kwargs) return list(memory_dict.values()) @@ -312,7 +349,7 @@ def load_workflow_json(file_name: str) -> Flow: source_handles_dict=source_handles_dict, ) print(task_lst) - return Flow(task_lst) + return Flow(task_lst, output=task_lst[-1].output) def recursive_load_workflow_json(file_name: str) -> list: