Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions src/schematic/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import datetime as dt
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union
Expand Down Expand Up @@ -38,6 +39,66 @@ class CheckFlagOptions:

default_value: Optional[Union[bool, Callable[[], bool]]] = None
timeout: Optional[float] = None
# Client-supplied dedupe key for the resulting flag_check event. Only
# applied when the SDK evaluates the flag locally via DataStream and
# fires its own flag_check event (the REST API path sets its own).
# Duplicate events with the same key are dropped server-side for 24h.
idempotency_key: Optional[str] = None


@dataclass
class TrackOptions:
"""Optional metadata for a track event.

Fields map directly to the corresponding ``CreateEventRequestBody``
properties. Omit any field you don't need; the SDK only sends fields
that are explicitly set.
"""

# Client-supplied dedupe key. Duplicate events with the same key
# (scoped to the environment) are dropped server-side for 24 hours.
idempotency_key: Optional[str] = None
# Timestamp the event was sent. Required when trusted_client_clock=True.
sent_at: Optional[dt.datetime] = None
# When True, use sent_at as the effective event timestamp instead of
# server receipt time. Requires a secret API key and sent_at.
trusted_client_clock: Optional[bool] = None
# Import historical data without affecting billing. Requires a secret
# API key and trusted_client_clock.
backfill: Optional[bool] = None


@dataclass
class IdentifyOptions:
"""Optional metadata for an identify event.

Fields map directly to the corresponding ``CreateEventRequestBody``
properties. Omit any field you don't need; the SDK only sends fields
that are explicitly set.
"""

# Client-supplied dedupe key. Duplicate events with the same key
# (scoped to the environment) are dropped server-side for 24 hours.
idempotency_key: Optional[str] = None


def _event_options_to_kwargs(
options: Optional[Union[TrackOptions, IdentifyOptions, CheckFlagOptions]],
) -> Dict[str, Any]:
"""Flatten an options dataclass into kwargs for CreateEventRequestBody.

Only fields that were explicitly set on the dataclass are returned, so
unset fields don't override CreateEventRequestBody's own defaults and
don't appear on the wire as explicit nulls.
"""
if options is None:
return {}
kwargs: Dict[str, Any] = {}
for field in ("idempotency_key", "sent_at", "trusted_client_clock", "backfill"):
value = getattr(options, field, None)
if value is not None:
kwargs[field] = value
return kwargs


@dataclass
Expand Down Expand Up @@ -274,6 +335,7 @@ def identify(
company: Optional[EventBodyIdentifyCompany] = None,
name: Optional[str] = None,
traits: Optional[Dict[str, Any]] = None,
options: Optional[IdentifyOptions] = None,
) -> None:
self._enqueue_event(
"identify",
Expand All @@ -283,6 +345,7 @@ def identify(
name=name,
traits=traits,
),
options=options,
)

def track(
Expand All @@ -292,6 +355,7 @@ def track(
user: Optional[Dict[str, str]] = None,
traits: Optional[Dict[str, Any]] = None,
quantity: Optional[int] = None,
options: Optional[TrackOptions] = None,
) -> None:
self._enqueue_event(
"track",
Expand All @@ -302,13 +366,23 @@ def track(
traits=traits,
user=user,
),
options=options,
)

def _enqueue_event(self, event_type: str, body: EventBody) -> None:
def _enqueue_event(
self,
event_type: str,
body: EventBody,
options: Optional[Union[TrackOptions, IdentifyOptions, CheckFlagOptions]] = None,
) -> None:
if self.offline:
return
try:
event_body = CreateEventRequestBody(event_type=event_type, body=body)
event_body = CreateEventRequestBody(
event_type=event_type,
body=body,
**_event_options_to_kwargs(options),
)
self.event_buffer.push(event_body)
except Exception as e:
self.logger.error(e)
Expand Down Expand Up @@ -492,7 +566,7 @@ async def check_flag_with_entitlement(
CheckFlagRequestBody(company=company, user=user),
flag_key,
)
await self._enqueue_flag_check_event(flag_key, resp, company, user)
await self._enqueue_flag_check_event(flag_key, resp, company, user, options)
return self._ds_result_to_response(flag_key, resp, options)
except Exception as e:
self.logger.debug(f"Datastream flag check failed ({e}), falling back to API")
Expand Down Expand Up @@ -627,6 +701,7 @@ async def _enqueue_flag_check_event(
resp: RulesengineCheckFlagResult,
company: Optional[Dict[str, str]],
user: Optional[Dict[str, str]],
options: Optional[CheckFlagOptions] = None,
) -> None:
"""Enqueue a flag_check event for a DataStream-evaluated flag."""
await self._enqueue_event(
Expand All @@ -642,6 +717,7 @@ async def _enqueue_flag_check_event(
req_company=company,
req_user=user,
),
options=options,
)

def _ds_result_to_response(
Expand Down Expand Up @@ -700,6 +776,7 @@ async def identify(
company: Optional[EventBodyIdentifyCompany] = None,
name: Optional[str] = None,
traits: Optional[Dict[str, Any]] = None,
options: Optional[IdentifyOptions] = None,
) -> None:
await self._enqueue_event(
"identify",
Expand All @@ -709,6 +786,7 @@ async def identify(
name=name,
traits=traits,
),
options=options,
)

async def track(
Expand All @@ -718,6 +796,7 @@ async def track(
user: Optional[Dict[str, str]] = None,
traits: Optional[Dict[str, Any]] = None,
quantity: Optional[int] = None,
options: Optional[TrackOptions] = None,
) -> None:
await self._enqueue_event(
"track",
Expand All @@ -728,6 +807,7 @@ async def track(
traits=traits,
user=user,
),
options=options,
)

# Update company metrics in DataStream if available and connected
Expand All @@ -742,11 +822,20 @@ async def track(
except Exception as e:
self.logger.error(f"Failed to update company metrics: {e}")

async def _enqueue_event(self, event_type: str, body: EventBody) -> None:
async def _enqueue_event(
self,
event_type: str,
body: EventBody,
options: Optional[Union[TrackOptions, IdentifyOptions, CheckFlagOptions]] = None,
) -> None:
if self.offline:
return
try:
event_body = CreateEventRequestBody(event_type=event_type, body=body)
event_body = CreateEventRequestBody(
event_type=event_type,
body=body,
**_event_options_to_kwargs(options),
)
await self.event_buffer.push(event_body)
except Exception as e:
self.logger.error(e)
Expand Down
32 changes: 25 additions & 7 deletions src/schematic/event_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,44 @@ class _CaptureEventPayload(UniversalBaseModel):
"""Wire format for a single event sent to the capture service.

Mirrors the shape used by the Go/Ruby/C# SDKs: `type` (not `event_type`)
and an `api_key` field embedded on each event.
and an `api_key` field embedded on each event. The optional metadata
fields (idempotency_key, sent_at, trusted_client_clock, backfill) map
directly to the equivalent fields on ``CreateEventRequestBody``.
"""

api_key: str = pydantic.Field()
body: typing.Optional[EventBody] = None
type: EventType = pydantic.Field()
idempotency_key: typing.Optional[str] = None
sent_at: typing.Optional[dt.datetime] = None
trusted_client_clock: typing.Optional[bool] = None
backfill: typing.Optional[bool] = None


class _CaptureBatchPayload(UniversalBaseModel):
events: typing.List[_CaptureEventPayload]


def _to_payload(event: CreateEventRequestBody, api_key: str) -> _CaptureEventPayload:
return _CaptureEventPayload(
api_key=api_key,
body=event.body,
type=event.event_type,
sent_at=event.sent_at,
)
# Build kwargs conditionally so unset optional fields stay unset on the
# model. The capture wire format uses `exclude_unset`-style semantics —
# we don't want to send `"idempotency_key": null` for events that didn't
# set one.
kwargs: typing.Dict[str, typing.Any] = {
"api_key": api_key,
"type": event.event_type,
}
if event.body is not None:
kwargs["body"] = event.body
if event.idempotency_key is not None:
kwargs["idempotency_key"] = event.idempotency_key
if event.sent_at is not None:
kwargs["sent_at"] = event.sent_at
if event.trusted_client_clock is not None:
kwargs["trusted_client_clock"] = event.trusted_client_clock
if event.backfill is not None:
kwargs["backfill"] = event.backfill
return _CaptureEventPayload(**kwargs)


def _build_endpoint(base_url: str) -> str:
Expand Down
Loading
Loading