Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions bridge_sdk/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,13 @@ class WebhookPipelineAction(BaseModel):
name: Unique name for this webhook action within the pipeline + branch.
branch: The git branch this webhook is indexed from and whose pipeline
code runs when it fires.
on: CEL expression evaluated against the payload and headers.
Must return bool. The action triggers only when this evaluates to true.
transform: CEL expression that transforms the payload into step inputs.
on: CEL expression evaluated against the request. Must return bool.
The action triggers only when this evaluates to true.
Available variables: ``headers`` (request headers), ``body`` (raw body
bytes), ``body_json`` (parsed JSON body).
transform: CEL expression that transforms the request into step inputs.
Must return ``map(string, map(string, dyn))`` keyed by step name.
Available variables: same as ``on``.
webhook_endpoint: Name of the webhook endpoint configured in Console
(e.g. ``"linear_issues"``).

Expand All @@ -111,8 +114,8 @@ class WebhookPipelineAction(BaseModel):
WebhookPipelineAction(
name="linear-issues",
branch="main",
on='payload.type == "Issue" && payload.action == "update"',
transform='{"triage_step": {"issue": payload.data}}',
on='body_json.type == "Issue" && body_json.action == "update"',
transform='{"triage_step": {"issue": body_json.data}}',
webhook_endpoint="linear_issues",
),
],
Expand All @@ -129,16 +132,17 @@ class WebhookPipelineAction(BaseModel):
"""CEL expression that determines whether this action should fire. Must return bool."""

transform: str
"""CEL expression that transforms the payload into step inputs. Must return map(string, map(string, dyn))."""
"""CEL expression that transforms the request into step inputs. Must return map(string, map(string, dyn))."""

webhook_endpoint: str
"""Name of the webhook endpoint configured in Console."""

@model_validator(mode="after")
def _validate_cel_expressions(self) -> "WebhookPipelineAction":
env = CelEnvironment(annotations={
"payload": celtypes.Value,
"headers": celtypes.MapType,
"body": celtypes.BytesType,
"body_json": celtypes.Value,
})
for field_name in ("on", "transform"):
try:
Expand Down
25 changes: 13 additions & 12 deletions examples/webhook_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
This module shows how to:
1. Define webhook actions on a Pipeline to trigger it from external events
2. Reference webhook endpoints configured in Console by name
3. Use CEL ``on`` expressions to filter specific webhook payloads
4. Use CEL ``transform`` expressions to extract step inputs from payloads
3. Use CEL ``on`` expressions to filter specific webhook requests
4. Use CEL ``transform`` expressions to extract step inputs from requests
5. Route different webhook sources to different first steps that normalise
into a shared downstream step
"""
Expand All @@ -45,8 +45,9 @@
# keyed by step name (what inputs should each step receive?)
#
# CEL expressions can reference:
# - payload: the parsed JSON body of the webhook request
# - headers: HTTP headers as map(string, string)
# - body: raw request body as bytes (use string(body) to convert)
# - body_json: parsed JSON body as map(string, dyn)

pipeline = Pipeline(
name="issue_triage",
Expand All @@ -60,14 +61,14 @@
name="linear-autofix",
branch="main",
on=(
'payload.type == "Issue"'
' && payload.action == "create"'
' && payload.data.labels.exists(l, l.name == "autofix")'
'body_json.type == "Issue"'
' && body_json.action == "create"'
' && body_json.data.labels.exists(l, l.name == "autofix")'
),
# Both DAG root steps must receive input; fetch_pr gets an empty
# object so it returns None.
transform=(
'{"fetch_issue": {"issue_id": payload.data.id, "title": payload.data.title},'
'{"fetch_issue": {"issue_id": body_json.data.id, "title": body_json.data.title},'
' "fetch_pr": {}}'
),
webhook_endpoint="linear_issues",
Expand All @@ -81,16 +82,16 @@
branch="production",
on=(
'headers["x-github-event"] == "pull_request"'
' && payload.action == "opened"'
' && payload.pull_request.base.ref == "main"'
' && body_json.action == "opened"'
' && body_json.pull_request.base.ref == "main"'
),
# Both DAG root steps must receive input; fetch_issue gets an
# empty object so it returns None.
transform=(
'{"fetch_issue": {},'
' "fetch_pr": {"pr_number": payload.pull_request.number,'
' "repo": payload.repository.full_name,'
' "title": payload.pull_request.title}}'
' "fetch_pr": {"pr_number": body_json.pull_request.number,'
' "repo": body_json.repository.full_name,'
' "title": body_json.pull_request.title}}'
),
webhook_endpoint="github_prs",
),
Expand Down
8 changes: 4 additions & 4 deletions examples/webhook_generic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
WebhookPipelineAction(
name="custom-alerts",
branch="staging",
on='payload.status == "firing" && payload.severity == "critical"',
on='body_json.status == "firing" && body_json.severity == "critical"',
transform=(
'{"handle_alert": {"alert_id": payload.alert_id,'
' "service": payload.service,'
' "message": payload.message}}'
'{"handle_alert": {"alert_id": body_json.alert_id,'
' "service": body_json.service,'
' "message": body_json.message}}'
),
webhook_endpoint="monitoring_alerts",
),
Expand Down
40 changes: 20 additions & 20 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,28 +674,28 @@ def test_webhook_minimal(self):
name="my-hook",
branch="main",
on="true",
transform='{"triage_step": {"issue_url": payload.data.url}}',
transform='{"triage_step": {"issue_url": body_json.data.url}}',
webhook_endpoint="linear_issues",
)
assert wh.name == "my-hook"
assert wh.branch == "main"
assert wh.webhook_endpoint == "linear_issues"
assert wh.on == "true"
assert wh.transform == '{"triage_step": {"issue_url": payload.data.url}}'
assert wh.transform == '{"triage_step": {"issue_url": body_json.data.url}}'

def test_webhook_serialization(self):
"""Test WebhookPipelineAction model serialization round-trip."""
wh = WebhookPipelineAction(
name="serial-hook",
branch="main",
on='payload.type == "invoice.paid"',
transform='{"billing_step": {"invoice_id": payload.data.object.id, "amount": payload.data.object.amount_paid}}',
on='body_json.type == "invoice.paid"',
transform='{"billing_step": {"invoice_id": body_json.data.object.id, "amount": body_json.data.object.amount_paid}}',
webhook_endpoint="stripe_invoices",
)
dumped = wh.model_dump()
assert dumped["name"] == "serial-hook"
assert dumped["webhook_endpoint"] == "stripe_invoices"
assert dumped["on"] == 'payload.type == "invoice.paid"'
assert dumped["on"] == 'body_json.type == "invoice.paid"'
assert "provider" not in dumped
assert "idempotency_key" not in dumped
assert "filter" not in dumped
Expand All @@ -710,8 +710,8 @@ def test_pipeline_with_webhooks(self):
WebhookPipelineAction(
name="on-push",
branch="main",
on='payload.ref == "refs/heads/main"',
transform='{"index_step": {"repo": payload.repository.full_name, "commit_sha": payload.head_commit.id}}',
on='body_json.ref == "refs/heads/main"',
transform='{"index_step": {"repo": body_json.repository.full_name, "commit_sha": body_json.head_commit.id}}',
webhook_endpoint="github_pushes",
),
]
Expand All @@ -732,7 +732,7 @@ def test_pipeline_webhooks_in_registry(self):
name="hook-a",
branch="main",
on="true",
transform='{"triage_step": {"issue_id": payload.data.id}}',
transform='{"triage_step": {"issue_id": body_json.data.id}}',
webhook_endpoint="linear_issues",
),
]
Expand All @@ -748,15 +748,15 @@ def test_pipeline_multiple_webhooks(self):
WebhookPipelineAction(
name="linear-hook",
branch="main",
on='payload.type == "Issue"',
transform='{"triage_step": {"issue_id": payload.data.id, "title": payload.data.title}}',
on='body_json.type == "Issue"',
transform='{"triage_step": {"issue_id": body_json.data.id, "title": body_json.data.title}}',
webhook_endpoint="linear_issues",
),
WebhookPipelineAction(
name="github-hook",
branch="main",
on='payload.action == "opened"',
transform='{"review_step": {"pr_number": payload.pull_request.number, "head_sha": payload.pull_request.head.sha}}',
on='body_json.action == "opened"',
transform='{"review_step": {"pr_number": body_json.pull_request.number, "head_sha": body_json.pull_request.head.sha}}',
webhook_endpoint="github_prs",
),
]
Expand Down Expand Up @@ -811,14 +811,14 @@ def test_pipeline_same_endpoint_different_name_allowed(self):
WebhookPipelineAction(
name="hook-a",
branch="main",
on='payload.action == "create"',
on='body_json.action == "create"',
transform='{"step": {"k": "v"}}',
webhook_endpoint="linear_issues",
),
WebhookPipelineAction(
name="hook-b",
branch="main",
on='payload.action == "update"',
on='body_json.action == "update"',
transform='{"step": {"k": "v"}}',
webhook_endpoint="linear_issues",
),
Expand All @@ -832,8 +832,8 @@ def test_pipeline_data_with_webhooks(self):
WebhookPipelineAction(
name="data-hook",
branch="main",
on='payload.type == "message"',
transform='{"chat_step": {"channel": payload.channel, "text": payload.text}}',
on='body_json.type == "message"',
transform='{"chat_step": {"channel": body_json.channel, "text": body_json.text}}',
webhook_endpoint="slack_events",
),
]
Expand Down Expand Up @@ -862,8 +862,8 @@ def test_pipeline_webhooks_in_dsl_output(self):
WebhookPipelineAction(
name="dsl-hook",
branch="main",
on='payload.state == "alerting"',
transform='{"alert_step": {"alertname": payload.alerts[0].labels.alertname, "severity": payload.alerts[0].labels.severity}}',
on='body_json.state == "alerting"',
transform='{"alert_step": {"alertname": body_json.alerts[0].labels.alertname, "severity": body_json.alerts[0].labels.severity}}',
webhook_endpoint="grafana_alerts",
),
]
Expand Down Expand Up @@ -905,7 +905,7 @@ def test_pipeline_webhooks_in_repr(self):
name="repr-hook",
branch="main",
on="true",
transform='{"process_step": {"issue_url": payload.data.url}}',
transform='{"process_step": {"issue_url": body_json.data.url}}',
webhook_endpoint="linear_issues",
),
]
Expand All @@ -919,7 +919,7 @@ def test_webhook_invalid_on_rejected(self) -> None:
WebhookPipelineAction(
name="bad-on",
branch="main",
on="payload.type ==",
on="body_json.type ==",
transform='{"step": {"k": "v"}}',
webhook_endpoint="ep",
)
Expand Down
Loading