diff --git a/MIGRATION.md b/MIGRATION.md index 30132a2..66af981 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -1,12 +1,14 @@ # Migration Guide -## 1.4.x -> 1.5.0 (REST client) +## 1.4.x -> 1.5.0 (REST / Stream clients) - `x10.perpetual.trading_client.PerpetualTradingClient` has been replaced with `x10.clients.rest.RestApiClient` (client has the same interface but new name reflects its purpose better). - Leftover models were migrated to `x10.models.*`. - Most of the dataclasses are immutable now. - `markets_info` module has been merged into `info` module. +- `x10.perpetual.stream_client.PerpetualStreamClient` has been replaced with + `x10.clients.stream.StreamClient` (same interface, renamed to match the `RestApiClient` naming convention). --- diff --git a/examples/cases/advanced/load_testing.py b/examples/cases/advanced/load_testing.py index fba89e8..29b724d 100644 --- a/examples/cases/advanced/load_testing.py +++ b/examples/cases/advanced/load_testing.py @@ -6,10 +6,10 @@ from examples.utils import BTC_USD_MARKET, create_rest_client from x10.clients.rest import RestApiClient +from x10.clients.stream import StreamClient from x10.models.market import MarketModel from x10.models.order import OrderSide from x10.perpetual.order_object import create_order_object -from x10.perpetual.stream_client.stream_client import PerpetualStreamClient LOGGER = logging.getLogger() MARKET_NAME = BTC_USD_MARKET @@ -58,7 +58,7 @@ async def create_orders_loop(*, rest_client: RestApiClient, market: MarketModel, async def order_confirmation_loop(*, stream_url: str, api_key: str): - stream_client = PerpetualStreamClient(api_url=stream_url) + stream_client = StreamClient(api_url=stream_url) async with stream_client.subscribe_to_account_updates(api_key) as account_stream: while not stop_event.is_set(): diff --git a/examples/utils.py b/examples/utils.py index 4e6abeb..977b0c3 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -11,11 +11,11 @@ from dotenv import load_dotenv from x10.clients.rest import RestApiClient +from x10.clients.stream import StreamClient from x10.config import TESTNET_CONFIG, Config from x10.core.stark_account import StarkPerpetualAccount from x10.models.market import TradingConfigModel from x10.perpetual.simple_client.simple_trading_client import BlockingTradingClient -from x10.perpetual.stream_client import PerpetualStreamClient from x10.utils.string import is_hex_string BTC_USD_MARKET = "BTC-USD" @@ -88,7 +88,7 @@ def create_blocking_client(config: Config = TESTNET_CONFIG): def create_stream_client(config: Config = TESTNET_CONFIG): - return PerpetualStreamClient(api_url=config.endpoints.stream_url) + return StreamClient(api_url=config.endpoints.stream_url) def get_adjust_price_by_pct(config: TradingConfigModel): diff --git a/tests/perpetual/test_stream_client.py b/tests/clients/test_stream_client.py similarity index 89% rename from tests/perpetual/test_stream_client.py rename to tests/clients/test_stream_client.py index ea4649f..f2db2e9 100644 --- a/tests/perpetual/test_stream_client.py +++ b/tests/clients/test_stream_client.py @@ -18,12 +18,12 @@ async def _serve_message(websocket): @pytest.mark.asyncio async def test_orderbook_stream(create_orderbook_message): - from x10.perpetual.stream_client import PerpetualStreamClient + from x10.clients.stream import StreamClient message_model = create_orderbook_message() async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server: - stream_client = PerpetualStreamClient(api_url=get_url_from_server(server)) + stream_client = StreamClient(api_url=get_url_from_server(server)) stream = await stream_client.subscribe_to_orderbooks() msg = await stream.recv() await stream.close() @@ -48,13 +48,13 @@ async def test_orderbook_stream(create_orderbook_message): @pytest.mark.asyncio async def test_account_update_trade_stream(create_account_update_trade_message): - from x10.perpetual.stream_client import PerpetualStreamClient + from x10.clients.stream import StreamClient api_key = "dummy_api_key" message_model = create_account_update_trade_message() async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server: - stream_client = PerpetualStreamClient(api_url=get_url_from_server(server)) + stream_client = StreamClient(api_url=get_url_from_server(server)) stream = await stream_client.subscribe_to_account_updates(api_key) msg = await stream.recv() await stream.close() @@ -95,13 +95,13 @@ async def test_account_update_trade_stream(create_account_update_trade_message): @pytest.mark.asyncio async def test_account_update_stream_with_unexpected_type(create_account_update_unknown_message): - from x10.perpetual.stream_client import PerpetualStreamClient + from x10.clients.stream import StreamClient api_key = "dummy_api_key" message_model = create_account_update_unknown_message() async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server: - stream_client = PerpetualStreamClient(api_url=get_url_from_server(server)) + stream_client = StreamClient(api_url=get_url_from_server(server)) stream = await stream_client.subscribe_to_account_updates(api_key) msg = await stream.recv() await stream.close() @@ -123,12 +123,12 @@ async def test_account_update_stream_with_unexpected_type(create_account_update_ @pytest.mark.asyncio async def test_candle_stream(): from tests.fixtures.candle import create_candle_stream_message - from x10.perpetual.stream_client import PerpetualStreamClient + from x10.clients.stream import StreamClient message_model = create_candle_stream_message() async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server: - stream_client = PerpetualStreamClient(api_url=get_url_from_server(server)) + stream_client = StreamClient(api_url=get_url_from_server(server)) stream = await stream_client.subscribe_to_candles("ETH-USD", "trades", "PT1M") msg = await stream.recv() await stream.close() diff --git a/tests/perpetual/test_trading_client.py b/tests/clients/test_trading_client.py similarity index 100% rename from tests/perpetual/test_trading_client.py rename to tests/clients/test_trading_client.py diff --git a/x10/clients/rest/rest_api_client.py b/x10/clients/rest/rest_api_client.py index a016ff1..570c71c 100644 --- a/x10/clients/rest/rest_api_client.py +++ b/x10/clients/rest/rest_api_client.py @@ -28,7 +28,7 @@ class RestApiClient: """ - X10 REST API Client. + Extended REST API Client. """ __markets: Dict[str, MarketModel] | None diff --git a/x10/clients/stream/__init__.py b/x10/clients/stream/__init__.py new file mode 100644 index 0000000..6f5498d --- /dev/null +++ b/x10/clients/stream/__init__.py @@ -0,0 +1,2 @@ +from x10.clients.stream.stream_client import StreamClient # noqa: F401 +from x10.clients.stream.stream_connection import StreamConnection # noqa: F401 diff --git a/x10/perpetual/stream_client/stream_client.py b/x10/clients/stream/stream_client.py similarity index 88% rename from x10/perpetual/stream_client/stream_client.py rename to x10/clients/stream/stream_client.py index 706a32c..2c6db1b 100644 --- a/x10/perpetual/stream_client/stream_client.py +++ b/x10/clients/stream/stream_client.py @@ -1,21 +1,18 @@ from typing import Dict, List, Optional, Type +from x10.clients.stream.stream_connection import StreamConnection, StreamMsgResponseType from x10.models.account import AccountStreamDataModel from x10.models.candle import CandleInterval, CandleModel, CandleType from x10.models.funding_rate import FundingRateModel from x10.models.http import WrappedStreamResponseModel from x10.models.orderbook import OrderbookUpdateModel from x10.models.trade import PublicTradeModel -from x10.perpetual.stream_client.perpetual_stream_connection import ( - PerpetualStreamConnection, - StreamMsgResponseType, -) from x10.utils.http import UrlQueryParam, get_url -class PerpetualStreamClient: +class StreamClient: """ - X10 Perpetual Stream Client for the X10 WebSocket v1. + Extended Stream (WebSocket) Client. """ __api_url: str @@ -80,5 +77,5 @@ def __connect( stream_url: str, msg_model_class: Type[StreamMsgResponseType], api_key: Optional[str] = None, - ) -> PerpetualStreamConnection[StreamMsgResponseType]: - return PerpetualStreamConnection(stream_url, msg_model_class, api_key) + ) -> StreamConnection[StreamMsgResponseType]: + return StreamConnection(stream_url, msg_model_class, api_key) diff --git a/x10/perpetual/stream_client/perpetual_stream_connection.py b/x10/clients/stream/stream_connection.py similarity index 93% rename from x10/perpetual/stream_client/perpetual_stream_connection.py rename to x10/clients/stream/stream_connection.py index f379a4b..b65af92 100644 --- a/x10/perpetual/stream_client/perpetual_stream_connection.py +++ b/x10/clients/stream/stream_connection.py @@ -13,11 +13,10 @@ StreamMsgResponseType = TypeVar("StreamMsgResponseType", bound=X10BaseModel) -class PerpetualStreamConnection(Generic[StreamMsgResponseType]): +class StreamConnection(Generic[StreamMsgResponseType]): __stream_url: str __msg_model_class: Type[StreamMsgResponseType] __api_key: Optional[str] - __msgs_count: int __websocket: Optional[WebSocketClientProtocol] def __init__( @@ -31,7 +30,6 @@ def __init__( self.__stream_url = stream_url self.__msg_model_class = msg_model_class self.__api_key = api_key - self.__msgs_count = 0 self.__websocket = None async def send(self, data): @@ -44,13 +42,11 @@ async def recv(self) -> StreamMsgResponseType: async def close(self): assert self.__websocket is not None + if not self.__websocket.closed: await self.__websocket.close() - LOGGER.debug("Stream closed: %s", self.__stream_url) - @property - def msgs_count(self): - return self.__msgs_count + LOGGER.debug("Stream closed: %s", self.__stream_url) @property def closed(self): @@ -66,6 +62,7 @@ async def __anext__(self) -> StreamMsgResponseType: if self.__websocket.closed: raise StopAsyncIteration + try: return await self.__receive() except websockets.ConnectionClosed: @@ -75,7 +72,6 @@ async def __receive(self) -> StreamMsgResponseType: assert self.__websocket is not None data = await self.__websocket.recv() - self.__msgs_count += 1 return self.__msg_model_class.model_validate_json(data) @@ -95,6 +91,10 @@ async def __aexit__( await self.close() async def __await_impl__(self): + await self.__connect() + return self + + async def __connect(self): extra_headers: dict[str, str] = { RequestHeader.USER_AGENT: USER_AGENT, } @@ -105,5 +105,3 @@ async def __await_impl__(self): self.__websocket = await websockets.connect(self.__stream_url, extra_headers=extra_headers) LOGGER.debug("Connected to stream: %s", self.__stream_url) - - return self diff --git a/x10/perpetual/orderbook.py b/x10/perpetual/orderbook.py index abe24f4..dc87a2c 100644 --- a/x10/perpetual/orderbook.py +++ b/x10/perpetual/orderbook.py @@ -1,21 +1,21 @@ import asyncio -import decimal from collections.abc import Awaitable from dataclasses import dataclass +from decimal import Decimal from typing import Callable, Iterable, Tuple from sortedcontainers import SortedDict +from x10.clients.stream import StreamClient from x10.config import Config from x10.models.http import StreamDataType from x10.models.orderbook import OrderbookUpdateModel -from x10.perpetual.stream_client.stream_client import PerpetualStreamClient @dataclass class OrderBookEntry: - price: decimal.Decimal - amount: decimal.Decimal + price: Decimal + amount: Decimal def __repr__(self) -> str: return f"OrderBookEntry(price={self.price}, amount={self.amount})" @@ -23,8 +23,8 @@ def __repr__(self) -> str: @dataclass(frozen=True) class ImpactDetails: - price: decimal.Decimal - amount: decimal.Decimal + price: Decimal + amount: Decimal class OrderBook: @@ -50,11 +50,11 @@ def __init__( best_bid_change_callback: Callable[[OrderBookEntry | None], Awaitable[None]] | None = None, depth: int | None = None, ) -> None: - self.__stream_client = PerpetualStreamClient(api_url=config.endpoints.stream_url) + self.__stream_client = StreamClient(api_url=config.endpoints.stream_url) self.__market_name = market_name self.__task: asyncio.Task | None = None - self._bid_prices: "SortedDict[decimal.Decimal, OrderBookEntry]" = SortedDict() # type: ignore - self._ask_prices: "SortedDict[decimal.Decimal, OrderBookEntry]" = SortedDict() # type: ignore + self._bid_prices: SortedDict = SortedDict() + self._ask_prices: SortedDict = SortedDict() self.best_ask_change_callback = best_ask_change_callback self.best_bid_change_callback = best_bid_change_callback self.depth = depth @@ -159,12 +159,10 @@ def best_ask(self) -> OrderBookEntry | None: except IndexError: return None - def __price_impact_notional( - self, notional: decimal.Decimal, levels: Iterable[Tuple[decimal.Decimal, OrderBookEntry]] - ): + def __price_impact_notional(self, notional: Decimal, levels: Iterable[Tuple[Decimal, OrderBookEntry]]): remaining_to_spend = notional - total_amount = decimal.Decimal(0) - weighted_sum = decimal.Decimal(0) + total_amount = Decimal(0) + weighted_sum = Decimal(0) for price, entry in levels: available_at_price = entry.amount amount_to_purchase = min(remaining_to_spend / price, available_at_price) @@ -183,10 +181,10 @@ def __price_impact_notional( average_price = weighted_sum / total_amount return ImpactDetails(price=average_price, amount=total_amount) - def __price_impact_qty(self, qty: decimal.Decimal, levels: Iterable[Tuple[decimal.Decimal, OrderBookEntry]]): + def __price_impact_qty(self, qty: Decimal, levels: Iterable[Tuple[Decimal, OrderBookEntry]]): remaining_qty = qty - total_amount = decimal.Decimal(0) - total_spent = decimal.Decimal(0) + total_amount = Decimal(0) + total_spent = Decimal(0) for price, entry in levels: available_at_price = entry.amount take = min(remaining_qty, available_at_price) @@ -203,7 +201,7 @@ def __price_impact_qty(self, qty: decimal.Decimal, levels: Iterable[Tuple[decima average_price = total_spent / total_amount return ImpactDetails(price=average_price, amount=total_amount) - def calculate_price_impact_notional(self, notional: decimal.Decimal, side: str) -> ImpactDetails | None: + def calculate_price_impact_notional(self, notional: Decimal, side: str) -> ImpactDetails | None: if notional <= 0: return None if side == "SELL": @@ -216,7 +214,7 @@ def calculate_price_impact_notional(self, notional: decimal.Decimal, side: str) return self.__price_impact_notional(notional, self._ask_prices.items()) return None - def calculate_price_impact_qty(self, qty: decimal.Decimal, side: str) -> ImpactDetails | None: + def calculate_price_impact_qty(self, qty: Decimal, side: str) -> ImpactDetails | None: if qty <= 0: return None if side == "SELL": diff --git a/x10/perpetual/simple_client/simple_trading_client.py b/x10/perpetual/simple_client/simple_trading_client.py index cfb39c1..00ef108 100644 --- a/x10/perpetual/simple_client/simple_trading_client.py +++ b/x10/perpetual/simple_client/simple_trading_client.py @@ -2,10 +2,11 @@ import time from dataclasses import dataclass from decimal import Decimal -from typing import Awaitable, Dict, Union, cast +from typing import Awaitable, Dict, Optional, cast from x10.clients.rest.modules.info_module import InfoModule from x10.clients.rest.modules.order_management_module import OrderManagementModule +from x10.clients.stream import StreamClient, StreamConnection from x10.config import Config from x10.core.stark_account import StarkPerpetualAccount from x10.errors import SdkError, ValidationError @@ -21,10 +22,6 @@ TimeInForce, ) from x10.perpetual.order_object import create_order_object -from x10.perpetual.stream_client.perpetual_stream_connection import ( - PerpetualStreamConnection, -) -from x10.perpetual.stream_client.stream_client import PerpetualStreamClient def condition_to_awaitable(condition: asyncio.Condition) -> Awaitable: @@ -85,15 +82,12 @@ def __init__(self, config: Config, account: StarkPerpetualAccount): self.__account = account self.__info_module = InfoModule(config, api_key=account.api_key) self.__orders_module = OrderManagementModule(config, api_key=account.api_key) - self.__markets: Union[None, Dict[str, MarketModel]] = None - self.__stream_client: PerpetualStreamClient = PerpetualStreamClient(api_url=config.endpoints.stream_url) - self.__account_stream: Union[ - None, - PerpetualStreamConnection[WrappedStreamResponseModel[AccountStreamDataModel]], - ] = None + self.__markets: Optional[Dict[str, MarketModel]] = None + self.__stream_client: StreamClient = StreamClient(api_url=config.endpoints.stream_url) + self.__account_stream: Optional[StreamConnection[WrappedStreamResponseModel[AccountStreamDataModel]]] = None self.__order_waiters: Dict[str, OrderWaiter] = {} self.__cancel_waiters: Dict[str, CancelWaiter] = {} - self.__stream_task = asyncio.create_task(self.___order_stream()) + self.__stream_task = asyncio.create_task(self.__order_stream()) @staticmethod async def create(config: Config, account: StarkPerpetualAccount) -> "BlockingTradingClient": @@ -130,7 +124,7 @@ async def __handle_order(self, order: OpenOrderModel): else: await self.__handle_update(order) - async def ___order_stream(self): + async def __order_stream(self): self.__account_stream = await self.__stream_client.subscribe_to_account_updates(self.__account.api_key) async for event in self.__account_stream: if not (event.data and event.data.orders): @@ -138,7 +132,7 @@ async def ___order_stream(self): for order in event.data.orders: await self.__handle_order(order) print("Order stream closed, reconnecting...") - await self.___order_stream() + await self.__order_stream() async def cancel_order(self, order_external_id: str) -> TimedCancel: awaitable: Awaitable diff --git a/x10/perpetual/stream_client/__init__.py b/x10/perpetual/stream_client/__init__.py deleted file mode 100644 index 8d2ec4a..0000000 --- a/x10/perpetual/stream_client/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from x10.perpetual.stream_client.stream_client import ( # noqa: F401 - PerpetualStreamClient, -)