forked from argoproj-labs/hera
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconftest.py
More file actions
129 lines (85 loc) · 2.6 KB
/
conftest.py
File metadata and controls
129 lines (85 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from typing import Dict, List, Tuple
import pytest
from pydantic import BaseModel
from hera.artifact import InputArtifact, OutputArtifact
from hera.cron_workflow import CronWorkflow
from hera.cron_workflow_service import CronWorkflowService
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService
from hera.workflow_template import WorkflowTemplate
from hera.workflow_template_service import WorkflowTemplateService
@pytest.fixture(scope="session")
def ws():
    """Yield a ``WorkflowService`` built with a placeholder host and token.

    The fixture is session-scoped, so pytest creates the service once and
    hands the same instance to every test that requests it.
    """
    service = WorkflowService(host="https://abc.com", token="abc")
    yield service
@pytest.fixture(scope="function")
def w(ws):
    """Yield a new ``Workflow`` named ``'w'`` for each test.

    The workflow is constructed with the ``ws`` fixture as its ``service``.
    """
    workflow = Workflow("w", service=ws)
    yield workflow
@pytest.fixture(scope="session")
def wts():
    """Yield a ``WorkflowTemplateService`` built with a placeholder host and token.

    The fixture is session-scoped, so pytest creates the service once and
    hands the same instance to every test that requests it.
    """
    template_service = WorkflowTemplateService(host="https://abc.com", token="abc")
    yield template_service
@pytest.fixture(scope="function")
def wt(wts):
    """Yield a new ``WorkflowTemplate`` named ``'wt'`` for each test.

    The template is constructed with the ``wts`` fixture as its ``service``.
    """
    template = WorkflowTemplate("wt", service=wts)
    yield template
@pytest.fixture(scope="function")
def cws():
    """Yield a ``CronWorkflowService`` built with a placeholder host and token.

    The fixture is function-scoped, so a new service is created for every
    test that requests it.
    """
    # NOTE(review): the sibling service fixtures ``ws`` and ``wts`` are
    # session-scoped; confirm whether the per-test scope here is intentional.
    cron_service = CronWorkflowService(host="https://abc.com", token="abc")
    yield cron_service
@pytest.fixture(scope="session")
def schedule():
    """Yield the schedule string ``"* * * * *"`` (cron syntax for every minute)."""
    every_minute = "* * * * *"
    yield every_minute
@pytest.fixture(scope="function")
def cw(cws, schedule):
    """Yield a new ``CronWorkflow`` named ``'cw'`` for each test.

    The workflow receives the ``schedule`` fixture as its second positional
    argument and the ``cws`` fixture as its ``service``.
    """
    cron_workflow = CronWorkflow("cw", schedule, service=cws)
    yield cron_workflow
@pytest.fixture(scope="session")
def in_artifact():
    """Yield a session-wide ``InputArtifact`` with fixed test values.

    The artifact is named ``'test'`` with path ``'/test'``; both ``from_task``
    and ``artifact_name`` are set to ``'test-o'``.
    """
    artifact = InputArtifact(
        name="test",
        path="/test",
        from_task="test-o",
        artifact_name="test-o",
    )
    yield artifact
@pytest.fixture(scope="session")
def out_artifact():
    """Yield a session-wide ``OutputArtifact`` named ``'test'`` with path ``'/test'``."""
    artifact = OutputArtifact(name="test", path="/test")
    yield artifact
@pytest.fixture(scope="session")
def mock_model():
    """Yield a minimal pydantic model class (the class itself, not an instance).

    The model declares two integer fields, ``field1`` and ``field2``, whose
    defaults are ``1`` and ``2`` respectively.
    """

    # Documented with comments rather than a class docstring so the model's
    # own metadata stays exactly as declared.
    class MockModel(BaseModel):
        field1: int = 1
        field2: int = 2

    yield MockModel
@pytest.fixture(scope='session')
def no_op():
    """Yield a function that takes no arguments and does nothing.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _no_op():
        pass

    yield _no_op
@pytest.fixture(scope='session')
def op():
    """Yield a function with one positional parameter ``a`` that prints it.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _op(a):
        print(a)

    yield _op
@pytest.fixture(scope='session')
def kwarg_op():
    """Yield a function with one defaulted parameter, ``a: int = 42``, that prints it.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _kwarg_op(a: int = 42):
        print(a)

    yield _kwarg_op
@pytest.fixture(scope='session')
def kwarg_multi_op():
    """Yield a function with two defaulted parameters that prints both.

    The parameters are ``a: int = 42`` and ``b: int = 43``.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _kwarg_multi_op(a: int = 42, b: int = 43):
        print(a, b)

    yield _kwarg_multi_op
@pytest.fixture(scope='session')
def multi_op():
    """Yield a function with three positional parameters ``a``, ``b``, ``c`` that prints them.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _multi_op(a, b, c):
        print(a, b, c)

    yield _multi_op
@pytest.fixture(scope='session')
def typed_op():
    """Yield a function with an annotated return type.

    The inner function prints ``a`` and returns ``[{'a': (a, a)}]``; its
    return annotation is ``List[Dict[str, Tuple[int, int]]]``.

    NOTE(review): presumably consumers inspect the inner function's source
    and signature, so its text is left exactly as written -- confirm before
    editing the inner ``def``.
    """

    def _typed_op(a) -> List[Dict[str, Tuple[int, int]]]:
        print(a)
        return [{'a': (a, a)}]

    yield _typed_op
@pytest.fixture(scope='session')
def long_op():
    """Yield a function whose signature spans several lines.

    The inner function takes five long-named positional parameters, ignores
    them, and prints ``42``.

    NOTE(review): the one-parameter-per-line layout looks deliberate
    (presumably to exercise handling of multi-line signatures) -- confirm
    before reformatting the inner ``def``.
    """

    def _long_op(
        very_long_parameter_name,
        very_very_long_parameter_name,
        very_very_very_long_parameter_name,
        very_very_very_very_long_parameter_name,
        very_very_very_very_very_long_parameter_name,
    ):
        print(42)

    yield _long_op