-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_run_workflow.py
More file actions
136 lines (105 loc) · 3.96 KB
/
test_run_workflow.py
File metadata and controls
136 lines (105 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
############ Script for showing how to set up a workflow #########
import asyncio
import os
from datetime import timedelta
from dotenv import load_dotenv
from tc_temporal_backend.client import TemporalClient
from temporalio.client import (
Schedule,
ScheduleActionStartWorkflow,
ScheduleIntervalSpec,
ScheduleSpec,
ScheduleState,
)
async def start_workflow():
"""Start a new workflow and demonstrate various interactions."""
load_dotenv()
task_queue = os.getenv("TEMPORAL_TASK_QUEUE")
client = await TemporalClient().get_client()
try:
# Start a workflow
# handle = await client.start_workflow(
# "SayHello", # Workflow name
# id=f"workflow-123456", # Unique workflow ID
# task_queue=task_queue,
# retry_policy=RetryPolicy(
# initial_interval=timedelta(seconds=1),
# maximum_interval=timedelta(seconds=10),
# maximum_attempts=3
# ),
# execution_timeout=timedelta(minutes=5)
# )
handle = await client.create_schedule(
id="website-ingestion-schedule",
schedule=Schedule(
action=ScheduleActionStartWorkflow(
# "SayHello",
"WebsiteIngestionSchedulerWorkflow",
# id="schedules-say-hello",
id="schedules-website-ingestion",
task_queue=task_queue,
args=["platform_id"],
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
),
state=ScheduleState(note="Here's a note on my Schedule."),
),
)
print(f"Started workflow {handle.id}")
# # Wait for the workflow to complete
# result = await handle.result()
# print("Workflow result:", result)
# # Query workflow state
# state = await handle.query("getCurrentState")
# print("Current state:", state)
# # Signal the workflow
# await handle.signal("signalName", "signal parameter")
# Terminate workflow if needed
# await handle.terminate("Termination reason")
except Exception as e:
print(f"Error executing workflow: {e}")
async def check_workflow_status(workflow_id: str):
"""Check the status of a specific workflow."""
client = await TemporalClient().get_client()
try:
handle = client.get_workflow_handle(workflow_id)
# Get workflow details
desc = await handle.describe()
print(f"Workflow status: {desc.status}")
print(f"Start time: {desc.start_time}")
print(f"Workflow type: {desc.workflow_type}")
# Check if workflow is running
workflow_desc = await handle.describe()
print(f"Description: {workflow_desc.status}")
return desc
except Exception as e:
print(f"Error checking workflow status: {e}")
raise
async def list_workflows():
"""List all workflows of a specific type."""
client = await TemporalClient().get_client()
try:
workflows = client.list_workflows(
query="WorkflowType = 'WebsiteIngestionSchedulerWorkflow'",
)
async for workflow in workflows:
print("Workflow ID:", workflow.id)
print("Status:", workflow.status)
print("Start Time:", workflow.start_time)
print("Type:", workflow.workflow_type)
print("-" * 50)
except Exception as e:
print(f"Error listing workflows: {e}")
raise
async def main():
"""Main function to demonstrate workflow operations."""
# Start a new workflow
await start_workflow()
# Check status of a specific workflow
# await check_workflow_status("website-ingestion-schedule")
# List all workflows
await list_workflows()
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())