From 64205eeeaaa3d474ffa80f26b189b6b11b1f2928 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:26:54 -0500 Subject: [PATCH 01/32] chore(cargo): fix manifest and dev dep - Remove duplicate package key in Cargo.toml - Add local polymarket-rs-client dev-dependency for examples --- Cargo.lock | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 5 ++-- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 322aa8f..711dfa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2686,6 +2686,7 @@ dependencies = [ "hickory-resolver", "hmac", "mockito", + "polymarket-rs-client", "proptest", "rand 0.8.5", "reqwest", @@ -2705,6 +2706,28 @@ dependencies = [ "uuid", ] +[[package]] +name = "polymarket-rs-client" +version = "0.1.1" +dependencies = [ + "alloy-primitives", + "alloy-signer", + "alloy-signer-local", + "alloy-sol-types", + "anyhow", + "base64", + "hmac", + "rand 0.8.5", + "reqwest", + "rust_decimal", + "serde", + "serde-json-fmt", + "serde_json", + "sha1", + "sha2", + "ureq", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -3217,7 +3240,9 @@ version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ + "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -3369,6 +3394,17 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-json-fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a33b7a5f52a26d520099339add40c48baf2e5ada194c8cc1b18cafa2b5e419" +dependencies = [ + "serde", + "serde_json", + "smartstring", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -3539,6 +3575,17 @@ dependencies = [ "serde", ] +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + [[package]] name = "socket2" version = "0.5.10" @@ -4112,6 +4159,25 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "ureq" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" +dependencies = [ + "base64", + "encoding_rs", + "flate2", + "log", + "once_cell", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "url", + "webpki-roots 0.26.11", +] + [[package]] name = "url" version = "2.5.7" @@ -4302,6 +4368,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.5", +] + +[[package]] +name = "webpki-roots" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "widestring" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 6b24261..85cb726 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ keywords = ["polymarket", "trading", "prediction-market", "kalshi", "clob", "cry categories = ["api-bindings", "network-programming", "finance", "prediction-markets", "polymarket", "kalshi", "clob"] documentation = "https://docs.rs/polyfill-rs" homepage = "https://github.com/floor-licker/polyfill-rs" -license = "MIT OR Apache-2.0" [dependencies] # Pin base64ct to avoid Edition 2024 requirement base64ct = "=1.6.0" @@ -71,7 +70,7 @@ tokio-test = "0.4" mockito = "1.0" proptest = "1.0" env_logger = "0.10" -# polymarket-rs-client = { path = "external/polymarket-rs-client" } # Uncomment to run side-by-side benchmark +polymarket-rs-client = { path = "external/polymarket-rs-client" } [features] default = ["stream"] @@ -107,4 +106,4 @@ opt-level = 1 [profile.test] # Optimizations for test performance opt-level = 2 -debug = true \ No newline at end of file +debug = true From e28e1afa0b8a8f5d5d9acc80bdddb24ff057e19e Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:28:42 -0500 Subject: [PATCH 02/32] fix(auth): only fallback derive on API errors Match upstream behavior: only retry derive-api-key when create-api-key fails with an HTTP status error. --- src/client.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index daaf4b1..227f5c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -541,7 +541,10 @@ impl ClobClient { pub async fn create_or_derive_api_key(&self, nonce: Option) -> Result { match self.create_api_key(nonce).await { Ok(creds) => Ok(creds), - Err(_) => self.derive_api_key(nonce).await, + // Only fall back to derive on API status errors (server responded). + // Propagate network/parse/internal errors so callers can handle them appropriately. + Err(PolyfillError::Api { .. }) => self.derive_api_key(nonce).await, + Err(err) => Err(err), } } @@ -2271,6 +2274,71 @@ mod tests { let api_creds = result.unwrap(); assert_eq!(api_creds.api_key, "test-api-key-123"); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_or_derive_api_key_falls_back_on_api_error() { + let mut server = Server::new_async().await; + + // Create fails with a status error -> should fall back to derive. + let create_mock = server + .mock("POST", "/auth/api-key") + .with_status(400) + .with_header("content-type", "application/json") + .with_body(r#"{"error":"key exists"}"#) + .create_async() + .await; + + let derive_mock = server + .mock("GET", "/auth/derive-api-key") + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#"{"apiKey":"derived-api-key","secret":"derived-secret","passphrase":"derived-pass"}"#, + ) + .create_async() + .await; + + let client = create_test_client_with_auth(&server.url()); + let result = client.create_or_derive_api_key(None).await; + + create_mock.assert_async().await; + derive_mock.assert_async().await; + assert!(result.is_ok()); + assert_eq!(result.unwrap().api_key, "derived-api-key"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_or_derive_api_key_does_not_fallback_on_non_api_error() { + let mut server = Server::new_async().await; + + // Create returns 200 but with invalid JSON -> not an API status error. + let create_mock = server + .mock("POST", "/auth/api-key") + .with_status(200) + .with_header("content-type", "application/json") + .with_body("not-json") + .create_async() + .await; + + // If we incorrectly fall back, this would be called. + let derive_mock = server + .mock("GET", "/auth/derive-api-key") + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + r#"{"apiKey":"derived-api-key","secret":"derived-secret","passphrase":"derived-pass"}"#, + ) + .expect(0) + .create_async() + .await; + + let client = create_test_client_with_auth(&server.url()); + let result = client.create_or_derive_api_key(None).await; + + create_mock.assert_async().await; + derive_mock.assert_async().await; + assert!(result.is_err()); + } #[tokio::test(flavor = "multi_thread")] async fn test_get_order_books_batch() { let mut server = Server::new_async().await; From 703bd9f509537cddf1d81d7c1f3cd7b081819b73 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:33:27 -0500 Subject: [PATCH 03/32] feat(book): align /book response fields --- src/client.rs | 16 +++++++++++-- src/decode.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 15 ++++++++++-- 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index 227f5c1..8e41290 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1914,7 +1914,11 @@ mod tests { ], "asks": [ {"price": "0.76", "size": "50.0"} - ] + ], + "min_order_size": "1", + "neg_risk": false, + "tick_size": "0.01", + "last_trade_price": "0.755" }"#; let mock = server @@ -1935,6 +1939,10 @@ mod tests { assert_eq!(book.market, "0x123"); assert_eq!(book.bids.len(), 1); assert_eq!(book.asks.len(), 1); + assert_eq!(book.min_order_size, Decimal::from_str("1").unwrap()); + assert!(!book.neg_risk); + assert_eq!(book.tick_size, Decimal::from_str("0.01").unwrap()); + assert_eq!(book.last_trade_price, Some(Decimal::from_str("0.755").unwrap())); } #[tokio::test(flavor = "multi_thread")] @@ -2349,7 +2357,11 @@ mod tests { "hash": "test-hash", "timestamp": "1234567890", "bids": [{"price": "0.75", "size": "100.0"}], - "asks": [{"price": "0.76", "size": "50.0"}] + "asks": [{"price": "0.76", "size": "50.0"}], + "min_order_size": "1", + "neg_risk": false, + "tick_size": "0.01", + "last_trade_price": null } ]"#; diff --git a/src/decode.rs b/src/decode.rs index c7c8070..aabbc73 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -108,6 +108,71 @@ pub mod deserializers { None => Ok(None), } } + + /// Deserialize a vec that may be `null` (treat `null` as empty vec). + pub fn vec_from_null<'de, D, T>(deserializer: D) -> std::result::Result, D::Error> + where + D: Deserializer<'de>, + T: serde::Deserialize<'de>, + { + Ok(Option::>::deserialize(deserializer)?.unwrap_or_default()) + } + + /// Deserialize an optional Decimal from string/number/null. + /// + /// - `null` => `None` + /// - `""` => `None` + /// - invalid values => error + pub fn optional_decimal_from_string<'de, D>( + deserializer: D, + ) -> std::result::Result, D::Error> + where + D: Deserializer<'de>, + { + let value = serde_json::Value::deserialize(deserializer)?; + match value { + serde_json::Value::Null => Ok(None), + serde_json::Value::String(s) => { + let s = s.trim(); + if s.is_empty() { + Ok(None) + } else { + s.parse::() + .map(Some) + .map_err(serde::de::Error::custom) + } + } + serde_json::Value::Number(n) => Decimal::from_str(&n.to_string()) + .map(Some) + .map_err(serde::de::Error::custom), + other => Err(serde::de::Error::custom(format!( + "Expected decimal as string/number/null, got {other}" + ))), + } + } + + /// Like `optional_decimal_from_string`, but returns `None` on parse errors. + pub fn optional_decimal_from_string_default_on_error<'de, D>( + deserializer: D, + ) -> std::result::Result, D::Error> + where + D: Deserializer<'de>, + { + let value = serde_json::Value::deserialize(deserializer)?; + match value { + serde_json::Value::Null => Ok(None), + serde_json::Value::String(s) => { + let s = s.trim(); + if s.is_empty() { + Ok(None) + } else { + Ok(s.parse::().ok()) + } + } + serde_json::Value::Number(n) => Ok(Decimal::from_str(&n.to_string()).ok()), + _ => Ok(None), + } + } } /// Raw API response types for efficient parsing diff --git a/src/types.rs b/src/types.rs index 79d3fa9..ca6041c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1024,14 +1024,25 @@ pub struct BookParams { pub struct OrderBookSummary { pub market: String, pub asset_id: String, - pub hash: String, + #[serde(default)] + pub hash: Option, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub timestamp: u64, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] pub bids: Vec, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] pub asks: Vec, + pub min_order_size: Decimal, + pub neg_risk: bool, + pub tick_size: Decimal, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string_default_on_error" + )] + pub last_trade_price: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrderSummary { #[serde(with = "rust_decimal::serde::str")] pub price: Decimal, From a7ca8e43418838b219634f5a5bdde67e0ad1ad21 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:35:02 -0500 Subject: [PATCH 04/32] feat(ws): align auth and event_type parsing --- examples/demo.rs | 107 +++++++++--------- examples/snipe.rs | 122 ++++++++++++++------- src/decode.rs | 93 +++++++++------- src/stream.rs | 221 ++++++++++++------------------------- src/types.rs | 274 ++++++++++++++++++++++++++++++++++++++++------ 5 files changed, 509 insertions(+), 308 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index c53be6d..d505e81 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -26,7 +26,7 @@ use polyfill_rs::{ types::*, // Utility functions - utils::{address, crypto, math, rate_limit, retry, time, url}, + utils::{address, math, rate_limit, retry, time, url}, // Configuration ClientConfig, @@ -43,7 +43,7 @@ use rust_decimal::Decimal; use rust_decimal_macros::dec; use std::time::Duration; use tokio::time::sleep; -use tracing::{debug, error, info}; +use tracing::{error, info}; /// Demo showcasing polyfill-rs functionality #[allow(dead_code)] @@ -592,10 +592,9 @@ impl PolyfillDemo { initial_dump: Some(true), custom_feature_enabled: None, auth: Some(WssAuth { - address: "0x1234567890123456789012345678901234567890".to_string(), - signature: "mock_signature".to_string(), - timestamp: time::now_secs(), - nonce: crypto::generate_nonce().to_string(), + api_key: "test-api-key".to_string(), + secret: "test-secret".to_string(), + passphrase: "test-passphrase".to_string(), }), }; @@ -603,33 +602,33 @@ impl PolyfillDemo { // Simulate receiving stream messages let messages = vec![ - StreamMessage::Heartbeat { - timestamp: chrono::Utc::now(), - }, - StreamMessage::BookUpdate { - data: OrderDelta { - token_id: "12345".to_string(), - timestamp: chrono::Utc::now(), - side: Side::BUY, + StreamMessage::Book(BookUpdate { + asset_id: "12345".to_string(), + market: "market1".to_string(), + timestamp: time::now_millis(), + bids: vec![OrderSummary { price: dec!(0.75), size: dec!(100.0), - sequence: 1, - }, - }, - StreamMessage::Trade { - data: FillEvent { - id: "fill1".to_string(), - order_id: "order1".to_string(), - token_id: "12345".to_string(), - side: Side::BUY, - price: dec!(0.75), + }], + asks: vec![OrderSummary { + price: dec!(0.76), size: dec!(50.0), - timestamp: chrono::Utc::now(), - maker_address: alloy_primitives::Address::ZERO, - taker_address: alloy_primitives::Address::ZERO, - fee: dec!(0.375), - }, - }, + }], + hash: None, + }), + StreamMessage::Trade(TradeMessage { + id: "fill1".to_string(), + market: "market1".to_string(), + asset_id: "12345".to_string(), + side: Side::BUY, + size: dec!(50.0), + price: dec!(0.75), + status: Some("MATCHED".to_string()), + msg_type: None, + last_update: None, + matchtime: None, + timestamp: None, + }), ]; for message in messages { @@ -638,31 +637,41 @@ impl PolyfillDemo { // Process message based on type match &message { - StreamMessage::BookUpdate { data } => { - info!(" Processing book update for token: {}", data.token_id); - if let Err(e) = self.book_manager.apply_delta(data.clone()) { - error!(" Failed to apply book update: {}", e); - self.stats.errors += 1; + StreamMessage::Book(book) => { + info!(" Processing book update for asset: {}", book.asset_id); + // This is a demo: apply snapshot levels as deltas. + for level in &book.bids { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: book.asset_id.clone(), + timestamp: chrono::Utc::now(), + side: Side::BUY, + price: level.price, + size: level.size, + sequence: book.timestamp, + }); } - }, - StreamMessage::Trade { data } => { + for level in &book.asks { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: book.asset_id.clone(), + timestamp: chrono::Utc::now(), + side: Side::SELL, + price: level.price, + size: level.size, + sequence: book.timestamp, + }); + } + } + StreamMessage::Trade(trade) => { info!( " Processing trade: {} {} @ {}", - data.side.as_str(), - data.size, - data.price + trade.side.as_str(), + trade.size, + trade.price ); - if let Err(e) = self.fill_processor.process_fill(data.clone()) { - error!(" Failed to process fill: {}", e); - self.stats.errors += 1; - } - }, - StreamMessage::Heartbeat { timestamp } => { - debug!(" Received heartbeat at: {}", timestamp); - }, + } _ => { info!(" Unhandled message type"); - }, + } } } diff --git a/examples/snipe.rs b/examples/snipe.rs index 4c8abb7..4fdd583 100644 --- a/examples/snipe.rs +++ b/examples/snipe.rs @@ -101,43 +101,86 @@ impl SnipeStrategy { /// Process a market data update pub fn process_update(&mut self, message: StreamMessage) -> Result<()> { match message { - StreamMessage::BookUpdate { data } => { - if data.token_id == self.token_id { - self.process_book_update(data)?; + StreamMessage::Book(book) => { + if book.asset_id == self.token_id { + self.process_book_update(book)?; } }, - StreamMessage::Trade { data } => { - if data.token_id == self.token_id { - self.process_trade(data)?; + StreamMessage::Trade(trade) => { + if trade.asset_id == self.token_id { + self.process_trade(trade)?; } }, - StreamMessage::Heartbeat { timestamp: _ } => { - self.check_stale_quotes()?; - }, _ => {}, } + + // Opportunistically check for staleness on any incoming update. + self.check_stale_quotes()?; Ok(()) } /// Process order book update - fn process_book_update(&mut self, delta: OrderDelta) -> Result<()> { + fn process_book_update(&mut self, book: BookUpdate) -> Result<()> { // Ensure book exists self.book_manager.get_or_create_book(&self.token_id)?; - // Update local order book - self.book_manager.apply_delta(delta.clone())?; + // Clear the existing book and rebuild from the snapshot. + if let Ok(current) = self.book_manager.get_book(&self.token_id) { + for level in ¤t.bids { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: self.token_id.clone(), + timestamp: chrono::Utc::now(), + side: Side::BUY, + price: level.price, + size: Decimal::ZERO, + sequence: book.timestamp, + }); + } + + for level in ¤t.asks { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: self.token_id.clone(), + timestamp: chrono::Utc::now(), + side: Side::SELL, + price: level.price, + size: Decimal::ZERO, + sequence: book.timestamp, + }); + } + } - // Get current book state - let book = self.book_manager.get_book(&self.token_id)?; + let ts = chrono::DateTime::from_timestamp( + (book.timestamp / 1000) as i64, + ((book.timestamp % 1000) * 1_000_000) as u32, + ) + .unwrap_or_else(chrono::Utc::now); - // Update best prices - if let Some(best_bid) = book.bids.first() { - self.last_best_bid = Some(best_bid.price); + for level in &book.bids { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: self.token_id.clone(), + timestamp: ts, + side: Side::BUY, + price: level.price, + size: level.size, + sequence: book.timestamp, + }); } - if let Some(best_ask) = book.asks.first() { - self.last_best_ask = Some(best_ask.price); + + for level in &book.asks { + let _ = self.book_manager.apply_delta(OrderDelta { + token_id: self.token_id.clone(), + timestamp: ts, + side: Side::SELL, + price: level.price, + size: level.size, + sequence: book.timestamp, + }); } + // Update best prices directly from the snapshot + self.last_best_bid = book.bids.first().map(|l| l.price); + self.last_best_ask = book.asks.first().map(|l| l.price); + self.last_update = time::now_secs(); // Check for trading opportunities @@ -147,17 +190,17 @@ impl SnipeStrategy { } /// Process trade update - fn process_trade(&mut self, fill: FillEvent) -> Result<()> { + fn process_trade(&mut self, trade: TradeMessage) -> Result<()> { info!( "Trade: {} {} @ {} (size: {})", - fill.side.as_str(), - fill.token_id, - fill.price, - fill.size + trade.side.as_str(), + trade.asset_id, + trade.price, + trade.size ); // Update statistics - self.stats.total_volume += fill.size; + self.stats.total_volume += trade.size; // Calculate P&L if this was our trade // (In a real implementation, you'd track your own orders) @@ -327,24 +370,19 @@ impl MockMarketData { let price_change = random_factor * Decimal::from(2) * self.volatility; let new_price = self.base_price * (Decimal::from(1) + price_change); - // Generate order book update - let side = if rand::random::() { - Side::BUY - } else { - Side::SELL - }; + // Generate a simple orderbook snapshot update let size = Decimal::from(rand::random::() % 1000 + 100); - - StreamMessage::BookUpdate { - data: OrderDelta { - token_id: self.token_id.clone(), - timestamp: chrono::Utc::now(), - side, - price: new_price, - size, - sequence: self.sequence, - }, - } + let bid = new_price - dec!(0.01); + let ask = new_price + dec!(0.01); + + StreamMessage::Book(BookUpdate { + asset_id: self.token_id.clone(), + market: "0xmock".to_string(), + timestamp: time::now_millis(), + bids: vec![OrderSummary { price: bid, size }], + asks: vec![OrderSummary { price: ask, size }], + hash: None, + }) } } diff --git a/src/decode.rs b/src/decode.rs index aabbc73..18c2715 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -432,44 +432,61 @@ impl Decoder for RawMarketResponse { } } -/// WebSocket message parsing -pub fn parse_stream_message(raw: &str) -> Result { - let value: Value = serde_json::from_str(raw)?; - - let msg_type = value["type"] - .as_str() - .ok_or_else(|| PolyfillError::parse("Missing message type".to_string(), None))?; - - match msg_type { - "book_update" => { - let data = value["data"].clone(); - let delta: OrderDelta = serde_json::from_value(data)?; - Ok(StreamMessage::BookUpdate { data: delta }) - }, - "trade" => { - let data = value["data"].clone(); - let raw_trade: RawTradeResponse = serde_json::from_value(data)?; - let fill = raw_trade.decode()?; - Ok(StreamMessage::Trade { data: fill }) - }, - "order_update" => { - let data = value["data"].clone(); - let raw_order: RawOrderResponse = serde_json::from_value(data)?; - let order = raw_order.decode()?; - Ok(StreamMessage::OrderUpdate { data: order }) - }, - "heartbeat" => { - let timestamp = value["timestamp"] - .as_str() - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(Utc::now); - Ok(StreamMessage::Heartbeat { timestamp }) - }, - _ => Err(PolyfillError::parse( - format!("Unknown message type: {}", msg_type), - None, - )), +/// WebSocket message parsing (official `event_type` shape). +/// +/// Polymarket WebSocket servers may send either a single JSON object or a batch array. +/// This parser is tolerant: +/// - Unknown/unsupported `event_type`s are ignored. +/// - Invalid entries inside a batch are skipped (do not fail the whole batch). +pub fn parse_stream_messages(raw: &str) -> Result> { + parse_stream_messages_bytes(raw.as_bytes()) +} + +/// See `parse_stream_messages`. +pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result> { + let value: Value = serde_json::from_slice(bytes)?; + + match value { + Value::Object(map) => { + let event_type = map.get("event_type").and_then(Value::as_str); + match event_type { + None => Ok(vec![]), + Some(_) => { + let msg: StreamMessage = serde_json::from_value(Value::Object(map))?; + match msg { + StreamMessage::Unknown => Ok(vec![]), + other => Ok(vec![other]), + } + } + } + } + Value::Array(arr) => Ok(arr + .into_iter() + .filter_map(|elem| { + let obj = elem.as_object()?; + let event_type = obj.get("event_type").and_then(Value::as_str)?; + // Skip unknown event types early (forward compatibility). + match event_type { + "book" + | "price_change" + | "tick_size_change" + | "last_trade_price" + | "best_bid_ask" + | "new_market" + | "market_resolved" + | "trade" + | "order" => {} + _ => return None, + } + + match serde_json::from_value::(Value::Object(obj.clone())) { + Ok(StreamMessage::Unknown) => None, + Ok(msg) => Some(msg), + Err(_) => None, + } + }) + .collect()), + _ => Ok(vec![]), } } diff --git a/src/stream.rs b/src/stream.rs index 19b7342..dbca228 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -274,12 +274,12 @@ impl WebSocketStream { tokio_tungstenite::tungstenite::Message::Text(text) => { debug!("Received WebSocket message: {}", text); - // Parse the message according to Polymarket's format - let stream_message = self.parse_polymarket_message(&text)?; - - // Send to internal channel - if let Err(e) = self.tx.send(stream_message) { - error!("Failed to send message to internal channel: {}", e); + // Parse the message according to Polymarket's `event_type` format + let stream_messages = crate::decode::parse_stream_messages(&text)?; + for stream_message in stream_messages { + if let Err(e) = self.tx.send(stream_message) { + error!("Failed to send message to internal channel: {}", e); + } } self.stats.messages_received += 1; @@ -313,115 +313,10 @@ impl WebSocketStream { Ok(()) } - /// Parse Polymarket WebSocket message format + /// Parse Polymarket WebSocket message(s) in `event_type` format. #[allow(dead_code)] - fn parse_polymarket_message(&self, text: &str) -> Result { - let value: Value = serde_json::from_str(text).map_err(|e| { - PolyfillError::parse( - format!("Failed to parse WebSocket message: {}", e), - Some(Box::new(e)), - ) - })?; - - // Extract message type - let message_type = value.get("type").and_then(|v| v.as_str()).ok_or_else(|| { - PolyfillError::parse("Missing 'type' field in WebSocket message", None) - })?; - - match message_type { - "book_update" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse book update: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::BookUpdate { data }) - }, - "trade" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse trade: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::Trade { data }) - }, - "order_update" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse order update: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::OrderUpdate { data }) - }, - "user_order_update" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse user order update: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::UserOrderUpdate { data }) - }, - "user_trade" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse user trade: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::UserTrade { data }) - }, - "market_book_update" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse market book update: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::MarketBookUpdate { data }) - }, - "market_trade" => { - let data = - serde_json::from_value(value.get("data").unwrap_or(&Value::Null).clone()) - .map_err(|e| { - PolyfillError::parse( - format!("Failed to parse market trade: {}", e), - Some(Box::new(e)), - ) - })?; - Ok(StreamMessage::MarketTrade { data }) - }, - "heartbeat" => { - let timestamp = value - .get("timestamp") - .and_then(|v| v.as_u64()) - .map(|ts| chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_default()) - .unwrap_or_else(Utc::now); - Ok(StreamMessage::Heartbeat { timestamp }) - }, - _ => { - warn!("Unknown message type: {}", message_type); - // Return heartbeat as fallback - Ok(StreamMessage::Heartbeat { - timestamp: Utc::now(), - }) - }, - } + fn parse_polymarket_messages(&self, text: &str) -> Result> { + crate::decode::parse_stream_messages(text) } /// Reconnect with exponential backoff @@ -476,33 +371,58 @@ impl Stream for WebSocketStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // First check internal channel - if let Poll::Ready(Some(message)) = self.rx.poll_recv(cx) { - return Poll::Ready(Some(Ok(message))); - } + loop { + // First drain any parsed messages + if let Poll::Ready(Some(message)) = self.rx.poll_recv(cx) { + return Poll::Ready(Some(Ok(message))); + } + + let Some(connection) = &mut self.connection else { + return Poll::Ready(None); + }; - // Then check WebSocket connection - if let Some(connection) = &mut self.connection { match connection.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(_message))) => { - // Simplified message handling - Poll::Ready(Some(Ok(StreamMessage::Heartbeat { - timestamp: Utc::now(), - }))) + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(ws_message))) => match ws_message { + tokio_tungstenite::tungstenite::Message::Text(text) => { + match crate::decode::parse_stream_messages(&text) { + Ok(messages) => { + for msg in messages { + let _ = self.tx.send(msg); + } + self.stats.messages_received += 1; + self.stats.last_message_time = Some(Utc::now()); + continue; + } + Err(e) => { + self.stats.errors += 1; + return Poll::Ready(Some(Err(e))); + } + } + } + tokio_tungstenite::tungstenite::Message::Close(_) => { + info!("WebSocket connection closed by server"); + self.connection = None; + return Poll::Ready(None); + } + tokio_tungstenite::tungstenite::Message::Ping(_) => { + // Best-effort: tokio-tungstenite/tungstenite may handle pings internally. + continue; + } + tokio_tungstenite::tungstenite::Message::Pong(_) => continue, + tokio_tungstenite::tungstenite::Message::Binary(_) => continue, + tokio_tungstenite::tungstenite::Message::Frame(_) => continue, }, Poll::Ready(Some(Err(e))) => { error!("WebSocket error: {}", e); self.stats.errors += 1; - Poll::Ready(Some(Err(e.into()))) - }, + return Poll::Ready(Some(Err(e.into()))); + } Poll::Ready(None) => { info!("WebSocket stream ended"); - Poll::Ready(None) - }, - Poll::Pending => Poll::Pending, + return Poll::Ready(None); + } } - } else { - Poll::Ready(None) } } } @@ -655,19 +575,19 @@ mod tests { let mut stream = MockStream::new(); // Add some test messages - stream.add_message(StreamMessage::Heartbeat { - timestamp: Utc::now(), - }); - stream.add_message(StreamMessage::BookUpdate { - data: OrderDelta { - token_id: "test".to_string(), - timestamp: Utc::now(), - side: Side::BUY, - price: rust_decimal_macros::dec!(0.5), - size: rust_decimal_macros::dec!(100), - sequence: 1, - }, - }); + stream.add_message(StreamMessage::Book(BookUpdate { + asset_id: "1".to_string(), + market: "0xabc".to_string(), + timestamp: 1_234_567_890, + bids: vec![], + asks: vec![], + hash: None, + })); + stream.add_message(StreamMessage::PriceChange(PriceChange { + market: "0xabc".to_string(), + timestamp: 1_234_567_891, + price_changes: vec![], + })); assert!(stream.is_connected()); assert_eq!(stream.get_stats().messages_received, 2); @@ -680,9 +600,14 @@ mod tests { manager.add_stream(mock_stream); // Test message broadcasting - let message = StreamMessage::Heartbeat { - timestamp: Utc::now(), - }; + let message = StreamMessage::Book(BookUpdate { + asset_id: "1".to_string(), + market: "0xabc".to_string(), + timestamp: 1_234_567_890, + bids: vec![], + asks: vec![], + hash: None, + }); assert!(manager.broadcast_message(message).is_ok()); } } diff --git a/src/types.rs b/src/types.rs index ca6041c..bd6ed89 100644 --- a/src/types.rs +++ b/src/types.rs @@ -659,18 +659,11 @@ impl Default for ClientConfig { } } -/// WebSocket authentication for Polymarket API -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WssAuth { - /// User's Ethereum address - pub address: String, - /// EIP-712 signature - pub signature: String, - /// Unix timestamp - pub timestamp: u64, - /// Nonce for replay protection - pub nonce: String, -} +/// WebSocket authentication for Polymarket API user channel. +/// +/// Polymarket's CLOB WebSocket expects the same L2 API credentials used for HTTP calls: +/// `{ apiKey, secret, passphrase }`. +pub type WssAuth = ApiCredentials; /// WebSocket subscription request #[derive(Debug, Clone, Serialize, Deserialize)] @@ -699,28 +692,247 @@ pub struct WssSubscription { pub auth: Option, } -/// WebSocket message types for streaming +/// WebSocket message types for streaming (official Polymarket `event_type` format). #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] +#[serde(tag = "event_type")] pub enum StreamMessage { - #[serde(rename = "book_update")] - BookUpdate { data: OrderDelta }, + /// Full or incremental orderbook update + #[serde(rename = "book")] + Book(BookUpdate), + /// Price change notification (single or batched) + #[serde(rename = "price_change")] + PriceChange(PriceChange), + /// Tick size change notification + #[serde(rename = "tick_size_change")] + TickSizeChange(TickSizeChange), + /// Last trade price update + #[serde(rename = "last_trade_price")] + LastTradePrice(LastTradePrice), + /// Best bid/ask update (requires `custom_feature_enabled`) + #[serde(rename = "best_bid_ask")] + BestBidAsk(BestBidAsk), + /// New market created (requires `custom_feature_enabled`) + #[serde(rename = "new_market")] + NewMarket(NewMarket), + /// Market resolved (requires `custom_feature_enabled`) + #[serde(rename = "market_resolved")] + MarketResolved(MarketResolved), + /// User trade execution (authenticated channel) #[serde(rename = "trade")] - Trade { data: FillEvent }, - #[serde(rename = "order_update")] - OrderUpdate { data: Order }, - #[serde(rename = "heartbeat")] - Heartbeat { timestamp: DateTime }, - /// User channel events - #[serde(rename = "user_order_update")] - UserOrderUpdate { data: Order }, - #[serde(rename = "user_trade")] - UserTrade { data: FillEvent }, - /// Market channel events - #[serde(rename = "market_book_update")] - MarketBookUpdate { data: OrderDelta }, - #[serde(rename = "market_trade")] - MarketTrade { data: FillEvent }, + Trade(TradeMessage), + /// User order update (authenticated channel) + #[serde(rename = "order")] + Order(OrderMessage), + /// Forward-compatible catch-all for new/unknown event types. + #[serde(other)] + Unknown, +} + +/// Orderbook update message (full snapshot or delta). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BookUpdate { + pub asset_id: String, + pub market: String, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + pub bids: Vec, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + pub asks: Vec, + #[serde(default)] + pub hash: Option, +} + +/// Unified wire format for `price_change` events. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PriceChange { + pub market: String, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + pub price_changes: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PriceChangeEntry { + pub asset_id: String, + pub price: Decimal, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub size: Option, + pub side: Side, + #[serde(default)] + pub hash: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub best_bid: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub best_ask: Option, +} + +/// Tick size change event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TickSizeChange { + pub asset_id: String, + pub market: String, + pub old_tick_size: Decimal, + pub new_tick_size: Decimal, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, +} + +/// Last trade price update. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LastTradePrice { + pub asset_id: String, + pub market: String, + pub price: Decimal, + #[serde(default)] + pub side: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub size: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub fee_rate_bps: Option, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, +} + +/// Best bid/ask update. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BestBidAsk { + pub market: String, + pub asset_id: String, + pub best_bid: Decimal, + pub best_ask: Decimal, + pub spread: Decimal, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, +} + +/// New market created event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NewMarket { + pub id: String, + pub question: String, + pub market: String, + pub slug: String, + pub description: String, + #[serde(rename = "assets_ids", alias = "asset_ids")] + pub asset_ids: Vec, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + pub outcomes: Vec, + #[serde(default)] + pub event_message: Option, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, +} + +/// Market resolved event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MarketResolved { + pub id: String, + #[serde(default)] + pub question: Option, + pub market: String, + #[serde(default)] + pub slug: Option, + #[serde(default)] + pub description: Option, + #[serde(rename = "assets_ids", alias = "asset_ids")] + pub asset_ids: Vec, + #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + pub outcomes: Vec, + pub winning_asset_id: String, + pub winning_outcome: String, + #[serde(default)] + pub event_message: Option, + #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] + pub timestamp: u64, +} + +/// Event message object for market events. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventMessage { + pub id: String, + pub ticker: String, + pub slug: String, + pub title: String, + pub description: String, +} + +/// User trade execution message. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TradeMessage { + pub id: String, + pub market: String, + pub asset_id: String, + pub side: Side, + pub size: Decimal, + pub price: Decimal, + #[serde(default)] + pub status: Option, + #[serde(rename = "type", default)] + pub msg_type: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_number_from_string" + )] + pub last_update: Option, + #[serde( + default, + alias = "match_time", + deserialize_with = "crate::decode::deserializers::optional_number_from_string" + )] + pub matchtime: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_number_from_string" + )] + pub timestamp: Option, +} + +/// User order update message. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OrderMessage { + pub id: String, + pub market: String, + pub asset_id: String, + pub side: Side, + pub price: Decimal, + #[serde(rename = "type", default)] + pub msg_type: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub original_size: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_decimal_from_string" + )] + pub size_matched: Option, + #[serde( + default, + deserialize_with = "crate::decode::deserializers::optional_number_from_string" + )] + pub timestamp: Option, + #[serde(default)] + pub associate_trades: Option>, + #[serde(default)] + pub status: Option, } /// Subscription parameters for streaming From da51caa25154ecdd19c0fa5132d38371a0b85227 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:47:19 -0500 Subject: [PATCH 05/32] chore(cargo): use crates.io polymarket-rs-client --- Cargo.lock | 2 ++ Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 711dfa4..c62fd4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2709,6 +2709,8 @@ dependencies = [ [[package]] name = "polymarket-rs-client" version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c797798ba56ed3bda214f2612afbf26ef3ed7479085645fbdc3329a5b9d5123" dependencies = [ "alloy-primitives", "alloy-signer", diff --git a/Cargo.toml b/Cargo.toml index 85cb726..e204510 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ tokio-test = "0.4" mockito = "1.0" proptest = "1.0" env_logger = "0.10" -polymarket-rs-client = { path = "external/polymarket-rs-client" } +polymarket-rs-client = "0.1.1" [features] default = ["stream"] From 41db8cabcdcfa71956e75d3fdb127e05b26b9175 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 18:47:30 -0500 Subject: [PATCH 06/32] chore(fmt): cargo fmt --- examples/demo.rs | 6 +++--- src/client.rs | 5 ++++- src/decode.rs | 19 ++++++------------- src/stream.rs | 14 +++++++------- src/types.rs | 35 ++++++++++++++++++++++++++++------- 5 files changed, 48 insertions(+), 31 deletions(-) diff --git a/examples/demo.rs b/examples/demo.rs index d505e81..7fb6319 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -660,7 +660,7 @@ impl PolyfillDemo { sequence: book.timestamp, }); } - } + }, StreamMessage::Trade(trade) => { info!( " Processing trade: {} {} @ {}", @@ -668,10 +668,10 @@ impl PolyfillDemo { trade.size, trade.price ); - } + }, _ => { info!(" Unhandled message type"); - } + }, } } diff --git a/src/client.rs b/src/client.rs index 8e41290..6e1e70c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1942,7 +1942,10 @@ mod tests { assert_eq!(book.min_order_size, Decimal::from_str("1").unwrap()); assert!(!book.neg_risk); assert_eq!(book.tick_size, Decimal::from_str("0.01").unwrap()); - assert_eq!(book.last_trade_price, Some(Decimal::from_str("0.755").unwrap())); + assert_eq!( + book.last_trade_price, + Some(Decimal::from_str("0.755").unwrap()) + ); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/decode.rs b/src/decode.rs index 18c2715..5a7285f 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -141,7 +141,7 @@ pub mod deserializers { .map(Some) .map_err(serde::de::Error::custom) } - } + }, serde_json::Value::Number(n) => Decimal::from_str(&n.to_string()) .map(Some) .map_err(serde::de::Error::custom), @@ -168,7 +168,7 @@ pub mod deserializers { } else { Ok(s.parse::().ok()) } - } + }, serde_json::Value::Number(n) => Ok(Decimal::from_str(&n.to_string()).ok()), _ => Ok(None), } @@ -457,9 +457,9 @@ pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result> { StreamMessage::Unknown => Ok(vec![]), other => Ok(vec![other]), } - } + }, } - } + }, Value::Array(arr) => Ok(arr .into_iter() .filter_map(|elem| { @@ -467,15 +467,8 @@ pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result> { let event_type = obj.get("event_type").and_then(Value::as_str)?; // Skip unknown event types early (forward compatibility). match event_type { - "book" - | "price_change" - | "tick_size_change" - | "last_trade_price" - | "best_bid_ask" - | "new_market" - | "market_resolved" - | "trade" - | "order" => {} + "book" | "price_change" | "tick_size_change" | "last_trade_price" + | "best_bid_ask" | "new_market" | "market_resolved" | "trade" | "order" => {}, _ => return None, } diff --git a/src/stream.rs b/src/stream.rs index dbca228..87192f2 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -393,22 +393,22 @@ impl Stream for WebSocketStream { self.stats.messages_received += 1; self.stats.last_message_time = Some(Utc::now()); continue; - } + }, Err(e) => { self.stats.errors += 1; return Poll::Ready(Some(Err(e))); - } + }, } - } + }, tokio_tungstenite::tungstenite::Message::Close(_) => { info!("WebSocket connection closed by server"); self.connection = None; return Poll::Ready(None); - } + }, tokio_tungstenite::tungstenite::Message::Ping(_) => { // Best-effort: tokio-tungstenite/tungstenite may handle pings internally. continue; - } + }, tokio_tungstenite::tungstenite::Message::Pong(_) => continue, tokio_tungstenite::tungstenite::Message::Binary(_) => continue, tokio_tungstenite::tungstenite::Message::Frame(_) => continue, @@ -417,11 +417,11 @@ impl Stream for WebSocketStream { error!("WebSocket error: {}", e); self.stats.errors += 1; return Poll::Ready(Some(Err(e.into()))); - } + }, Poll::Ready(None) => { info!("WebSocket stream ended"); return Poll::Ready(None); - } + }, } } } diff --git a/src/types.rs b/src/types.rs index bd6ed89..0c47a4e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -735,9 +735,15 @@ pub struct BookUpdate { pub market: String, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub timestamp: u64, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub bids: Vec, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub asks: Vec, #[serde(default)] pub hash: Option, @@ -749,7 +755,10 @@ pub struct PriceChange { pub market: String, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub timestamp: u64, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub price_changes: Vec, } @@ -832,7 +841,10 @@ pub struct NewMarket { pub description: String, #[serde(rename = "assets_ids", alias = "asset_ids")] pub asset_ids: Vec, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub outcomes: Vec, #[serde(default)] pub event_message: Option, @@ -853,7 +865,10 @@ pub struct MarketResolved { pub description: Option, #[serde(rename = "assets_ids", alias = "asset_ids")] pub asset_ids: Vec, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub outcomes: Vec, pub winning_asset_id: String, pub winning_outcome: String, @@ -1240,9 +1255,15 @@ pub struct OrderBookSummary { pub hash: Option, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub timestamp: u64, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub bids: Vec, - #[serde(default, deserialize_with = "crate::decode::deserializers::vec_from_null")] + #[serde( + default, + deserialize_with = "crate::decode::deserializers::vec_from_null" + )] pub asks: Vec, pub min_order_size: Decimal, pub neg_risk: bool, From cc419acaf3ab1199bfd70c6f559522c60d8ff86c Mon Sep 17 00:00:00 2001 From: floor-licker Date: Thu, 29 Jan 2026 19:07:11 -0500 Subject: [PATCH 07/32] chore(rustfmt): keep config stable-compatible --- rustfmt.toml | 34 +++------------------------------- 1 file changed, 3 insertions(+), 31 deletions(-) diff --git a/rustfmt.toml b/rustfmt.toml index 006a158..c8efc30 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,5 +1,8 @@ # Rustfmt configuration for polyfill-rs # Optimized for readability and consistency +# +# Note: This file is kept compatible with stable `rustfmt` so that local dev and +# CI (`cargo fmt --all -- --check`) behave consistently. # Basic formatting edition = "2021" @@ -7,52 +10,21 @@ max_width = 100 tab_spaces = 4 newline_style = "Unix" -# Indentation -indent_style = "Block" merge_derives = true use_small_heuristics = "Default" -# Spacing -spaces_around_ranges = false -binop_separator = "Front" remove_nested_parens = true -format_code_in_doc_comments = true -# Imports -imports_granularity = "Module" -group_imports = "StdExternalCrate" reorder_imports = true -# Comments -wrap_comments = true -comment_width = 80 - -# Match -match_arm_leading_commas = true match_block_trailing_comma = true -# Control flow -control_brace_style = "AlwaysSameLine" - # Functions fn_call_width = 60 fn_params_layout = "Tall" -# Structs and enums -struct_field_align_threshold = 0 -enum_discrim_align_threshold = 0 - # Arrays and tuples array_width = 60 -tuple_width = 60 # Chains chain_width = 60 -chain_split_single_child = true - -# Other -format_macro_matchers = true -format_macro_bodies = true -format_strings = true -overflow_delimited_expr = true -normalize_doc_attributes = true \ No newline at end of file From dea50163fe457ec58167cafa193979f502e116e4 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 18:01:44 -0500 Subject: [PATCH 08/32] test(no-alloc): guard hot paths against heap allocs --- tests/no_alloc_hot_paths.rs | 129 ++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 tests/no_alloc_hot_paths.rs diff --git a/tests/no_alloc_hot_paths.rs b/tests/no_alloc_hot_paths.rs new file mode 100644 index 0000000..441b1f5 --- /dev/null +++ b/tests/no_alloc_hot_paths.rs @@ -0,0 +1,129 @@ +use std::alloc::{GlobalAlloc, Layout, System}; +use std::cell::Cell; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; + +use chrono::Utc; +use polyfill_rs::{OrderBookImpl, Side}; + +thread_local! { + static ALLOCATIONS: Cell = const { Cell::new(0) }; +} + +struct CountingAllocator; + +unsafe impl GlobalAlloc for CountingAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + ALLOCATIONS.with(|count| count.set(count.get() + 1)); + System.alloc(layout) + } + + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + ALLOCATIONS.with(|count| count.set(count.get() + 1)); + System.alloc_zeroed(layout) + } + + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + ALLOCATIONS.with(|count| count.set(count.get() + 1)); + System.realloc(ptr, layout, new_size) + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + System.dealloc(ptr, layout) + } +} + +#[global_allocator] +static GLOBAL: CountingAllocator = CountingAllocator; + +fn allocation_count() -> usize { + ALLOCATIONS.with(|count| count.get()) +} + +struct NoAllocGuard { + before: usize, +} + +impl NoAllocGuard { + fn new() -> Self { + Self { + before: allocation_count(), + } + } + + fn assert_no_allocations(self) { + let after = allocation_count(); + assert_eq!( + after, + self.before, + "expected no heap allocations, but saw {} allocation(s)", + after - self.before + ); + } +} + +fn token_id_hash(token_id: &str) -> u64 { + let mut hasher = DefaultHasher::new(); + token_id.hash(&mut hasher); + hasher.finish() +} + +fn mk_delta( + token_id_hash: u64, + side: Side, + price_ticks: polyfill_rs::types::Price, + size_units: polyfill_rs::types::Qty, + sequence: u64, +) -> polyfill_rs::types::FastOrderDelta { + polyfill_rs::types::FastOrderDelta { + token_id_hash, + timestamp: chrono::DateTime::::from_timestamp(0, 0).unwrap(), + side, + price: price_ticks, + size: size_units, + sequence, + } +} + +#[test] +fn no_alloc_mid_and_spread_fast() { + let token_id = "test_token"; + let token_hash = token_id_hash(token_id); + let mut book = OrderBookImpl::new(token_id.to_string(), 100); + + // Allocate during setup: create initial price levels. + book.apply_delta_fast(mk_delta(token_hash, Side::BUY, 7500, 1_000_000, 1)) + .unwrap(); + book.apply_delta_fast(mk_delta(token_hash, Side::SELL, 7600, 1_000_000, 2)) + .unwrap(); + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + assert!(book.best_bid_fast().is_some()); + assert!(book.best_ask_fast().is_some()); + assert!(book.spread_fast().is_some()); + assert!(book.mid_price_fast().is_some()); + guard.assert_no_allocations(); +} + +#[test] +fn no_alloc_apply_delta_fast_existing_level_update() { + let token_id = "test_token"; + let token_hash = token_id_hash(token_id); + let mut book = OrderBookImpl::new(token_id.to_string(), 100); + + // Allocate during setup: create an initial level. + book.apply_delta_fast(mk_delta(token_hash, Side::BUY, 7500, 1_000_000, 1)) + .unwrap(); + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + // Updating an existing level should not require heap allocation. + book.apply_delta_fast(mk_delta(token_hash, Side::BUY, 7500, 2_000_000, 2)) + .unwrap(); + guard.assert_no_allocations(); +} From 9594058fd5ee07f72f0b56b6db21a56ee5057eaf Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 18:17:45 -0500 Subject: [PATCH 09/32] feat(book): apply websocket book updates --- src/book.rs | 69 +++++++++++++++++++++++++++++++++++++ tests/no_alloc_hot_paths.rs | 37 ++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/src/book.rs b/src/book.rs index 244a97d..3de5c6c 100644 --- a/src/book.rs +++ b/src/book.rs @@ -396,6 +396,75 @@ impl OrderBook { Ok(()) } + /// Apply a WebSocket `book` update for this token. + /// + /// The official Polymarket CLOB WebSocket `book` event contains batches of + /// price levels for both sides. Unlike `apply_delta_fast`, this method can + /// apply many levels that share the same message timestamp. + /// + /// Notes: + /// - This performs upserts (update/insert/remove) for the provided levels. + /// - It does **not** infer removals for levels omitted from the message. + /// - Insertions of *new* price levels may allocate (BTreeMap node growth). + pub fn apply_book_update(&mut self, update: &BookUpdate) -> Result<()> { + if update.asset_id != self.token_id { + return Err(PolyfillError::validation("Token ID mismatch")); + } + + // Use the exchange-provided timestamp as our monotonic sequence marker. + // This is less strict than the REST/legacy delta sequence but works for + // ignoring obviously stale book snapshots. + if update.timestamp <= self.sequence { + return Ok(()); + } + + self.sequence = update.timestamp; + self.timestamp = chrono::DateTime::::from_timestamp(update.timestamp as i64, 0) + .unwrap_or_else(Utc::now); + + // Apply bids (BUY) and asks (SELL) as level upserts. + for level in &update.bids { + let price_ticks = decimal_to_price(level.price) + .map_err(|_| PolyfillError::validation("Invalid price"))?; + let size_units = decimal_to_qty(level.size) + .map_err(|_| PolyfillError::validation("Invalid size"))?; + + if let Some(tick_size_ticks) = self.tick_size_ticks { + if tick_size_ticks > 0 && !price_ticks.is_multiple_of(tick_size_ticks) { + return Err(PolyfillError::validation("Price not aligned to tick size")); + } + } + + if size_units == 0 { + self.bids.remove(&price_ticks); + } else { + self.bids.insert(price_ticks, size_units); + } + } + + for level in &update.asks { + let price_ticks = decimal_to_price(level.price) + .map_err(|_| PolyfillError::validation("Invalid price"))?; + let size_units = decimal_to_qty(level.size) + .map_err(|_| PolyfillError::validation("Invalid size"))?; + + if let Some(tick_size_ticks) = self.tick_size_ticks { + if tick_size_ticks > 0 && !price_ticks.is_multiple_of(tick_size_ticks) { + return Err(PolyfillError::validation("Price not aligned to tick size")); + } + } + + if size_units == 0 { + self.asks.remove(&price_ticks); + } else { + self.asks.insert(price_ticks, size_units); + } + } + + self.trim_depth(); + Ok(()) + } + /// Apply a bid-side delta (someone wants to buy) - LEGACY VERSION /// If size is 0, it means "remove this price level entirely" /// Otherwise, set the total size at this price level diff --git a/tests/no_alloc_hot_paths.rs b/tests/no_alloc_hot_paths.rs index 441b1f5..6e7465e 100644 --- a/tests/no_alloc_hot_paths.rs +++ b/tests/no_alloc_hot_paths.rs @@ -2,9 +2,11 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::cell::Cell; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; +use std::str::FromStr; use chrono::Utc; use polyfill_rs::{OrderBookImpl, Side}; +use rust_decimal::Decimal; thread_local! { static ALLOCATIONS: Cell = const { Cell::new(0) }; @@ -127,3 +129,38 @@ fn no_alloc_apply_delta_fast_existing_level_update() { .unwrap(); guard.assert_no_allocations(); } + +#[test] +fn no_alloc_apply_book_update_existing_levels() { + let asset_id = "test_asset_id"; + let token_hash = token_id_hash(asset_id); + let mut book = OrderBookImpl::new(asset_id.to_string(), 100); + + // Allocate during setup: create initial price levels. + book.apply_delta_fast(mk_delta(token_hash, Side::BUY, 7500, 1_000_000, 1)) + .unwrap(); + book.apply_delta_fast(mk_delta(token_hash, Side::SELL, 7600, 1_000_000, 2)) + .unwrap(); + + let update = polyfill_rs::types::BookUpdate { + asset_id: asset_id.to_string(), + market: "0xabc".to_string(), + timestamp: 10, + bids: vec![polyfill_rs::types::OrderSummary { + price: Decimal::from_str("0.75").unwrap(), + size: Decimal::from_str("200.0").unwrap(), + }], + asks: vec![polyfill_rs::types::OrderSummary { + price: Decimal::from_str("0.76").unwrap(), + size: Decimal::from_str("50.0").unwrap(), + }], + hash: None, + }; + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + book.apply_book_update(&update).unwrap(); + guard.assert_no_allocations(); +} From 75ce2614451b77929630a8ed7218af17745b7f0e Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 18:50:28 -0500 Subject: [PATCH 10/32] perf(ws): avoid cloning batch stream messages --- src/decode.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index 5a7285f..76c55e1 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -463,8 +463,11 @@ pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result> { Value::Array(arr) => Ok(arr .into_iter() .filter_map(|elem| { - let obj = elem.as_object()?; - let event_type = obj.get("event_type").and_then(Value::as_str)?; + let Value::Object(map) = elem else { + return None; + }; + + let event_type = map.get("event_type").and_then(Value::as_str)?; // Skip unknown event types early (forward compatibility). match event_type { "book" | "price_change" | "tick_size_change" | "last_trade_price" @@ -472,7 +475,7 @@ pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result> { _ => return None, } - match serde_json::from_value::(Value::Object(obj.clone())) { + match serde_json::from_value::(Value::Object(map)) { Ok(StreamMessage::Unknown) => None, Ok(msg) => Some(msg), Err(_) => None, From a0bf317d978ac368a4ef8ca792064470b6153bd5 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 18:50:45 -0500 Subject: [PATCH 11/32] refactor(ws): replace unbounded channel with bounded queue --- src/stream.rs | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/src/stream.rs b/src/stream.rs index 87192f2..0ba3b9a 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -8,6 +8,7 @@ use crate::types::*; use chrono::Utc; use futures::{SinkExt, Stream, StreamExt}; use serde_json::Value; +use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc; @@ -44,10 +45,12 @@ pub struct WebSocketStream { auth: Option, /// Current subscriptions subscriptions: Vec, - /// Message sender for internal communication - tx: mpsc::UnboundedSender, - /// Message receiver - rx: mpsc::UnboundedReceiver, + /// Parsed messages awaiting delivery to the caller. + /// + /// This replaces an internal unbounded channel to avoid per-message + /// allocations in the buffering layer and to enforce a bounded backlog. + pending: VecDeque, + pending_capacity: usize, /// Connection statistics stats: StreamStats, /// Reconnection configuration @@ -60,6 +63,7 @@ pub struct StreamStats { pub messages_received: u64, pub messages_sent: u64, pub errors: u64, + pub dropped_messages: u64, pub last_message_time: Option>, pub connection_uptime: std::time::Duration, pub reconnect_count: u32, @@ -88,19 +92,20 @@ impl Default for ReconnectConfig { impl WebSocketStream { /// Create a new WebSocket stream pub fn new(url: &str) -> Self { - let (tx, rx) = mpsc::unbounded_channel(); + let pending_capacity = 1024; Self { connection: None, url: url.to_string(), auth: None, subscriptions: Vec::new(), - tx, - rx, + pending: VecDeque::with_capacity(pending_capacity), + pending_capacity, stats: StreamStats { messages_received: 0, messages_sent: 0, errors: 0, + dropped_messages: 0, last_message_time: None, connection_uptime: std::time::Duration::ZERO, reconnect_count: 0, @@ -109,6 +114,14 @@ impl WebSocketStream { } } + fn enqueue(&mut self, message: StreamMessage) { + if self.pending.len() >= self.pending_capacity { + let _ = self.pending.pop_front(); + self.stats.dropped_messages += 1; + } + self.pending.push_back(message); + } + /// Set authentication credentials pub fn with_auth(mut self, auth: WssAuth) -> Self { self.auth = Some(auth); @@ -277,9 +290,7 @@ impl WebSocketStream { // Parse the message according to Polymarket's `event_type` format let stream_messages = crate::decode::parse_stream_messages(&text)?; for stream_message in stream_messages { - if let Err(e) = self.tx.send(stream_message) { - error!("Failed to send message to internal channel: {}", e); - } + self.enqueue(stream_message); } self.stats.messages_received += 1; @@ -372,8 +383,7 @@ impl Stream for WebSocketStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - // First drain any parsed messages - if let Poll::Ready(Some(message)) = self.rx.poll_recv(cx) { + if let Some(message) = self.pending.pop_front() { return Poll::Ready(Some(Ok(message))); } @@ -387,12 +397,17 @@ impl Stream for WebSocketStream { tokio_tungstenite::tungstenite::Message::Text(text) => { match crate::decode::parse_stream_messages(&text) { Ok(messages) => { - for msg in messages { - let _ = self.tx.send(msg); + let mut iter = messages.into_iter(); + let Some(first) = iter.next() else { + continue; + }; + + for msg in iter { + self.enqueue(msg); } self.stats.messages_received += 1; self.stats.last_message_time = Some(Utc::now()); - continue; + return Poll::Ready(Some(Ok(first))); }, Err(e) => { self.stats.errors += 1; @@ -515,6 +530,7 @@ impl MarketStream for MockStream { messages_received: self.messages.len() as u64, messages_sent: 0, errors: self.messages.iter().filter(|m| m.is_err()).count() as u64, + dropped_messages: 0, last_message_time: None, connection_uptime: std::time::Duration::ZERO, reconnect_count: 0, From 0e71182ec55381d4bda403411bc12de1e7d447da Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 18:50:52 -0500 Subject: [PATCH 12/32] feat(book): apply ws book updates via OrderBookManager --- src/book.rs | 24 +++++++++++++++++ tests/no_alloc_hot_paths.rs | 53 ++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/book.rs b/src/book.rs index 3de5c6c..d04a568 100644 --- a/src/book.rs +++ b/src/book.rs @@ -781,6 +781,30 @@ impl OrderBookManager { book.apply_delta(delta) } + /// Apply a WebSocket `book` update to a managed book. + /// + /// This is the preferred way to ingest `StreamMessage::Book` updates into + /// the in-memory order books (avoids rebuilding snapshots via per-level deltas). + pub fn apply_book_update(&self, update: &BookUpdate) -> Result<()> { + let mut books = self + .books + .write() + .map_err(|_| PolyfillError::internal_simple("Failed to acquire book lock"))?; + + if let Some(book) = books.get_mut(update.asset_id.as_str()) { + return book.apply_book_update(update); + } + + // First time we've seen this token; allocating the key and book is part of warmup. + let token_id = update.asset_id.clone(); + books.insert(token_id.clone(), OrderBook::new(token_id, self.max_depth)); + + books + .get_mut(update.asset_id.as_str()) + .ok_or_else(|| PolyfillError::internal_simple("Failed to insert order book"))? + .apply_book_update(update) + } + /// Get a book snapshot /// Returns a copy of the current book state that won't change pub fn get_book(&self, token_id: &str) -> Result { diff --git a/tests/no_alloc_hot_paths.rs b/tests/no_alloc_hot_paths.rs index 6e7465e..51b3bc9 100644 --- a/tests/no_alloc_hot_paths.rs +++ b/tests/no_alloc_hot_paths.rs @@ -5,7 +5,7 @@ use std::hash::{Hash, Hasher}; use std::str::FromStr; use chrono::Utc; -use polyfill_rs::{OrderBookImpl, Side}; +use polyfill_rs::{book::OrderBookManager, OrderBookImpl, Side}; use rust_decimal::Decimal; thread_local! { @@ -164,3 +164,54 @@ fn no_alloc_apply_book_update_existing_levels() { book.apply_book_update(&update).unwrap(); guard.assert_no_allocations(); } + +#[test] +fn no_alloc_book_manager_apply_book_update_existing_levels() { + let asset_id = "test_asset_id"; + let manager = OrderBookManager::new(100); + manager.get_or_create_book(asset_id).unwrap(); + + // Warm up the internal book with initial levels (allocations allowed). + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::BUY, + price: Decimal::from_str("0.75").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 1, + }) + .unwrap(); + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::SELL, + price: Decimal::from_str("0.76").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 2, + }) + .unwrap(); + + let update = polyfill_rs::types::BookUpdate { + asset_id: asset_id.to_string(), + market: "0xabc".to_string(), + timestamp: 10, + bids: vec![polyfill_rs::types::OrderSummary { + price: Decimal::from_str("0.75").unwrap(), + size: Decimal::from_str("200.0").unwrap(), + }], + asks: vec![polyfill_rs::types::OrderSummary { + price: Decimal::from_str("0.76").unwrap(), + size: Decimal::from_str("50.0").unwrap(), + }], + hash: None, + }; + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + manager.apply_book_update(&update).unwrap(); + guard.assert_no_allocations(); +} From 7145f74805471b64af1b2317d4edcd8694e05da6 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 19:01:46 -0500 Subject: [PATCH 13/32] docs: update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index f647fbf..37be757 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,10 @@ A high-performance drop-in replacement for `polymarket-rs-client` with latency-optimized data structures and zero-allocation hot paths. A 100% API-compatible drop-in replacement for `polymarket-rs-client` with identical method signatures. +At the time that this project was started, `polymarket-rs-client` was a Polymarket Rust Client with a few GitHub stars, but which seemed to be unmaintained. I took on the task of creating a Rust client which could beat the benchmarks quoted in the README.md of that project, while also maintaining zero alloc hot paths. + +I also want to take a moment to clarify what zero-alloc means because I've now recieved double digit messages about this on twitter/x and telegram. In general, zero alloc means either zero alloc in hot paths (which can be a bit more arbitrary) or atlernatively it can mean zero alloc ater init/warm-up, which is the objective of this repository. Succinctly that means that **the per-message handling loop never touches the heap**. + ## Quick Start From c1b763309b2385e6e79f1b8c792464c961f22545 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 19:20:32 -0500 Subject: [PATCH 14/32] chore(security): update lockfile for RustSec advisories --- Cargo.lock | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c62fd4d..0979727 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1399,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2140,7 +2140,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2427,7 +2427,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3096,9 +3096,9 @@ dependencies = [ [[package]] name = "rkyv" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" dependencies = [ "bitvec", "bytecheck", @@ -3114,9 +3114,9 @@ dependencies = [ [[package]] name = "rkyv_derive" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" dependencies = [ "proc-macro2", "quote", @@ -3135,9 +3135,9 @@ dependencies = [ [[package]] name = "ruint" -version = "1.17.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68df0380e5c9d20ce49534f292a36a7514ae21350726efe1865bdb1fa91d278" +checksum = "7f5befb5191be3584a4edaf63435e8ff92ffff622e711ca7e77f8f8f365a9df8" dependencies = [ "alloy-rlp", "ark-ff 0.3.0", @@ -3233,7 +3233,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3727,7 +3727,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4400,7 +4400,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] From 3b0765fbec6884d6c8e84ba425c6e030c30137bd Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 19:35:03 -0500 Subject: [PATCH 15/32] ci: run no-alloc hot-path tests as separate job --- .github/workflows/ci.yml | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9b389d1..a8b87fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,8 +35,8 @@ jobs: - name: Run clippy run: cargo clippy --lib --bins --tests --benches --all-features -- -D warnings && cargo clippy --example benchmark_with_keepalive --example demo --example final_benchmark --example http2_tuning_benchmark --example performance_benchmark --example quick_demo --example snipe -- -D warnings - - name: Run tests - run: cargo test --all-features + - name: Run tests (excluding no-alloc hot paths) + run: cargo test --all-features -- --skip no_alloc_ - name: Build examples run: | @@ -64,4 +64,24 @@ jobs: run: cargo install cargo-audit - name: Run security audit - run: cargo audit \ No newline at end of file + run: cargo audit + + no_alloc: + name: No-alloc hot paths + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + + - name: Cache dependencies + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Run no-alloc hot path tests + run: cargo test --all-features --test no_alloc_hot_paths From 9fa4174e42da5ae3f185f8c062f88970a7d906e0 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 19:48:30 -0500 Subject: [PATCH 16/32] docs: update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 37be757..02399ae 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ A high-performance drop-in replacement for `polymarket-rs-client` with latency-optimized data structures and zero-allocation hot paths. A 100% API-compatible drop-in replacement for `polymarket-rs-client` with identical method signatures. -At the time that this project was started, `polymarket-rs-client` was a Polymarket Rust Client with a few GitHub stars, but which seemed to be unmaintained. I took on the task of creating a Rust client which could beat the benchmarks quoted in the README.md of that project, while also maintaining zero alloc hot paths. +At the time that this project was started, `polymarket-rs-client` was a Polymarket Rust Client with a few GitHub stars, but which seemed to be unmaintained. I took on the task of creating a Rust client which could beat the benchmarks quoted in the README.md of that project, with the added constraint of also maintaining zero alloc hot paths. I also want to take a moment to clarify what zero-alloc means because I've now recieved double digit messages about this on twitter/x and telegram. In general, zero alloc means either zero alloc in hot paths (which can be a bit more arbitrary) or atlernatively it can mean zero alloc ater init/warm-up, which is the objective of this repository. Succinctly that means that **the per-message handling loop never touches the heap**. From b523a7fd44539ba41fe315fb5d1d25fae722c80c Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 19:53:17 -0500 Subject: [PATCH 17/32] docs: update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 02399ae..197c5e2 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ At the time that this project was started, `polymarket-rs-client` was a Polymark I also want to take a moment to clarify what zero-alloc means because I've now recieved double digit messages about this on twitter/x and telegram. In general, zero alloc means either zero alloc in hot paths (which can be a bit more arbitrary) or atlernatively it can mean zero alloc ater init/warm-up, which is the objective of this repository. Succinctly that means that **the per-message handling loop never touches the heap**. +Notably order book paths that introduce new allocations by design: +- First time seeing a token/book (HashMap insert + key clone): `src/book.rs:~788` +- New price levels (BTreeMap node growth): `src/book.rs:~409` + ## Quick Start From e0b97c53cb32ea47d4a054b2a0ac73003bbef875 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 20:22:45 -0500 Subject: [PATCH 18/32] feat(ws): add tape-based book update processor --- src/book.rs | 76 ++++++++++++++ src/lib.rs | 2 + src/ws_hot_path.rs | 195 ++++++++++++++++++++++++++++++++++++ tests/no_alloc_hot_paths.rs | 56 ++++++++++- 4 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 src/ws_hot_path.rs diff --git a/src/book.rs b/src/book.rs index d04a568..37bdf35 100644 --- a/src/book.rs +++ b/src/book.rs @@ -396,6 +396,58 @@ impl OrderBook { Ok(()) } + /// Begin applying a WebSocket `book` update (hot-path oriented). + /// + /// This is intended for in-place WS processing where we *stream* levels out of a decoded + /// message, without constructing intermediate `BookUpdate` structs. + /// + /// Returns `Ok(true)` if the update should be applied, or `Ok(false)` if the update is stale + /// and should be skipped. + pub(crate) fn begin_ws_book_update(&mut self, asset_id: &str, timestamp: u64) -> Result { + if asset_id != self.token_id { + return Err(PolyfillError::validation("Token ID mismatch")); + } + + if timestamp <= self.sequence { + return Ok(false); + } + + self.sequence = timestamp; + self.timestamp = + chrono::DateTime::::from_timestamp(timestamp as i64, 0).unwrap_or_else(Utc::now); + + Ok(true) + } + + /// Apply a single WS `book` level (already converted to internal fixed-point). + /// + /// Note: Insertions of new price levels may allocate (BTreeMap node growth). In a strict + /// zero-alloc hot path, all expected levels must be warmed up ahead of time. + pub(crate) fn apply_ws_book_level_fast( + &mut self, + side: Side, + price_ticks: Price, + size_units: Qty, + ) -> Result<()> { + if let Some(tick_size_ticks) = self.tick_size_ticks { + if tick_size_ticks > 0 && !price_ticks.is_multiple_of(tick_size_ticks) { + return Err(PolyfillError::validation("Price not aligned to tick size")); + } + } + + match side { + Side::BUY => self.apply_bid_delta_fast(price_ticks, size_units), + Side::SELL => self.apply_ask_delta_fast(price_ticks, size_units), + } + + Ok(()) + } + + /// Finish applying a WS `book` update. + pub(crate) fn finish_ws_book_update(&mut self) { + self.trim_depth(); + } + /// Apply a WebSocket `book` update for this token. /// /// The official Polymarket CLOB WebSocket `book` event contains batches of @@ -761,6 +813,30 @@ impl OrderBookManager { } } + /// Execute a closure with mutable access to a managed book. + /// + /// This is useful for hot-path update ingestion where you want to avoid allocating + /// intermediate update structs (e.g., applying WS updates directly). + pub fn with_book_mut( + &self, + token_id: &str, + f: impl FnOnce(&mut OrderBook) -> Result, + ) -> Result { + let mut books = self + .books + .write() + .map_err(|_| PolyfillError::internal_simple("Failed to acquire book lock"))?; + + let book = books.get_mut(token_id).ok_or_else(|| { + PolyfillError::market_data( + format!("No book found for token: {}", token_id), + crate::errors::MarketDataErrorKind::TokenNotFound, + ) + })?; + + f(book) + } + /// Update a book with a delta /// This is called when we receive real-time updates from the exchange pub fn apply_delta(&self, delta: OrderDelta) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index dcabb27..d7c7a8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,6 +146,7 @@ pub use crate::book::{OrderBook as OrderBookImpl, OrderBookManager}; pub use crate::decode::Decoder; pub use crate::fill::{FillEngine, FillResult}; pub use crate::stream::{MarketStream, StreamManager, WebSocketStream}; +pub use crate::ws_hot_path::{WsBookApplyStats, WsBookUpdateProcessor}; // Re-export utilities pub use crate::utils::{crypto, math, rate_limit, retry, time, url}; @@ -165,6 +166,7 @@ pub mod orders; pub mod stream; pub mod types; pub mod utils; +pub mod ws_hot_path; // Benchmarks #[cfg(test)] diff --git a/src/ws_hot_path.rs b/src/ws_hot_path.rs new file mode 100644 index 0000000..7eaa98c --- /dev/null +++ b/src/ws_hot_path.rs @@ -0,0 +1,195 @@ +//! Zero-allocation-ish WebSocket hot-path processing. +//! +//! This module is focused on the "decode + apply" path for WS `book` events: +//! after warmup, processing a message should not perform heap allocations. +//! +//! Important: using the current tokio-tungstenite transport, the *network layer* +//! may still allocate when producing `Message::Text(String)`. This module aims to +//! make the *processing* layer allocation-free so we can enforce it with tests. + +use crate::book::OrderBookManager; +use crate::errors::{PolyfillError, Result}; +use crate::types::{decimal_to_price, decimal_to_qty, Side}; +use rust_decimal::Decimal; +use simd_json::prelude::*; +use std::str::FromStr; + +/// Summary of what happened while processing a WS payload. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct WsBookApplyStats { + pub book_messages: usize, + pub book_levels_applied: usize, +} + +/// In-place WS `book` message processor built on `simd-json`'s tape API. +/// +/// This avoids building a DOM (which allocates for arrays/objects) by decoding into a +/// reusable tape, then traversing it to extract the fields needed for order book updates. +pub struct WsBookUpdateProcessor { + buffers: simd_json::Buffers, + tape: Option>, +} + +impl WsBookUpdateProcessor { + /// Create a new processor. + /// + /// `input_len_hint` should be set to the typical WS message size to reduce warmup reallocs. + pub fn new(input_len_hint: usize) -> Self { + Self { + buffers: simd_json::Buffers::new(input_len_hint), + // Store an empty tape with a `'static` lifetime so we can reuse its allocation. + tape: Some(simd_json::Tape::null().reset()), + } + } + + /// Process a WS payload in-place (bytes will be mutated by the JSON parser). + pub fn process_bytes( + &mut self, + bytes: &mut [u8], + books: &OrderBookManager, + ) -> Result { + let mut tape = self + .tape + .take() + .expect("WsBookUpdateProcessor tape must be present") + .reset(); + + simd_json::fill_tape(bytes, &mut self.buffers, &mut tape).map_err(|e| { + PolyfillError::parse("Failed to parse WebSocket JSON", Some(Box::new(e))) + })?; + + let root = tape.as_value(); + let stats = process_root_value(root, books)?; + + // Reset the tape to detach lifetimes and keep capacity for reuse. + self.tape = Some(tape.reset()); + Ok(stats) + } + + /// Convenience: process an owned text message without allocating an additional buffer. + pub fn process_text( + &mut self, + text: String, + books: &OrderBookManager, + ) -> Result { + let mut bytes = text.into_bytes(); + self.process_bytes(bytes.as_mut_slice(), books) + } +} + +fn process_root_value<'tape, 'input>( + value: simd_json::tape::Value<'tape, 'input>, + books: &OrderBookManager, +) -> Result { + if let Some(obj) = value.as_object() { + return process_stream_object(obj, books); + } + + let Some(arr) = value.as_array() else { + return Ok(WsBookApplyStats::default()); + }; + + let mut total = WsBookApplyStats::default(); + for elem in arr.iter() { + let Some(obj) = elem.as_object() else { + continue; + }; + let stats = process_stream_object(obj, books)?; + total.book_messages += stats.book_messages; + total.book_levels_applied += stats.book_levels_applied; + } + + Ok(total) +} + +fn process_stream_object<'tape, 'input>( + obj: simd_json::tape::Object<'tape, 'input>, + books: &OrderBookManager, +) -> Result { + let Some(event_type) = obj.get("event_type").and_then(|v| v.into_string()) else { + return Ok(WsBookApplyStats::default()); + }; + + if event_type != "book" { + return Ok(WsBookApplyStats::default()); + } + + let asset_id = obj + .get("asset_id") + .and_then(|v| v.into_string()) + .ok_or_else(|| PolyfillError::parse("Missing asset_id", None))?; + + let timestamp_value = obj + .get("timestamp") + .ok_or_else(|| PolyfillError::parse("Missing timestamp", None))?; + let timestamp = parse_u64(timestamp_value) + .ok_or_else(|| PolyfillError::parse("Invalid timestamp", None))?; + + let bids = obj.get("bids").and_then(|v| v.as_array()); + let asks = obj.get("asks").and_then(|v| v.as_array()); + + let levels_applied = books.with_book_mut(asset_id, |book| { + if !book.begin_ws_book_update(asset_id, timestamp)? { + return Ok(0); + } + + let mut applied = 0usize; + if let Some(bids) = bids { + applied += apply_levels(book, Side::BUY, bids)?; + } + if let Some(asks) = asks { + applied += apply_levels(book, Side::SELL, asks)?; + } + + book.finish_ws_book_update(); + Ok(applied) + })?; + + Ok(WsBookApplyStats { + book_messages: 1, + book_levels_applied: levels_applied, + }) +} + +fn parse_u64<'tape, 'input>(value: simd_json::tape::Value<'tape, 'input>) -> Option { + value + .as_u64() + .or_else(|| value.into_string().and_then(|s| s.parse::().ok())) +} + +fn apply_levels<'tape, 'input>( + book: &mut crate::book::OrderBook, + side: Side, + levels: simd_json::tape::Array<'tape, 'input>, +) -> Result { + let mut applied = 0usize; + for level in levels.iter() { + let Some(obj) = level.as_object() else { + continue; + }; + + let price_str = obj + .get("price") + .and_then(|v| v.into_string()) + .ok_or_else(|| PolyfillError::parse("Missing price", None))?; + let size_str = obj + .get("size") + .and_then(|v| v.into_string()) + .ok_or_else(|| PolyfillError::parse("Missing size", None))?; + + let price_decimal = + Decimal::from_str(price_str).map_err(|_| PolyfillError::validation("Invalid price"))?; + let size_decimal = + Decimal::from_str(size_str).map_err(|_| PolyfillError::validation("Invalid size"))?; + + let price_ticks = decimal_to_price(price_decimal) + .map_err(|_| PolyfillError::validation("Invalid price"))?; + let size_units = + decimal_to_qty(size_decimal).map_err(|_| PolyfillError::validation("Invalid size"))?; + + book.apply_ws_book_level_fast(side, price_ticks, size_units)?; + applied += 1; + } + + Ok(applied) +} diff --git a/tests/no_alloc_hot_paths.rs b/tests/no_alloc_hot_paths.rs index 51b3bc9..c56a310 100644 --- a/tests/no_alloc_hot_paths.rs +++ b/tests/no_alloc_hot_paths.rs @@ -5,7 +5,7 @@ use std::hash::{Hash, Hasher}; use std::str::FromStr; use chrono::Utc; -use polyfill_rs::{book::OrderBookManager, OrderBookImpl, Side}; +use polyfill_rs::{book::OrderBookManager, OrderBookImpl, Side, WsBookUpdateProcessor}; use rust_decimal::Decimal; thread_local! { @@ -215,3 +215,57 @@ fn no_alloc_book_manager_apply_book_update_existing_levels() { manager.apply_book_update(&update).unwrap(); guard.assert_no_allocations(); } + +#[test] +fn no_alloc_ws_book_update_processor_apply_existing_levels() { + let asset_id = "test_asset_id"; + let manager = OrderBookManager::new(100); + manager.get_or_create_book(asset_id).unwrap(); + + // Warm up the internal book with initial levels (allocations allowed). + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::BUY, + price: Decimal::from_str("0.75").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 1, + }) + .unwrap(); + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::SELL, + price: Decimal::from_str("0.76").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 2, + }) + .unwrap(); + + let mut processor = WsBookUpdateProcessor::new(1024); + + // Warm up simd-json buffers/tape outside the guarded section. + let mut warmup_msg = format!( + "{{\"event_type\":\"book\",\"asset_id\":\"{asset_id}\",\"market\":\"0xabc\",\"timestamp\":10,\"bids\":[{{\"price\":\"0.75\",\"size\":\"200.0\"}}],\"asks\":[{{\"price\":\"0.76\",\"size\":\"50.0\"}}]}}" + ) + .into_bytes(); + processor + .process_bytes(warmup_msg.as_mut_slice(), &manager) + .unwrap(); + + let mut msg = format!( + "{{\"event_type\":\"book\",\"asset_id\":\"{asset_id}\",\"market\":\"0xabc\",\"timestamp\":11,\"bids\":[{{\"price\":\"0.75\",\"size\":\"150.0\"}}],\"asks\":[{{\"price\":\"0.76\",\"size\":\"75.0\"}}]}}" + ) + .into_bytes(); + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + processor + .process_bytes(msg.as_mut_slice(), &manager) + .unwrap(); + guard.assert_no_allocations(); +} From d45b202d6d71e72d298a00601812efdd7c786f70 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 20:40:07 -0500 Subject: [PATCH 19/32] feat(ws): add book-applier stream --- src/lib.rs | 2 +- src/stream.rs | 136 ++++++++++++++++++++++++++++++++++++ tests/no_alloc_hot_paths.rs | 54 +++++++++++++- 3 files changed, 190 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d7c7a8c..070ee7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -145,7 +145,7 @@ pub use crate::errors::{PolyfillError, Result}; pub use crate::book::{OrderBook as OrderBookImpl, OrderBookManager}; pub use crate::decode::Decoder; pub use crate::fill::{FillEngine, FillResult}; -pub use crate::stream::{MarketStream, StreamManager, WebSocketStream}; +pub use crate::stream::{MarketStream, StreamManager, WebSocketBookApplier, WebSocketStream}; pub use crate::ws_hot_path::{WsBookApplyStats, WsBookUpdateProcessor}; // Re-export utilities diff --git a/src/stream.rs b/src/stream.rs index 0ba3b9a..a3e2613 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,6 +5,7 @@ use crate::errors::{PolyfillError, Result}; use crate::types::*; +use crate::ws_hot_path::{WsBookApplyStats, WsBookUpdateProcessor}; use chrono::Utc; use futures::{SinkExt, Stream, StreamExt}; use serde_json::Value; @@ -378,6 +379,116 @@ impl WebSocketStream { } } +/// WebSocket stream wrapper that applies `book` updates directly into an [`crate::book::OrderBookManager`]. +/// +/// This bypasses `StreamMessage` decoding (serde/DOM parsing) for the `book` hot path by using +/// [`WsBookUpdateProcessor`]. Non-`book` WS payloads are ignored. +/// +/// Note: the underlying WS transport may still allocate when producing `Message::Text(String)`. +pub struct WebSocketBookApplier<'a> { + stream: WebSocketStream, + books: &'a crate::book::OrderBookManager, + processor: WsBookUpdateProcessor, +} + +impl WebSocketStream { + /// Convert this stream into a book-applier stream. + /// + /// The caller is expected to "warm up" the [`crate::book::OrderBookManager`] by creating books for all + /// subscribed asset IDs ahead of time. Missing books are treated as an error. + pub fn into_book_applier<'a>( + mut self, + books: &'a crate::book::OrderBookManager, + processor: WsBookUpdateProcessor, + ) -> WebSocketBookApplier<'a> { + // Drop any pre-parsed messages to avoid mixing the two streaming modes. + self.pending.clear(); + WebSocketBookApplier { + stream: self, + books, + processor, + } + } +} + +impl<'a> WebSocketBookApplier<'a> { + /// Access the underlying WebSocket stream (e.g., for subscribe/unsubscribe calls). + pub fn stream_mut(&mut self) -> &mut WebSocketStream { + &mut self.stream + } + + /// Current WebSocket connection stats. + pub fn stream_stats(&self) -> StreamStats { + self.stream.stats.clone() + } + + /// Access the hot-path processor (e.g., to reuse it across connections). + pub fn processor_mut(&mut self) -> &mut WsBookUpdateProcessor { + &mut self.processor + } + + /// Apply a single WS text payload (useful for custom transports and for testing). + pub fn apply_text_message(&mut self, text: String) -> Result { + let stats = self.processor.process_text(text, self.books)?; + self.stream.stats.messages_received += 1; + self.stream.stats.last_message_time = Some(Utc::now()); + Ok(stats) + } +} + +impl<'a> Stream for WebSocketBookApplier<'a> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let Some(connection) = &mut self.stream.connection else { + return Poll::Ready(None); + }; + + match connection.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(msg))) => match msg { + tokio_tungstenite::tungstenite::Message::Text(text) => { + match self.apply_text_message(text) { + Ok(stats) => { + if stats.book_messages == 0 { + continue; + } + return Poll::Ready(Some(Ok(stats))); + }, + Err(e) => { + self.stream.stats.errors += 1; + return Poll::Ready(Some(Err(e))); + }, + } + }, + tokio_tungstenite::tungstenite::Message::Close(_) => { + info!("WebSocket connection closed by server"); + self.stream.connection = None; + return Poll::Ready(None); + }, + tokio_tungstenite::tungstenite::Message::Ping(_) => { + // Best-effort: tokio-tungstenite/tungstenite may handle pings internally. + continue; + }, + tokio_tungstenite::tungstenite::Message::Pong(_) => continue, + tokio_tungstenite::tungstenite::Message::Binary(_) => continue, + tokio_tungstenite::tungstenite::Message::Frame(_) => continue, + }, + Poll::Ready(Some(Err(e))) => { + error!("WebSocket error: {}", e); + self.stream.stats.errors += 1; + return Poll::Ready(Some(Err(e.into()))); + }, + Poll::Ready(None) => { + info!("WebSocket stream ended"); + return Poll::Ready(None); + }, + } + } + } +} + impl Stream for WebSocketStream { type Item = Result; @@ -585,6 +696,8 @@ impl StreamManager { #[cfg(test)] mod tests { use super::*; + use rust_decimal::Decimal; + use std::str::FromStr; #[test] fn test_mock_stream() { @@ -626,4 +739,27 @@ mod tests { }); assert!(manager.broadcast_message(message).is_ok()); } + + #[test] + fn test_websocket_book_applier_apply_text_message_updates_book() { + let books = crate::book::OrderBookManager::new(64); + let _ = books.get_or_create_book("12345").unwrap(); + + let processor = WsBookUpdateProcessor::new(1024); + let stream = WebSocketStream::new("wss://example.com/ws"); + let mut applier = stream.into_book_applier(&books, processor); + + let msg = r#"{"event_type":"book","asset_id":"12345","timestamp":1,"bids":[{"price":"0.75","size":"10"}],"asks":[{"price":"0.76","size":"5"}]}"#.to_string(); + let stats = applier.apply_text_message(msg).unwrap(); + assert_eq!(stats.book_messages, 1); + assert_eq!(stats.book_levels_applied, 2); + + let snapshot = books.get_book("12345").unwrap(); + assert_eq!(snapshot.bids.len(), 1); + assert_eq!(snapshot.asks.len(), 1); + assert_eq!(snapshot.bids[0].price, Decimal::from_str("0.75").unwrap()); + assert_eq!(snapshot.bids[0].size, Decimal::from_str("10").unwrap()); + assert_eq!(snapshot.asks[0].price, Decimal::from_str("0.76").unwrap()); + assert_eq!(snapshot.asks[0].size, Decimal::from_str("5").unwrap()); + } } diff --git a/tests/no_alloc_hot_paths.rs b/tests/no_alloc_hot_paths.rs index c56a310..0e47168 100644 --- a/tests/no_alloc_hot_paths.rs +++ b/tests/no_alloc_hot_paths.rs @@ -5,7 +5,9 @@ use std::hash::{Hash, Hasher}; use std::str::FromStr; use chrono::Utc; -use polyfill_rs::{book::OrderBookManager, OrderBookImpl, Side, WsBookUpdateProcessor}; +use polyfill_rs::{ + book::OrderBookManager, OrderBookImpl, Side, WebSocketStream, WsBookUpdateProcessor, +}; use rust_decimal::Decimal; thread_local! { @@ -269,3 +271,53 @@ fn no_alloc_ws_book_update_processor_apply_existing_levels() { .unwrap(); guard.assert_no_allocations(); } + +#[test] +fn no_alloc_websocket_book_applier_apply_text_message_existing_levels() { + let asset_id = "test_asset_id"; + let manager = OrderBookManager::new(100); + manager.get_or_create_book(asset_id).unwrap(); + + // Warm up the internal book with initial levels (allocations allowed). + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::BUY, + price: Decimal::from_str("0.75").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 1, + }) + .unwrap(); + manager + .apply_delta(polyfill_rs::types::OrderDelta { + token_id: asset_id.to_string(), + timestamp: chrono::Utc::now(), + side: Side::SELL, + price: Decimal::from_str("0.76").unwrap(), + size: Decimal::from_str("100.0").unwrap(), + sequence: 2, + }) + .unwrap(); + + let processor = WsBookUpdateProcessor::new(1024); + let stream = WebSocketStream::new("wss://example.com/ws"); + let mut applier = stream.into_book_applier(&manager, processor); + + // Warm up simd-json buffers/tape outside the guarded section. + let warmup_msg = format!( + "{{\"event_type\":\"book\",\"asset_id\":\"{asset_id}\",\"market\":\"0xabc\",\"timestamp\":10,\"bids\":[{{\"price\":\"0.75\",\"size\":\"200.0\"}}],\"asks\":[{{\"price\":\"0.76\",\"size\":\"50.0\"}}]}}" + ); + applier.apply_text_message(warmup_msg).unwrap(); + + let msg = format!( + "{{\"event_type\":\"book\",\"asset_id\":\"{asset_id}\",\"market\":\"0xabc\",\"timestamp\":11,\"bids\":[{{\"price\":\"0.75\",\"size\":\"150.0\"}}],\"asks\":[{{\"price\":\"0.76\",\"size\":\"75.0\"}}]}}" + ); + + // Warm up TLS access before measuring (defensive). + let _ = allocation_count(); + + let guard = NoAllocGuard::new(); + applier.apply_text_message(msg).unwrap(); + guard.assert_no_allocations(); +} From 31062b3ed720f139597d42c79a291709d0f12d65 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 20:54:21 -0500 Subject: [PATCH 20/32] docs: align testing guide with CI --- docs/TESTING.md | 53 +++++++++++++++----------------- scripts/run_integration_tests.sh | 30 ++++++++++++++++++ tests/common/mod.rs | 8 ++++- 3 files changed, 62 insertions(+), 29 deletions(-) create mode 100755 scripts/run_integration_tests.sh diff --git a/docs/TESTING.md b/docs/TESTING.md index c39a4a1..719774b 100644 --- a/docs/TESTING.md +++ b/docs/TESTING.md @@ -13,21 +13,21 @@ This document describes how to run tests for polyfill-rs, with a focus on integr ### Integration Tests - **Location**: `tests/integration_tests.rs` - **Purpose**: Verify the client can communicate with the real Polymarket API -- **Dependencies**: Network connectivity, optional authentication credentials +- **Dependencies**: Network connectivity + credentials (tests are `#[ignore]` by default) - **Speed**: Slower (network calls) ## Running Tests ### Quick Start (Basic Tests) ```bash -# Run all unit tests -cargo test +# Run unit tests + doc tests (real-API tests are `#[ignore]` by default) +cargo test --all-features -# Run only integration tests -cargo test --test integration_tests +# Run the "no-alloc hot paths" regression tests +cargo test --all-features --test no_alloc_hot_paths -# Run with verbose output -cargo test --test integration_tests -- --nocapture +# Compile-check all examples +cargo build --examples ``` ### Full Integration Testing @@ -53,11 +53,11 @@ export POLYMARKET_CHAIN_ID="137" #### 2. Run Integration Tests ```bash -# Using the test runner script +# Using the test runner script (runs ignored tests that hit the real API) ./scripts/run_integration_tests.sh # Or directly with cargo -cargo test --test integration_tests -- --nocapture +cargo test --all-features --test integration_tests -- --ignored --nocapture --test-threads=1 ``` ## Test Categories @@ -93,10 +93,9 @@ Performance test passed Markets returned: 50 ``` -### Skip Indicators +### Ignored Indicators (default) ``` -Skipping authentication test - no private key provided -Skipping order management test - missing auth credentials +test test_real_api_* ... ignored ``` ### Failure Indicators @@ -134,15 +133,13 @@ nslookup clob.polymarket.com # Verify private key format echo $POLYMARKET_PRIVATE_KEY | wc -c # Should be 66 characters (0x + 64 hex) -# Test with minimal credentials -export POLYMARKET_PRIVATE_KEY="0x1234567890123456789012345678901234567890123456789012345678901234" -cargo test test_authentication +# Run a small ignored auth smoke-test (requires real credentials) +cargo test --all-features --test simple_auth_test -- --ignored --nocapture --test-threads=1 ``` #### Rate Limiting ```bash -# If tests fail due to rate limiting, add delays -export POLYMARKET_TEST_DELAY=1000 # 1 second between requests +# If tests fail due to rate limiting, consider adding delays between manual runs. ``` ### Debug Mode @@ -151,25 +148,25 @@ Run tests with detailed logging: ```bash # Enable debug logging -RUST_LOG=debug cargo test --test integration_tests -- --nocapture +RUST_LOG=debug cargo test --all-features --test integration_tests -- --ignored --nocapture --test-threads=1 # Enable trace logging for maximum detail -RUST_LOG=trace cargo test --test integration_tests -- --nocapture +RUST_LOG=trace cargo test --all-features --test integration_tests -- --ignored --nocapture --test-threads=1 ``` ## Continuous Integration ### GitHub Actions -Our CI runs integration tests automatically: +Our CI runs formatting, clippy, unit tests, docs, security audit, and a separate no-alloc job. Real-API integration tests are `#[ignore]` and are not run in CI. ```yaml # .github/workflows/ci.yml -- name: Run Integration Tests - env: - POLYMARKET_HOST: ${{ secrets.POLYMARKET_HOST }} - POLYMARKET_CHAIN_ID: ${{ secrets.POLYMARKET_CHAIN_ID }} - run: cargo test --test integration_tests +- name: Run tests (excluding no-alloc hot paths) + run: cargo test --all-features -- --skip no_alloc_ + +- name: Run no-alloc hot path tests + run: cargo test --all-features --test no_alloc_hot_paths ``` ### Local CI @@ -180,8 +177,8 @@ Run the same tests locally: # Install cargo-nextest for faster test execution cargo install cargo-nextest -# Run with nextest -cargo nextest run --test integration_tests +# Run with nextest (ignored tests are not run by default) +cargo nextest run --all-features ``` ## Test Coverage @@ -235,4 +232,4 @@ async fn test_new_feature() -> Result<()> { - **Never commit credentials**: All test credentials are loaded from environment variables - **Use test accounts**: If testing with real credentials, use dedicated test accounts - **Read-only tests**: Order management tests only create orders, they don't execute them -- **Rate limiting**: Tests include delays to respect API rate limits \ No newline at end of file +- **Rate limiting**: Tests include delays to respect API rate limits diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh new file mode 100755 index 0000000..a761431 --- /dev/null +++ b/scripts/run_integration_tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# Runs the real-API integration tests for polyfill-rs. +# +# These tests hit the live Polymarket API and are `#[ignore]` by default. +# You must provide credentials via environment variables or a local `.env` file. + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +echo "Running ignored integration tests against the live Polymarket API..." + +if [[ -z "${POLYMARKET_PRIVATE_KEY:-}" ]]; then + if [[ -f .env ]] && grep -q '^POLYMARKET_PRIVATE_KEY=' .env; then + echo "Using POLYMARKET_PRIVATE_KEY from .env" + else + echo "ERROR: POLYMARKET_PRIVATE_KEY is not set (env or .env)." + echo "Set POLYMARKET_PRIVATE_KEY in your environment or add it to .env." + exit 1 + fi +fi + +set -x + +# Run serially to reduce the chance of hitting rate limits. +cargo test --all-features --test integration_tests -- --ignored --nocapture --test-threads=1 +cargo test --all-features --test simple_auth_test -- --ignored --nocapture --test-threads=1 +cargo test --all-features --test order_posting_test -- --ignored --nocapture --test-threads=1 diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 217b34e..1416f38 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -34,6 +34,12 @@ impl Default for TestConfig { } impl TestConfig { + /// Load a test configuration from environment variables (and a local `.env` file, if present). + pub fn from_env() -> Self { + dotenvy::dotenv().ok(); + Self::default() + } + /// Check if we have authentication credentials pub fn has_auth(&self) -> bool { self.private_key.is_some() @@ -163,4 +169,4 @@ impl TestReporter { pub fn performance(test_name: &str, duration: Duration) { println!("{} completed in {:?}", test_name, duration); } -} \ No newline at end of file +} From 2570aa08a1e46b260b06a1e1ac93cfd1db7cde80 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 20:55:20 -0500 Subject: [PATCH 21/32] docs: clarify integration test env vars --- docs/TESTING.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/TESTING.md b/docs/TESTING.md index 719774b..73102df 100644 --- a/docs/TESTING.md +++ b/docs/TESTING.md @@ -37,15 +37,15 @@ cargo build --examples Create a `.env` file or export variables: ```bash -# Required for authentication tests +# Required for ignored real-API tests export POLYMARKET_PRIVATE_KEY="your_private_key_here" -# Required for order management tests +# Optional: API credentials (some tests/tools may use these) export POLYMARKET_API_KEY="your_api_key" export POLYMARKET_API_SECRET="your_api_secret" export POLYMARKET_API_PASSPHRASE="your_passphrase" -# Optional (defaults provided) +# Optional (defaults provided in test helpers) export POLYMARKET_HOST="https://clob.polymarket.com" export POLYMARKET_CHAIN_ID="137" ``` From 0f577f0b2c5fba21ced4d9a747a2273c46aeafcb Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 21:10:03 -0500 Subject: [PATCH 22/32] fix(api): improve order endpoint compatibility --- src/client.rs | 12 ++++++++---- src/types.rs | 5 +++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index 6e1e70c..4fc5e96 100644 --- a/src/client.rs +++ b/src/client.rs @@ -848,10 +848,14 @@ impl ClobClient { let response = req.json(&body).send().await?; if !response.status().is_success() { - return Err(PolyfillError::api( - response.status().as_u16(), - "Failed to post order", - )); + let status = response.status().as_u16(); + let body = response.text().await.unwrap_or_default(); + let message = if body.is_empty() { + "Failed to post order".to_string() + } else { + format!("Failed to post order: {}", body) + }; + return Err(PolyfillError::api(status, message)); } Ok(response.json::().await?) diff --git a/src/types.rs b/src/types.rs index 0c47a4e..53efb9f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -208,9 +208,10 @@ impl Side { } /// Order type specifications -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] #[allow(clippy::upper_case_acronyms)] pub enum OrderType { + #[default] GTC, FOK, GTD, @@ -1099,7 +1100,7 @@ pub struct OpenOrder { pub asset_id: String, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub expiration: u64, - #[serde(rename = "type")] + #[serde(rename = "type", alias = "order_type", alias = "orderType", default)] pub order_type: OrderType, #[serde(deserialize_with = "crate::decode::deserializers::number_from_string")] pub created_at: u64, From 2dc3c06155119f202ee9f86b1e229f5025cdbfbd Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 21:10:15 -0500 Subject: [PATCH 23/32] test: make real-api smoke tests robust --- scripts/run_integration_tests.sh | 11 ++++--- tests/integration_tests.rs | 54 +++++++++++++++++--------------- tests/simple_auth_test.rs | 8 +++-- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh index a761431..c8aa7b5 100755 --- a/scripts/run_integration_tests.sh +++ b/scripts/run_integration_tests.sh @@ -22,9 +22,12 @@ if [[ -z "${POLYMARKET_PRIVATE_KEY:-}" ]]; then fi fi -set -x +ARGS=(--ignored --test-threads=1) +if [[ "${NO_CAPTURE:-0}" == "1" ]]; then + ARGS+=(--nocapture) +fi # Run serially to reduce the chance of hitting rate limits. -cargo test --all-features --test integration_tests -- --ignored --nocapture --test-threads=1 -cargo test --all-features --test simple_auth_test -- --ignored --nocapture --test-threads=1 -cargo test --all-features --test order_posting_test -- --ignored --nocapture --test-threads=1 +cargo test --all-features --test integration_tests -- "${ARGS[@]}" +cargo test --all-features --test simple_auth_test -- "${ARGS[@]}" +cargo test --all-features --test order_posting_test -- "${ARGS[@]}" diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index c2a9922..b15ebff 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -85,20 +85,23 @@ async fn test_real_api_authenticated_order_flow() { .expect("Failed to get midpoint"); println!("PASS: Current midpoint: {}", midpoint.mid); - // Step 4: Create and post a small order well away from market price - // (so it won't fill immediately) - let order_price = if midpoint.mid > dec!(0.5) { - dec!(0.01) // Very low buy price, won't fill + // Step 4: Create and post a small order well away from market price (so it won't fill immediately). + // IMPORTANT: choose side consistently with the price so we don't accidentally create a marketable order. + let (side, order_price) = if midpoint.mid > dec!(0.5) { + (Side::BUY, dec!(0.01)) // Very low buy price, won't fill } else { - dec!(0.99) // Very high sell price, won't fill + (Side::SELL, dec!(0.99)) // Very high sell price, won't fill }; - println!("Step 4: Posting order at price {}...", order_price); + println!( + "Step 4: Posting {:?} order at price {}...", + side, order_price + ); let order_args = OrderArgs { token_id: token_id.clone(), price: order_price, - size: dec!(1.0), // Minimum size - side: Side::BUY, + size: dec!(1.0), // Small size (auth is the thing we're testing here) + side, }; let post_result = client.create_and_post_order(&order_args).await; @@ -126,24 +129,23 @@ async fn test_real_api_authenticated_order_flow() { } }, Err(e) => { - let err_str = format!("{:?}", e); - - // Check if it's a 401 (authentication failure) - if err_str.contains("401") { - panic!("FAIL: CRITICAL: 401 Unauthorized error - HMAC authentication is broken!"); - } - - // Check if it's a 400 with specific validation errors (these are OK) - if err_str.contains("400") - && (err_str.contains("insufficient") - || err_str.contains("balance") - || err_str.contains("allowance") - || err_str.contains("POLY_AMOUNT_TOO_SMALL")) - { - println!("PASS: Authentication successful (got expected validation error)"); - println!(" Error: {}", err_str); - } else { - panic!("FAIL: Unexpected error: {:?}", e); + // The critical failure: did we get a 401 (authentication failure)? + match &e { + polyfill_rs::PolyfillError::Api { status: 401, .. } => { + panic!( + "FAIL: CRITICAL: 401 Unauthorized error - HMAC authentication is broken!" + ); + }, + // Any 4xx other than 401 indicates auth succeeded and we reached server-side validation. + polyfill_rs::PolyfillError::Api { + status: 400..=499, .. + } => { + println!("PASS: Authentication successful (got expected validation error)"); + println!(" Error: {:?}", e); + }, + _ => { + panic!("FAIL: Unexpected error: {:?}", e); + }, } }, } diff --git a/tests/simple_auth_test.rs b/tests/simple_auth_test.rs index 77a1031..ad9481f 100644 --- a/tests/simple_auth_test.rs +++ b/tests/simple_auth_test.rs @@ -18,7 +18,7 @@ async fn test_create_api_key_simple() { match result { Ok(creds) => { println!("Successfully created/derived API key"); - println!(" API Key: {}", creds.api_key); + println!(" API Key created (len={})", creds.api_key.len()); client.set_api_creds(creds); // Now try to get orders (requires auth) @@ -134,7 +134,11 @@ async fn test_get_notifications() { match result { Ok(notifs) => { - println!("Authentication successful! Notifications: {:?}", notifs); + let count = notifs.as_array().map(|arr| arr.len()); + match count { + Some(n) => println!("Authentication successful! Notifications: {n}"), + None => println!("Authentication successful! Notifications received"), + } }, Err(e) => { let err_str = format!("{:?}", e); From 1e1ef74730302e7a653747329126663cbf5b1fff Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 21:14:42 -0500 Subject: [PATCH 24/32] test(ws): add ignored real-market websocket smoke test --- scripts/run_integration_tests.sh | 1 + tests/ws_integration_tests.rs | 64 ++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 tests/ws_integration_tests.rs diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh index c8aa7b5..2a219c5 100755 --- a/scripts/run_integration_tests.sh +++ b/scripts/run_integration_tests.sh @@ -29,5 +29,6 @@ fi # Run serially to reduce the chance of hitting rate limits. cargo test --all-features --test integration_tests -- "${ARGS[@]}" +cargo test --all-features --test ws_integration_tests -- "${ARGS[@]}" cargo test --all-features --test simple_auth_test -- "${ARGS[@]}" cargo test --all-features --test order_posting_test -- "${ARGS[@]}" diff --git a/tests/ws_integration_tests.rs b/tests/ws_integration_tests.rs new file mode 100644 index 0000000..a609eb3 --- /dev/null +++ b/tests/ws_integration_tests.rs @@ -0,0 +1,64 @@ +// WebSocket integration tests for polyfill-rs +// +// These tests connect to Polymarket's live WS endpoints and are ignored by default. +// +// Run with: +// cargo test --all-features --test ws_integration_tests -- --ignored --nocapture --test-threads=1 + +#![cfg(feature = "stream")] + +use futures::StreamExt; +use polyfill_rs::{ClobClient, OrderBookManager, WebSocketStream, WsBookUpdateProcessor}; +use std::time::Duration; + +const HOST: &str = "https://clob.polymarket.com"; +const WS_MARKET_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_real_ws_market_book_applier_receives_book_update() { + // Pick an active token ID so the market channel should produce data. + let client = ClobClient::new(HOST); + let markets = client + .get_sampling_markets(None) + .await + .expect("failed to fetch markets"); + + let token_id = markets + .data + .iter() + .find(|m| m.active && !m.closed) + .and_then(|m| m.tokens.first()) + .map(|t| t.token_id.clone()) + .expect("no active markets found"); + + let books = OrderBookManager::new(256); + books + .get_or_create_book(&token_id) + .expect("failed to create book"); + + let mut ws = WebSocketStream::new(WS_MARKET_URL); + ws.subscribe_market_channel(vec![token_id.clone()]) + .await + .expect("failed to subscribe market channel"); + + let processor = WsBookUpdateProcessor::new(256 * 1024); + let mut applier = ws.into_book_applier(&books, processor); + + let stats = tokio::time::timeout(Duration::from_secs(10), applier.next()) + .await + .expect("timed out waiting for WS book message") + .expect("WS stream ended unexpectedly") + .expect("WS processing error"); + + assert!( + stats.book_messages > 0, + "expected at least one book message" + ); + + let snapshot = books.get_book(&token_id).expect("failed to read book"); + assert!( + !snapshot.bids.is_empty() || !snapshot.asks.is_empty(), + "expected some book levels after applying an update" + ); +} From d4e240714c9466d20e740768f5b55e1b628be4f1 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 21:21:14 -0500 Subject: [PATCH 25/32] test(ws): add user channel and stability checks --- tests/ws_integration_tests.rs | 119 ++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/ws_integration_tests.rs b/tests/ws_integration_tests.rs index a609eb3..aa3c405 100644 --- a/tests/ws_integration_tests.rs +++ b/tests/ws_integration_tests.rs @@ -9,10 +9,25 @@ use futures::StreamExt; use polyfill_rs::{ClobClient, OrderBookManager, WebSocketStream, WsBookUpdateProcessor}; +use std::env; use std::time::Duration; const HOST: &str = "https://clob.polymarket.com"; const WS_MARKET_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; +const WS_USER_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/user"; +const CHAIN_ID: u64 = 137; + +fn load_private_key() -> String { + dotenvy::dotenv().ok(); + env::var("POLYMARKET_PRIVATE_KEY").expect("POLYMARKET_PRIVATE_KEY must be set (env or .env)") +} + +fn stability_secs(default_secs: u64) -> u64 { + env::var("POLYFILL_WS_STABILITY_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(default_secs) +} #[tokio::test(flavor = "multi_thread")] #[ignore] @@ -62,3 +77,107 @@ async fn test_real_ws_market_book_applier_receives_book_update() { "expected some book levels after applying an update" ); } + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_real_ws_market_book_applier_connection_stable() { + // Subscribe to a handful of active tokens and keep the connection open for a while. + // We don't require constant message flow; we just fail on close or error. + let client = ClobClient::new(HOST); + let markets = client + .get_sampling_markets(None) + .await + .expect("failed to fetch markets"); + + let token_ids: Vec = markets + .data + .iter() + .filter(|m| m.active && !m.closed) + .filter_map(|m| m.tokens.first().map(|t| t.token_id.clone())) + .take(10) + .collect(); + + assert!(!token_ids.is_empty(), "no active token IDs found"); + + let books = OrderBookManager::new(256); + for token_id in &token_ids { + books + .get_or_create_book(token_id) + .expect("failed to create book"); + } + + let mut ws = WebSocketStream::new(WS_MARKET_URL); + ws.subscribe_market_channel(token_ids.clone()) + .await + .expect("failed to subscribe market channel"); + + let processor = WsBookUpdateProcessor::new(256 * 1024); + let mut applier = ws.into_book_applier(&books, processor); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(stability_secs(15)); + let mut saw_book_message = false; + + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_secs(1), applier.next()).await { + Ok(Some(Ok(stats))) => { + if stats.book_messages > 0 { + saw_book_message = true; + } + }, + Ok(Some(Err(e))) => panic!("WS processing error: {:?}", e), + Ok(None) => panic!("WS stream ended unexpectedly"), + Err(_) => { + // No message in this interval; keep waiting. + }, + } + } + + assert!( + saw_book_message, + "did not observe any `book` messages during stability window" + ); +} + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_real_ws_user_channel_connection_stable() { + // Connect + subscribe to the user channel and keep the connection open for a while. + // + // Note: depending on account activity, the WS may not emit any `order`/`trade` messages + // during the window. This test primarily asserts we can authenticate + subscribe + // and that the connection doesn't immediately drop. + let private_key = load_private_key(); + let auth_client = ClobClient::with_l1_headers(HOST, &private_key, CHAIN_ID); + let api_creds = auth_client + .create_or_derive_api_key(None) + .await + .expect("failed to create/derive api key"); + + let markets = auth_client + .get_sampling_markets(None) + .await + .expect("failed to fetch markets"); + let market_id = markets + .data + .iter() + .find(|m| m.active && !m.closed) + .map(|m| m.condition_id.clone()) + .expect("no active markets found"); + + let mut ws = WebSocketStream::new(WS_USER_URL).with_auth(api_creds); + ws.subscribe_user_channel(vec![market_id]) + .await + .expect("failed to subscribe user channel"); + + let deadline = tokio::time::Instant::now() + Duration::from_secs(stability_secs(10)); + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_secs(1), ws.next()).await { + Ok(Some(Ok(_msg))) => {}, + Ok(Some(Err(e))) => panic!("WS error: {:?}", e), + Ok(None) => panic!("WS stream ended unexpectedly"), + Err(_) => { + // No message in this interval; keep waiting. + }, + } + } +} From 65bab0d3c1e0fae8ea6bdb8b25d698ffff705e73 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 21:30:10 -0500 Subject: [PATCH 26/32] docs: update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 197c5e2..5fa38be 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A high-performance drop-in replacement for `polymarket-rs-client` with latency-o At the time that this project was started, `polymarket-rs-client` was a Polymarket Rust Client with a few GitHub stars, but which seemed to be unmaintained. I took on the task of creating a Rust client which could beat the benchmarks quoted in the README.md of that project, with the added constraint of also maintaining zero alloc hot paths. -I also want to take a moment to clarify what zero-alloc means because I've now recieved double digit messages about this on twitter/x and telegram. In general, zero alloc means either zero alloc in hot paths (which can be a bit more arbitrary) or atlernatively it can mean zero alloc ater init/warm-up, which is the objective of this repository. Succinctly that means that **the per-message handling loop never touches the heap**. +I also want to take a moment to clarify what zero-alloc means because I've now recieved double digit messages about this on twitter/x and telegram. In general, zero alloc means either zero alloc in hot paths (which can be a bit more arbitrary) or atlernatively it can mean zero alloc after init/warm-up, which is the objective of this repository. Succinctly that means that **the per-message handling loop never touches the heap**. Notably order book paths that introduce new allocations by design: - First time seeing a token/book (HashMap insert + key clone): `src/book.rs:~788` From 40b503ef29efa02ec8dfe4f3733239fd5e2a16dd Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 22:41:51 -0500 Subject: [PATCH 27/32] bench(ws): add WS hot-path Criterion benchmark --- Cargo.toml | 4 + benches/ws_hot_path.rs | 180 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 benches/ws_hot_path.rs diff --git a/Cargo.toml b/Cargo.toml index e204510..da23643 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,10 @@ harness = false name = "network_benchmarks" harness = false +[[bench]] +name = "ws_hot_path" +harness = false + [profile.release] # Optimizations for HFT performance opt-level = 3 diff --git a/benches/ws_hot_path.rs b/benches/ws_hot_path.rs new file mode 100644 index 0000000..46d4fa6 --- /dev/null +++ b/benches/ws_hot_path.rs @@ -0,0 +1,180 @@ +//! Benchmarks for the WebSocket `book` hot path. +//! +//! These are intended to approximate a warmed, steady-state processing loop: +//! after init/warmup, per-message processing should avoid heap allocations. +//! The allocation checks live in `tests/no_alloc_hot_paths.rs`; these benches +//! focus on throughput/latency of the processing path. + +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput}; +use polyfill_rs::types::BookUpdate; +use polyfill_rs::{OrderBookManager, OrderSummary, WsBookUpdateProcessor}; +use rust_decimal::Decimal; +use std::sync::atomic::{AtomicU64, Ordering}; + +const START_TIMESTAMP: u64 = 1_000_000_000_000_000; +const BOOK_ASSET_ID: &str = "test_asset_id"; +const BOOK_MARKET: &str = "0xabc"; + +struct TimestampRange { + start: usize, + end: usize, +} + +impl TimestampRange { + fn find(bytes: &[u8]) -> Self { + let needle = b"\"timestamp\":"; + let Some(pos) = bytes.windows(needle.len()).position(|w| w == needle) else { + panic!("timestamp field not found in WS template JSON"); + }; + + let start = pos + needle.len(); + let mut end = start; + while end < bytes.len() && bytes[end].is_ascii_digit() { + end += 1; + } + + if start == end { + panic!("timestamp digits not found in WS template JSON"); + } + + Self { start, end } + } + + fn write_fixed_width(&self, bytes: &mut [u8], mut value: u64) { + let width = self.end - self.start; + + // Write digits right-to-left into the existing digit window. + for idx in (0..width).rev() { + let digit = (value % 10) as u8; + bytes[self.start + idx] = b'0' + digit; + value /= 10; + } + } +} + +fn price_string_from_ticks(ticks: u32) -> String { + let whole = ticks / 10_000; + let frac = ticks % 10_000; + format!("{whole}.{frac:04}") +} + +fn build_book_update(levels_per_side: usize) -> BookUpdate { + let mut bids = Vec::with_capacity(levels_per_side); + let mut asks = Vec::with_capacity(levels_per_side); + + let size = Decimal::new(1_000_000, 4); // 100.0000 + + for i in 0..levels_per_side { + let bid_ticks = 7_500u32 - i as u32; + let ask_ticks = 7_501u32 + i as u32; + bids.push(OrderSummary { + price: Decimal::new(bid_ticks as i64, 4), + size, + }); + asks.push(OrderSummary { + price: Decimal::new(ask_ticks as i64, 4), + size, + }); + } + + BookUpdate { + asset_id: BOOK_ASSET_ID.to_string(), + market: BOOK_MARKET.to_string(), + timestamp: 1, + bids, + asks, + hash: None, + } +} + +fn build_ws_book_template(levels_per_side: usize) -> Vec { + let mut json = String::new(); + + json.push_str("{\"event_type\":\"book\",\"asset_id\":\""); + json.push_str(BOOK_ASSET_ID); + json.push_str("\",\"timestamp\":"); + json.push_str(&START_TIMESTAMP.to_string()); + json.push_str(",\"bids\":["); + + let size = "100.0000"; + for i in 0..levels_per_side { + if i != 0 { + json.push(','); + } + let bid_ticks = 7_500u32 - i as u32; + let bid_price = price_string_from_ticks(bid_ticks); + json.push_str("{\"price\":\""); + json.push_str(&bid_price); + json.push_str("\",\"size\":\""); + json.push_str(size); + json.push_str("\"}"); + } + + json.push_str("],\"asks\":["); + for i in 0..levels_per_side { + if i != 0 { + json.push(','); + } + let ask_ticks = 7_501u32 + i as u32; + let ask_price = price_string_from_ticks(ask_ticks); + json.push_str("{\"price\":\""); + json.push_str(&ask_price); + json.push_str("\",\"size\":\""); + json.push_str(size); + json.push_str("\"}"); + } + json.push_str("]}"); + + json.into_bytes() +} + +fn bench_ws_book_process_bytes(c: &mut Criterion) { + let mut group = c.benchmark_group("ws_book_hot_path"); + + for levels_per_side in [1usize, 16, 64] { + let books = OrderBookManager::new(levels_per_side * 2); + let _ = books.get_or_create_book(BOOK_ASSET_ID).unwrap(); + + // Warm up: ensure all levels exist so the steady-state path doesn't allocate. + let warmup_update = build_book_update(levels_per_side); + books.apply_book_update(&warmup_update).unwrap(); + + let template = build_ws_book_template(levels_per_side); + let ts_range = TimestampRange::find(&template); + + let mut processor = WsBookUpdateProcessor::new(template.len()); + let mut warmup_msg = template.clone(); + processor + .process_bytes(warmup_msg.as_mut_slice(), &books) + .unwrap(); + + let counter = AtomicU64::new(START_TIMESTAMP); + + group.throughput(Throughput::Bytes(template.len() as u64)); + group.bench_function( + format!("process_bytes_levels_per_side_{levels_per_side}"), + move |b| { + b.iter_batched( + || { + let mut msg = template.clone(); + let ts = counter.fetch_add(1, Ordering::Relaxed) + 1; + ts_range.write_fixed_width(msg.as_mut_slice(), ts); + msg + }, + |mut msg| { + let stats = processor + .process_bytes(black_box(msg.as_mut_slice()), black_box(&books)) + .unwrap(); + black_box(stats); + }, + BatchSize::SmallInput, + ); + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_ws_book_process_bytes); +criterion_main!(benches); From f5a807cfd0a6fdd5bd14ca3237f62bab6f1acfc2 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 22:48:52 -0500 Subject: [PATCH 28/32] bench(ws): add serde baseline comparison --- benches/ws_hot_path.rs | 69 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/benches/ws_hot_path.rs b/benches/ws_hot_path.rs index 46d4fa6..61ac7b9 100644 --- a/benches/ws_hot_path.rs +++ b/benches/ws_hot_path.rs @@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput}; use polyfill_rs::types::BookUpdate; -use polyfill_rs::{OrderBookManager, OrderSummary, WsBookUpdateProcessor}; +use polyfill_rs::{OrderBookManager, OrderSummary, StreamMessage, WsBookUpdateProcessor}; use rust_decimal::Decimal; use std::sync::atomic::{AtomicU64, Ordering}; @@ -92,6 +92,8 @@ fn build_ws_book_template(levels_per_side: usize) -> Vec { json.push_str("{\"event_type\":\"book\",\"asset_id\":\""); json.push_str(BOOK_ASSET_ID); + json.push_str("\",\"market\":\""); + json.push_str(BOOK_MARKET); json.push_str("\",\"timestamp\":"); json.push_str(&START_TIMESTAMP.to_string()); json.push_str(",\"bids\":["); @@ -132,38 +134,42 @@ fn bench_ws_book_process_bytes(c: &mut Criterion) { let mut group = c.benchmark_group("ws_book_hot_path"); for levels_per_side in [1usize, 16, 64] { - let books = OrderBookManager::new(levels_per_side * 2); - let _ = books.get_or_create_book(BOOK_ASSET_ID).unwrap(); + let hot_path_books = OrderBookManager::new(levels_per_side * 2); + let _ = hot_path_books.get_or_create_book(BOOK_ASSET_ID).unwrap(); // Warm up: ensure all levels exist so the steady-state path doesn't allocate. let warmup_update = build_book_update(levels_per_side); - books.apply_book_update(&warmup_update).unwrap(); + hot_path_books.apply_book_update(&warmup_update).unwrap(); let template = build_ws_book_template(levels_per_side); - let ts_range = TimestampRange::find(&template); + let tape_template = template.clone(); + let ts_range = TimestampRange::find(&tape_template); - let mut processor = WsBookUpdateProcessor::new(template.len()); - let mut warmup_msg = template.clone(); + let mut processor = WsBookUpdateProcessor::new(tape_template.len()); + let mut warmup_msg = tape_template.clone(); processor - .process_bytes(warmup_msg.as_mut_slice(), &books) + .process_bytes(warmup_msg.as_mut_slice(), &hot_path_books) .unwrap(); let counter = AtomicU64::new(START_TIMESTAMP); - group.throughput(Throughput::Bytes(template.len() as u64)); + group.throughput(Throughput::Bytes(tape_template.len() as u64)); group.bench_function( - format!("process_bytes_levels_per_side_{levels_per_side}"), + format!("tape_process_and_apply_levels_per_side_{levels_per_side}"), move |b| { b.iter_batched( || { - let mut msg = template.clone(); + let mut msg = tape_template.clone(); let ts = counter.fetch_add(1, Ordering::Relaxed) + 1; ts_range.write_fixed_width(msg.as_mut_slice(), ts); msg }, |mut msg| { let stats = processor - .process_bytes(black_box(msg.as_mut_slice()), black_box(&books)) + .process_bytes( + black_box(msg.as_mut_slice()), + black_box(&hot_path_books), + ) .unwrap(); black_box(stats); }, @@ -171,6 +177,45 @@ fn bench_ws_book_process_bytes(c: &mut Criterion) { ); }, ); + + // Baseline: serde_json DOM -> StreamMessage -> BookUpdate -> apply to books. + // + // This is representative of our "non-hot-path" decoding approach and provides + // a direct comparison within the same benchmark. + let serde_books = OrderBookManager::new(levels_per_side * 2); + let _ = serde_books.get_or_create_book(BOOK_ASSET_ID).unwrap(); + serde_books.apply_book_update(&warmup_update).unwrap(); + + let serde_template = template; + let serde_ts_range = TimestampRange::find(&serde_template); + let serde_counter = AtomicU64::new(START_TIMESTAMP); + + group.bench_function( + format!("serde_decode_and_apply_levels_per_side_{levels_per_side}"), + move |b| { + b.iter_batched( + || { + let mut msg = serde_template.clone(); + let ts = serde_counter.fetch_add(1, Ordering::Relaxed) + 1; + serde_ts_range.write_fixed_width(msg.as_mut_slice(), ts); + msg + }, + |msg| { + let messages = polyfill_rs::decode::parse_stream_messages_bytes(black_box( + msg.as_slice(), + )) + .unwrap(); + + for message in messages { + if let StreamMessage::Book(update) = message { + serde_books.apply_book_update(&update).unwrap(); + } + } + }, + BatchSize::SmallInput, + ); + }, + ); } group.finish(); From 25f268981f716ed8c56ea4450681ca5fd94bbe58 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 23:03:33 -0500 Subject: [PATCH 29/32] docs(readme): mention WS hot-path benchmark --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 5fa38be..db5f91c 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ End-to-end performance with Polymarket's API, including network latency, JSON pa | **Order Book Updates (1000 ops)** | 159.6 µs ± 32 µs | 6,260 updates/sec, zero-allocation | | **Spread/Mid Calculations** | 70 ns ± 77 ns | 14.3M ops/sec, optimized BTreeMap | | **JSON Parsing (480KB)** | ~2.3 ms | SIMD-accelerated parsing (1.77x faster than serde_json) | +| **WS `book` hot path (decode + apply)** | ~0.28 µs / 2.01 µs / 7.70 µs | 1 / 16 / 64 levels-per-side, ~3.7–4.0x faster vs serde decode+apply (see `benches/ws_hot_path.rs`) | **Key Performance Optimizations:** From 9f86e244024a633ce331c8c99839943c34b2d926 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Fri, 30 Jan 2026 23:04:25 -0500 Subject: [PATCH 30/32] docs(readme): add ws_hot_path benchmark command --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index db5f91c..3171e3d 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,8 @@ End-to-end performance with Polymarket's API, including network latency, JSON pa | **JSON Parsing (480KB)** | ~2.3 ms | SIMD-accelerated parsing (1.77x faster than serde_json) | | **WS `book` hot path (decode + apply)** | ~0.28 µs / 2.01 µs / 7.70 µs | 1 / 16 / 64 levels-per-side, ~3.7–4.0x faster vs serde decode+apply (see `benches/ws_hot_path.rs`) | +Run the WS hot-path benchmark locally with `cargo bench --bench ws_hot_path`. + **Key Performance Optimizations:** The 21.4% performance improvement comes from SIMD-accelerated JSON parsing (1.77x faster than serde_json), HTTP/2 tuning with 512KB stream windows optimized for 469KB payloads, integrated DNS caching, connection keep-alive, and buffer pooling to reduce allocation overhead. From 3dfb65ea0ea56c7b0f45b593ee5f2321223abe76 Mon Sep 17 00:00:00 2001 From: floor-licker Date: Sat, 31 Jan 2026 10:30:48 -0500 Subject: [PATCH 31/32] feat(api): add /prices-history helper methods --- src/client.rs | 163 +++++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 2 + src/types.rs | 38 ++++++++++++ 3 files changed, 200 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4fc5e96..d7480d7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -460,6 +460,126 @@ impl ClobClient { Ok(price) } + fn validate_prices_history_asset_id(asset_id: &str) -> Result<()> { + if asset_id.is_empty() { + return Err(PolyfillError::validation( + "asset_id is required (use the decimal token_id / asset_id)", + )); + } + + // Common footgun: passing a condition id (0x...) instead of the decimal asset id. + if asset_id.starts_with("0x") || asset_id.starts_with("0X") { + return Err(PolyfillError::validation( + "`/prices-history` expects a decimal token_id/asset_id, not a hex condition_id", + )); + } + + if !asset_id.as_bytes().iter().all(u8::is_ascii_digit) { + return Err(PolyfillError::validation( + "asset_id must be a decimal string (token_id / asset_id)", + )); + } + + Ok(()) + } + + /// Get price history for a single outcome (`token_id` / `asset_id`) over a fixed interval. + /// + /// Important: the upstream API query parameter is named `market`, but it expects the + /// decimal outcome asset id (not the hex `condition_id`). + pub async fn get_prices_history_interval( + &self, + asset_id: &str, + interval: PricesHistoryInterval, + fidelity: Option, + ) -> Result { + Self::validate_prices_history_asset_id(asset_id)?; + + let mut request = self + .http_client + .get(format!("{}/prices-history", self.base_url)) + .query(&[("market", asset_id), ("interval", interval.as_str())]); + + if let Some(fidelity) = fidelity { + request = request.query(&[("fidelity", fidelity)]); + } + + let response = request.send().await?; + if !response.status().is_success() { + let status = response.status().as_u16(); + let body = response.text().await.unwrap_or_default(); + let message = serde_json::from_str::(&body) + .ok() + .and_then(|v| { + v.get("error") + .and_then(Value::as_str) + .map(|s| s.to_string()) + }) + .unwrap_or_else(|| { + if body.is_empty() { + "Failed to get prices history".to_string() + } else { + body + } + }); + return Err(PolyfillError::api(status, message)); + } + + Ok(response.json::().await?) + } + + /// Get price history for a single outcome (`token_id` / `asset_id`) over a timestamp range. + /// + /// `start_ts` and `end_ts` are Unix timestamps (seconds). + pub async fn get_prices_history_range( + &self, + asset_id: &str, + start_ts: u64, + end_ts: u64, + fidelity: Option, + ) -> Result { + Self::validate_prices_history_asset_id(asset_id)?; + + if start_ts >= end_ts { + return Err(PolyfillError::validation( + "start_ts must be < end_ts for prices history", + )); + } + + let mut request = self + .http_client + .get(format!("{}/prices-history", self.base_url)) + .query(&[("market", asset_id)]) + .query(&[("startTs", start_ts), ("endTs", end_ts)]); + + if let Some(fidelity) = fidelity { + request = request.query(&[("fidelity", fidelity)]); + } + + let response = request.send().await?; + if !response.status().is_success() { + let status = response.status().as_u16(); + let body = response.text().await.unwrap_or_default(); + let message = serde_json::from_str::(&body) + .ok() + .and_then(|v| { + v.get("error") + .and_then(Value::as_str) + .map(|s| s.to_string()) + }) + .unwrap_or_else(|| { + if body.is_empty() { + "Failed to get prices history".to_string() + } else { + body + } + }); + return Err(PolyfillError::api(status, message)); + } + + Ok(response.json::().await?) + } + /// Get tick size for a token pub async fn get_tick_size(&self, token_id: &str) -> Result { let response = self @@ -1723,8 +1843,8 @@ impl ClobClient { // Re-export types from the canonical location in types.rs pub use crate::types::{ ExtraOrderArgs, Market, MarketOrderArgs, MarketsResponse, MidpointResponse, NegRiskResponse, - OrderBookSummary, OrderSummary, PriceResponse, Rewards, SpreadResponse, TickSizeResponse, - Token, + OrderBookSummary, OrderSummary, PriceResponse, PricesHistoryInterval, PricesHistoryResponse, + Rewards, SpreadResponse, TickSizeResponse, Token, }; // Compatibility types that need to stay in client.rs @@ -1740,7 +1860,7 @@ pub type PolyfillClient = ClobClient; #[cfg(test)] mod tests { use super::{ClobClient, OrderArgs as ClientOrderArgs}; - use crate::types::Side; + use crate::types::{PricesHistoryInterval, Side}; use crate::{ApiCredentials, PolyfillError}; use mockito::{Matcher, Server}; use rust_decimal::Decimal; @@ -2030,6 +2150,43 @@ mod tests { assert_eq!(response.price, Decimal::from_str("0.76").unwrap()); } + #[tokio::test(flavor = "multi_thread")] + async fn test_get_prices_history_interval_rejects_hex_condition_id() { + let client = create_test_client("https://test.example.com"); + let result = client + .get_prices_history_interval("0xdeadbeef", PricesHistoryInterval::OneDay, None) + .await; + assert!(matches!(result, Err(PolyfillError::Validation { .. }))); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_get_prices_history_interval_success() { + let mut server = Server::new_async().await; + let mock_response = r#"{"history":[{"t":1}]}"#; + + let mock = server + .mock("GET", "/prices-history") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("market".into(), "12345".into()), + Matcher::UrlEncoded("interval".into(), "1d".into()), + Matcher::UrlEncoded("fidelity".into(), "5".into()), + ])) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(mock_response) + .create_async() + .await; + + let client = create_test_client(&server.url()); + let response = client + .get_prices_history_interval("12345", PricesHistoryInterval::OneDay, Some(5)) + .await + .unwrap(); + + mock.assert_async().await; + assert_eq!(response.history.len(), 1); + } + #[tokio::test(flavor = "multi_thread")] async fn test_get_tick_size_success() { let mut server = Server::new_async().await; diff --git a/src/lib.rs b/src/lib.rs index 070ee7e..f10b43a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,6 +117,8 @@ pub use crate::types::{ OrderSummary, OrderType, PriceResponse, + PricesHistoryInterval, + PricesHistoryResponse, Rewards, Side, SimplifiedMarket, diff --git a/src/types.rs b/src/types.rs index 53efb9f..d983138 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1225,6 +1225,44 @@ pub struct PriceResponse { pub price: Decimal, } +// ============================================================================ +// PRICE HISTORY (ANALYTICS) +// ============================================================================ + +/// Time bucket for the `/prices-history` endpoint. +/// +/// Note: this endpoint uses a confusing query parameter name (`market`) but expects an +/// outcome asset id (`token_id` / `asset_id`) in **decimal string** form. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PricesHistoryInterval { + OneMinute, + OneHour, + SixHours, + OneDay, + OneWeek, +} + +impl PricesHistoryInterval { + pub const fn as_str(self) -> &'static str { + match self { + Self::OneMinute => "1m", + Self::OneHour => "1h", + Self::SixHours => "6h", + Self::OneDay => "1d", + Self::OneWeek => "1w", + } + } +} + +/// Raw response from `/prices-history`. +/// +/// We intentionally keep `history` entries as `serde_json::Value` because the upstream API has +/// no stable public schema here and currently may return empty history for many markets. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PricesHistoryResponse { + pub history: Vec, +} + #[derive(Debug, Deserialize)] pub struct SpreadResponse { #[serde(with = "rust_decimal::serde::str")] From 0b2740a434f028c3535223cd6394a09ad673305f Mon Sep 17 00:00:00 2001 From: floor-licker Date: Sat, 31 Jan 2026 10:38:44 -0500 Subject: [PATCH 32/32] test(api): add /prices-history real API integration test --- tests/prices_history_integration_tests.rs | 46 +++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 tests/prices_history_integration_tests.rs diff --git a/tests/prices_history_integration_tests.rs b/tests/prices_history_integration_tests.rs new file mode 100644 index 0000000..c33bf82 --- /dev/null +++ b/tests/prices_history_integration_tests.rs @@ -0,0 +1,46 @@ +// Real API integration tests for /prices-history. +// +// These tests hit Polymarket's live HTTP API and are ignored by default. +// +// Run with: +// cargo test --all-features --test prices_history_integration_tests -- --ignored --nocapture --test-threads=1 + +use polyfill_rs::{ClobClient, PricesHistoryInterval}; + +const HOST: &str = "https://clob.polymarket.com"; + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn test_real_api_get_prices_history_interval_parses() { + let client = ClobClient::new(HOST); + let markets = client + .get_sampling_markets(None) + .await + .expect("failed to fetch sampling markets"); + + let token_ids: Vec = markets + .data + .iter() + .filter(|m| m.active && !m.closed) + .filter_map(|m| m.tokens.first().map(|t| t.token_id.clone())) + .take(20) + .collect(); + + assert!(!token_ids.is_empty(), "no active token IDs found"); + + // The API sometimes returns empty history for some markets; try a few and + // assert at least one returns a non-empty series (verifies endpoint semantics). + for token_id in token_ids { + let response = client + .get_prices_history_interval(&token_id, PricesHistoryInterval::OneDay, Some(5)) + .await + .expect("failed to fetch prices history"); + + if !response.history.is_empty() { + return; + } + } + + panic!("expected at least one active market to return non-empty price history"); +} +