Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 0 additions & 41 deletions python/hyperstack-sdk/examples/basic_usage.py

This file was deleted.

14 changes: 13 additions & 1 deletion python/hyperstack-sdk/examples/custom_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class Game:
events: Optional[Dict[str, Any]] = None


class SettlementGame:
NAME = "SettlementGame"

@staticmethod
def state_view() -> str:
return "SettlementGame/state"

@staticmethod
def list_view() -> str:
return "SettlementGame/list"


def parse_game(data: Dict[str, Any]) -> Game:
id_data = data.get("id", {})
status_data = data.get("status", {})
Expand Down Expand Up @@ -70,7 +82,7 @@ def parse_game(data: Dict[str, Any]) -> Game:

async def main():
async with HyperStackClient("ws://localhost:8080") as client:
game_store = client.subscribe(view="SettlementGame/list", parser=parse_game)
game_store = client.watch(SettlementGame, parser=parse_game)

print(f"connected, watching {game_store.view}\n")

Expand Down
77 changes: 77 additions & 0 deletions python/hyperstack-sdk/examples/pumpfun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional

from hyperstack import HyperStackClient


@dataclass
class PumpfunTokenData:
mint: str
name: str
symbol: str
creator: Optional[str]
timestamp: Optional[int]


def parse_token(payload: Dict[str, Any]) -> Optional[PumpfunTokenData]:
info = payload.get("info") if isinstance(payload.get("info"), dict) else {}
token_id = payload.get("id") if isinstance(payload.get("id"), dict) else {}
events = payload.get("events") if isinstance(payload.get("events"), dict) else {}

name = info.get("name")
symbol = info.get("symbol")
mint = token_id.get("mint")

creator = None
timestamp = None
create_event = events.get("create")
if isinstance(create_event, dict):
name = name or create_event.get("name")
symbol = symbol or create_event.get("symbol")
mint = mint or create_event.get("mint")
creator = create_event.get("creator")
timestamp = create_event.get("timestamp")

if not mint or not name or not symbol:
return None

return PumpfunTokenData(
mint=mint,
name=name,
symbol=symbol,
creator=creator,
timestamp=timestamp,
)


async def main() -> None:
print("Connecting to Solana via Hyperstack...\n")
async with HyperStackClient(
"wss://pumpfun-token-rfx6zp.stack.usehyperstack.com"
) as client:
print("Connected! Streaming live pump.fun tokens:\n")
async for update in client.subscribe("PumpfunToken/list"):
if not isinstance(update.data, dict):
continue
token = parse_token(update.data)
if not token:
continue
print(f"New token: {token.name} ({token.symbol})")
print(f" Mint: {token.mint}")
if token.creator:
creator_short = (
f"{token.creator[:8]}..."
if len(token.creator) > 8
else token.creator
)
print(f" Creator: {creator_short}")
print("")


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Keyboard interrupt received. Exiting gracefully.")

17 changes: 16 additions & 1 deletion python/hyperstack-sdk/hyperstack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
"""HyperStack Python SDK - Real-time data synchronization with authentication support."""

from hyperstack.client import HyperStackClient
from hyperstack.store import Store, Update
from hyperstack.store import Store, Update, SharedStore
from hyperstack.types import (
Entity,
StackDefinition,
ViewDef,
ViewGroup,
state_view,
list_view,
SortOrder,
SortConfig,
SubscribedFrame,
try_parse_subscribed_frame,
ConnectionState,
)
from hyperstack.views import TypedViews
from hyperstack.auth import (
AuthConfig,
AuthToken,
Expand All @@ -32,12 +39,20 @@
"HyperStackClient",
"Store",
"Update",
"SharedStore",
# Types
"Entity",
"StackDefinition",
"ViewDef",
"ViewGroup",
"state_view",
"list_view",
"SortOrder",
"SortConfig",
"SubscribedFrame",
"try_parse_subscribed_frame",
"ConnectionState",
"TypedViews",
# Auth
"AuthConfig",
"AuthToken",
Expand Down
115 changes: 79 additions & 36 deletions python/hyperstack-sdk/hyperstack/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@
import asyncio
import json
import logging
from typing import Dict, List, Optional, Callable

from hyperstack.websocket import WebSocketManager
from hyperstack.store import Store, Mode
from hyperstack.types import Subscription, Unsubscription, Frame
from typing import Any, Awaitable, Callable, Dict, List, Optional

from hyperstack.connection import ConnectionManager
from hyperstack.store import Store, Mode, SharedStore
from hyperstack.types import (
Subscription,
Unsubscription,
Frame,
Entity,
StackDefinition,
ConnectionState,
)
from hyperstack.views import create_typed_views, TypedViews
from hyperstack.auth import AuthConfig

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,10 +63,10 @@ def __init__(
url: str,
reconnect_intervals: Optional[List[int]] = None,
ping_interval: int = 15,
on_connect: Optional[Callable] = None,
on_disconnect: Optional[Callable] = None,
on_error: Optional[Callable] = None,
on_socket_issue: Optional[Callable[[dict], None]] = None,
on_connect: Optional[Callable[[], Awaitable[None]]] = None,
on_disconnect: Optional[Callable[[], Awaitable[None]]] = None,
on_error: Optional[Callable[[Exception], Awaitable[None]]] = None,
on_socket_issue: Optional[Callable[[dict], Awaitable[None]]] = None,
auth: Optional[AuthConfig] = None,
):
"""
Expand All @@ -76,11 +84,11 @@ def __init__(
auth: Optional authentication configuration. Required for hosted Hyperstack URLs.
"""
self.url = url
self._stores: Dict[str, Store] = {}
self._store = SharedStore()
self._pending_subs: List[Subscription] = []
self._active_subs: Dict[str, Subscription] = {}
self._user_on_connect = on_connect

self.ws_manager = WebSocketManager(
self.ws_manager = ConnectionManager(
url=url,
reconnect_intervals=reconnect_intervals,
ping_interval=ping_interval,
Expand All @@ -100,15 +108,15 @@ async def disconnect(self) -> None:
"""Disconnect from server."""
await self.ws_manager.disconnect()

async def __aenter__(self):
async def __aenter__(self) -> "HyperStackClient":
await self.connect()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.disconnect()

def subscribe(
self, view: str, key: Optional[str] = None, parser: Optional[Callable] = None
self, view: str, key: Optional[str] = None, parser: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Store:
"""
Subscribe to updates for the specified view (and optional key) on the HyperStack server.
Expand All @@ -125,23 +133,58 @@ def subscribe(
raise ValueError(f"Invalid view '{view}'. Expected: Entity/mode")

mode = parse_mode(view)
store = Store(mode=mode, parser=parser, view=view)

store_key = f"{view}:{key or '*'}"
self._stores[store_key] = store
store = self._store.get_store(view, mode=mode, parser=parser)

sub = Subscription(view=view, key=key)
if self.ws_manager.is_running:
asyncio.create_task(self._send_sub(sub))
else:
self._pending_subs.append(sub)
sub_key = sub.sub_key()
if sub_key not in self._active_subs:
self._active_subs[sub_key] = sub
if self.ws_manager.is_running:
asyncio.create_task(self._send_sub(sub))
else:
self._pending_subs.append(sub)

return store

async def get(
self,
entity: Entity,
key: str,
parser: Optional[Callable[[Dict[str, Any]], Any]] = None,
timeout: Optional[float] = None,
) -> Optional[Dict]:
view = entity.state_view()
self.subscribe(view, key=key, parser=parser)
await self._store.wait_for_view_ready(view, timeout=timeout)
return await self._store.get(entity.state_view(), key)

async def list(
self,
entity: Entity,
parser: Optional[Callable[[Dict[str, Any]], Any]] = None,
timeout: Optional[float] = None,
) -> List:
view = entity.list_view()
self.subscribe(view, parser=parser)
await self._store.wait_for_view_ready(view, timeout=timeout)
return await self._store.list(entity.list_view())

def watch(self, entity: Entity, parser: Optional[Callable[[Dict[str, Any]], Any]] = None) -> Store:
return self.subscribe(entity.list_view(), parser=parser)

def watch_key(
self, entity: Entity, key: str, parser: Optional[Callable[[Dict[str, Any]], Any]] = None
) -> Store:
return self.subscribe(entity.list_view(), key=key, parser=parser)

def views(self, stack: StackDefinition) -> TypedViews:
return create_typed_views(stack, self)

async def _on_connect(self) -> None:
"""Send queued subscriptions on connect."""
while self._pending_subs:
await self._send_sub(self._pending_subs.pop(0))
for sub in self._active_subs.values():
await self._send_sub(sub)
self._pending_subs.clear()

if self._user_on_connect:
await self._user_on_connect()
Expand All @@ -159,8 +202,10 @@ async def _send_sub(self, sub: Subscription) -> None:

async def unsubscribe(self, view: str, key: Optional[str] = None) -> None:
"""Unsubscribe from a view."""
store_key = f"{view}:{key or '*'}"
self._stores.pop(store_key, None)
sub = Subscription(view=view, key=key)
sub_key = sub.sub_key()
self._active_subs.pop(sub_key, None)
self._pending_subs = [s for s in self._pending_subs if s.sub_key() != sub_key]

if not self.ws_manager.ws or not self.ws_manager.is_running:
return
Expand Down Expand Up @@ -191,15 +236,13 @@ async def _on_message(self, message) -> None:
logger.debug(
f"Frame: entity={frame.entity}, op={frame.op}, key={frame.key}"
)

view = frame.entity
store_keys = [f"{view}:{frame.key}", f"{view}:*"]

for store_key in store_keys:
store = self._stores.get(store_key)
if store:
logger.debug(f"Routing to: {store_key}")
await store.handle_frame(frame)
await self._store.apply_frame(frame)

except Exception as e:
logger.error(f"Message error: {e}", exc_info=True)

def store(self) -> SharedStore:
return self._store

def connection_state(self) -> ConnectionState:
return self.ws_manager.state()
Loading
Loading