Async Sockudo client SDK for Python.
sockudo-python is a Pusher-compatible realtime client for Python applications. It preserves the familiar subscribe/bind/channel model while adding Sockudo-native features such as filter-aware subscriptions, delta reconstruction, and encrypted channel handling.
- Protocol V2 by default, with V1 compatibility
- Public, private, presence, and encrypted channels
- Proxy-backed presence history and presence snapshot helpers
- Tag filter and per-subscription event filter helpers
- Continuity-aware connection recovery (
stream_id+serial) - Message deduplication
- JSON, MessagePack, and Protobuf wire formats
- Fossil and Xdelta3/VCDIFF delta compression support
- User sign-in and watchlist event handling
pip install sockudo-pythonUsing requirements.txt:
sockudo-python>=1.0.0
Using pyproject.toml:
[project]
dependencies = [
"sockudo-python>=1.0.0",
]For local development from this workspace:
pip install -e client-sdks/sockudo-pythonimport asyncio
from sockudo_python import SockudoClient, SockudoOptions
async def main() -> None:
client = SockudoClient(
"app-key",
SockudoOptions(
cluster="local",
force_tls=False,
ws_host="127.0.0.1",
ws_port=6001,
),
)
channel = client.subscribe("public-updates")
channel.bind("price-updated", lambda payload, meta: print(payload))
await client.connect()
await asyncio.sleep(30)
await client.disconnect()
asyncio.run(main())Use an endpoint URL (the default) or supply a fully custom async handler:
from sockudo_python import (
SockudoClient,
SockudoOptions,
ChannelAuthorizationOptions,
ChannelAuthorizationData,
ChannelAuthorizationRequest,
)
async def my_auth_handler(request: ChannelAuthorizationRequest) -> ChannelAuthorizationData:
# Call your own backend to produce a signed auth token.
return ChannelAuthorizationData(
auth="app-key:hmac-sha256-signature",
channel_data='{"user_id":"42"}',
)
client = SockudoClient(
"app-key",
SockudoOptions(
cluster="local",
ws_host="127.0.0.1",
ws_port=6001,
channel_authorization=ChannelAuthorizationOptions(
endpoint="https://api.example.com/sockudo/auth",
# Or override entirely:
custom_handler=my_auth_handler,
),
),
)
channel = client.subscribe("private-orders")
channel.bind("order-placed", lambda data, meta: print(data))
await client.connect()channel = client.subscribe("presence-lobby")
channel.bind(
"pusher:subscription_succeeded",
lambda data, meta: print("members:", data),
)
channel.bind(
"pusher:member_added",
lambda data, meta: print("joined:", data),
)
channel.bind(
"pusher:member_removed",
lambda data, meta: print("left:", data),
)
await client.connect()Client-side presence history is proxy-backed. The Python client does not sign the server REST API directly; configure a backend endpoint that accepts {channel, params, action} and proxies the request with server credentials.
from sockudo_python import PresenceHistoryOptions, PresenceHistoryParams, PresenceSnapshotParams
client = SockudoClient(
"app-key",
SockudoOptions(
cluster="local",
ws_host="127.0.0.1",
ws_port=6001,
presence_history=PresenceHistoryOptions(
endpoint="https://api.example.com/sockudo/presence-history",
),
),
)
channel = client.subscribe("presence-lobby")
page = await channel.history(
PresenceHistoryParams(limit=50, direction="newest_first")
)
if page.has_next():
next_page = await page.next()
snapshot = await channel.snapshot(PresenceSnapshotParams(at_serial=4))Server-side tag filtering is a V2 feature. Only messages whose tags match the filter expression are delivered to this subscription.
from sockudo_python import SubscriptionOptions, Filter
channel = client.subscribe(
"price:btc",
options=SubscriptionOptions(
filter=Filter.eq("market", "spot"),
),
)
# Compound filters
channel = client.subscribe(
"price:btc",
options=SubscriptionOptions(
filter=Filter.and_(
Filter.eq("market", "spot"),
Filter.gt("spread", "0"),
),
),
)Request delta-compressed delivery to reduce bandwidth for channels that carry frequently-updated payloads:
from sockudo_python import SubscriptionOptions, ChannelDeltaSettings, DeltaAlgorithm
channel = client.subscribe(
"orderbook:btc-usd",
options=SubscriptionOptions(
delta=ChannelDeltaSettings(
enabled=True,
algorithm=DeltaAlgorithm.XDELTA3,
),
),
)
channel.bind("snapshot", lambda data, meta: print(data))
channel = client.subscribe(
"market:btc",
options=SubscriptionOptions(
rewind=SubscriptionRewind.seconds_back(30),
),
)
client.bind("sockudo:resume_success", lambda data, _: print(data))
channel.bind("sockudo:rewind_complete", lambda data, _: print(data))private-encrypted-* channels decrypt payloads automatically using the shared_secret returned by your auth endpoint or custom handler.
channel = client.subscribe("private-encrypted-documents")
channel.bind("doc-updated", lambda data, meta: print(data)) # data is already decryptedYour auth handler must populate shared_secret in ChannelAuthorizationData:
async def encrypted_auth(request: ChannelAuthorizationRequest) -> ChannelAuthorizationData:
return ChannelAuthorizationData(
auth="app-key:hmac-sha256-signature",
shared_secret="base64-encoded-32-byte-secret",
)from sockudo_python import UserAuthenticationOptions
client = SockudoClient(
"app-key",
SockudoOptions(
cluster="local",
ws_host="127.0.0.1",
ws_port=6001,
user_authentication=UserAuthenticationOptions(
endpoint="https://api.example.com/sockudo/user-auth",
),
),
)
await client.connect()
await client.user.sign_in()Bind to connection state changes to react to connect, disconnect, and reconnect events:
def on_state_change(change) -> None:
print(f"connection: {change.previous} -> {change.current}")
client.connection.bind("state_change", on_state_change)
client.connection.bind("connected", lambda data, _: print("socket id:", data.get("socket_id")))
client.connection.bind("disconnected", lambda data, _: print("disconnected"))
client.connection.bind("error", lambda data, _: print("error:", data))
await client.connect()V2 is the default. To explicitly request it or to downgrade to V1 for strict Pusher SDK compatibility:
# V2 (default) — enables continuity tokens, message_id, recovery, filters, delta
client = SockudoClient("app-key", SockudoOptions(cluster="local", protocol_version=2))
# V1 — plain Pusher protocol, compatible with official Pusher SDKs
client = SockudoClient("app-key", SockudoOptions(cluster="local", protocol_version=1))- Python 3.11+
asyncio-based; designed for use withasync/await
Run the unit and integration test suite:
pytest client-sdks/sockudo-python/testsLive integration tests against a local Sockudo server on port 6001:
SOCKUDO_LIVE_TESTS=1 pytest client-sdks/sockudo-python/testsThe live suite covers:
- public subscribe + publish round-trip
- delta-enabled channel delivery
- encrypted channel decryption
GitHub Actions:
- CI:
.github/workflows/ci.yml - Publish:
.github/workflows/publish.yml
The package covers the core Sockudo feature set, including VCDIFF decoding, encrypted channel handling, and both supported delta algorithms, and is suitable for publishing as the official Python SDK.