diff --git a/Cargo.lock b/Cargo.lock index 26a3032..ad8d83e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1524,6 +1524,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -1798,7 +1804,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2221,7 +2227,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.6", ] [[package]] @@ -2974,15 +2980,18 @@ dependencies = [ "chrono", "clap", "dirs", + "futures-util", "polymarket-client-sdk", "predicates", "rust_decimal", "rust_decimal_macros", + "rustls", "rustyline", "serde", "serde_json", "tabled", "tokio", + "tokio-tungstenite", ] [[package]] @@ -3457,7 +3466,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.6", ] [[package]] @@ -3674,6 +3683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -4056,6 +4066,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.9" @@ -4093,6 +4114,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4433,6 +4464,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -4471,6 +4503,22 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -4596,6 +4644,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 2.0.18", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -4669,6 +4736,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -4893,6 +4966,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + [[package]] name = "webpki-roots" version = "1.0.6" diff --git a/Cargo.toml b/Cargo.toml index a01bf05..a172aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ path = "src/main.rs" polymarket-client-sdk = { version = "0.4", features = ["gamma", "data", "bridge", "clob", "ctf"] } alloy = { version = "1.6.3", default-features = false, features = ["providers", "sol-types", "contract", "reqwest", "reqwest-rustls-tls", "signer-local", "signers"] } clap = { version = "4", features = ["derive"] } -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] } serde_json = "1" serde = { version = "1", features = ["derive"] } tabled = "0.17" @@ -26,6 +26,9 @@ anyhow = "1" chrono = "0.4" dirs = "6" rustyline = "15" +tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } +rustls = { version = "0.23", features = ["ring"] } +futures-util = "0.3" [dev-dependencies] assert_cmd = "2" diff --git a/examples/ws_test.rs b/examples/ws_test.rs new file mode 100644 index 0000000..8dab4c7 --- /dev/null +++ b/examples/ws_test.rs @@ -0,0 +1,91 @@ +//! Minimal standalone WebSocket test — connects to Polymarket and dumps raw frames. +//! Run with: cargo run --example ws_test + +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use rustls::crypto::ring::default_provider; +use tokio::time::sleep; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +const WS_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; + +#[tokio::main] +async fn main() { + let _ = default_provider().install_default(); + + println!("Connecting to {WS_URL}..."); + let (ws, resp) = connect_async(WS_URL).await.expect("Failed to connect"); + println!("Connected! Status: {}", resp.status()); + + let (mut sink, mut stream) = ws.split(); + + // Subscribe to known active market tokens + let sub = serde_json::json!({ + "type": "market", + "operation": "subscribe", + "assets_ids": ["38397507750621893057346880033441136112987238933685677349709401910643842844855", "95949957895141858444199258452803633110472396604599808168788254125381075552218"], + "markets": [], + "initial_dump": true + }); + let payload = serde_json::to_string(&sub).unwrap(); + println!("Sending: {payload}"); + sink.send(Message::Text(payload.into())).await.unwrap(); + + println!("Waiting for frames (up to 30 seconds)..."); + let mut count = 0; + let timeout = sleep(Duration::from_secs(30)); + tokio::pin!(timeout); // macro — must remain fully qualified + + loop { + tokio::select! { + msg = stream.next() => { + match msg { + Some(Ok(Message::Text(t))) => { + let preview: String = if t.chars().count() > 200 { + format!("{}...", t.chars().take(200).collect::()) + } else { + t.to_string() + }; + println!("[TEXT #{count}] {preview}"); + count += 1; + } + Some(Ok(Message::Binary(b))) => { + let s = String::from_utf8_lossy(&b); + let preview: String = if s.chars().count() > 200 { + format!("{}...", s.chars().take(200).collect::()) + } else { + s.to_string() + }; + println!("[BIN #{count}] {preview}"); + count += 1; + } + Some(Ok(Message::Ping(p))) => println!("[PING] {} bytes", p.len()), + Some(Ok(Message::Pong(p))) => println!("[PONG] {} bytes", p.len()), + Some(Ok(Message::Close(c))) => { + println!("[CLOSE] {c:?}"); + break; + } + Some(Ok(Message::Frame(f))) => println!("[FRAME] {f:?}"), + Some(Err(e)) => { + eprintln!("[ERROR] {e}"); + break; + } + None => { + println!("Stream ended."); + break; + } + } + if count >= 10 { + println!("Got 10 frames, exiting."); + break; + } + } + _ = &mut timeout => { + println!("Timeout after 30 seconds. Received {count} frames total."); + break; + } + } + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 671c0ee..939c3eb 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -12,6 +12,7 @@ pub mod profiles; pub mod series; pub mod setup; pub mod sports; +pub mod stream; pub mod tags; pub mod upgrade; pub mod wallet; diff --git a/src/commands/stream.rs b/src/commands/stream.rs new file mode 100644 index 0000000..ad478d9 --- /dev/null +++ b/src/commands/stream.rs @@ -0,0 +1,131 @@ +//! `polymarket stream` — real-time WebSocket data streamed to the terminal. +//! +//! Uses a native WebSocket connection (via `tokio-tungstenite`) to the +//! Polymarket CLOB WebSocket API. No third-party SDK integration. + +use anyhow::{Context as _, Result}; +use clap::{Args, Subcommand}; +use futures_util::StreamExt as _; + +use crate::output::OutputFormat; +use crate::output::stream as stream_output; +use crate::ws; + +#[derive(Args)] +pub struct StreamArgs { + #[command(subcommand)] + pub command: StreamCommand, + + /// Maximum number of events to receive before exiting (default: unlimited) + #[arg(long, global = true)] + pub max_events: Option, +} + +#[derive(Subcommand)] +pub enum StreamCommand { + /// Stream real-time orderbook snapshots (bids/asks) + Orderbook { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream real-time price changes + Prices { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream last trade price updates + LastTrade { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream calculated midpoint prices + Midpoints { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, +} + +fn split_ids(csv: &str) -> Vec { + csv.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() +} + +pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { + let max = args.max_events; + + let (asset_ids, filter) = match &args.command { + StreamCommand::Orderbook { token_ids } => (split_ids(token_ids), "book"), + StreamCommand::Prices { token_ids } => (split_ids(token_ids), "price_change"), + StreamCommand::LastTrade { token_ids } => (split_ids(token_ids), "last_trade_price"), + StreamCommand::Midpoints { token_ids } => (split_ids(token_ids), "midpoint"), + }; + + anyhow::ensure!(!asset_ids.is_empty(), "At least one token ID is required"); + + let mut stream = Box::pin(ws::subscribe_market(&asset_ids).await?); + + let mut count: u64 = 0; + loop { + tokio::select! { + biased; + + _ = tokio::signal::ctrl_c() => { + break; + } + + frame = stream.next() => match frame { + Some(result) => { + let event = result.context("WebSocket stream error")?; + + // Filter to the requested event type + if event.event_type != filter { + continue; + } + + // Check event limit before printing + if max.is_some_and(|m| count >= m) { + break; + } + + stream_output::print_event(&event, &output)?; + count += 1; + } + None => break, // stream ended + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn split_ids_basic() { + assert_eq!(split_ids("a,b,c"), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_trims_whitespace() { + assert_eq!(split_ids(" a , b , c "), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_filters_empty() { + assert_eq!(split_ids("a,,b, ,c"), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_single() { + assert_eq!(split_ids("abc"), vec!["abc"]); + } + + #[test] + fn split_ids_empty_string() { + assert!(split_ids("").is_empty()); + } +} diff --git a/src/main.rs b/src/main.rs index 61af087..7049ffd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod commands; mod config; mod output; mod shell; +mod ws; use std::process::ExitCode; @@ -60,6 +61,8 @@ enum Commands { Bridge(commands::bridge::BridgeArgs), /// Manage wallet and authentication Wallet(commands::wallet::WalletArgs), + /// Stream real-time WebSocket data (orderbook, prices, trades) + Stream(commands::stream::StreamArgs), /// Check API health status Status, /// Update to the latest version @@ -184,6 +187,7 @@ pub(crate) async fn run(cli: Cli) -> anyhow::Result<()> { Commands::Wallet(args) => { commands::wallet::execute(args, &cli.output, cli.private_key.as_deref()) } + Commands::Stream(args) => commands::stream::execute(args, cli.output).await, Commands::Upgrade => commands::upgrade::execute(), Commands::Status => { let status = polymarket_client_sdk::gamma::Client::default() diff --git a/src/output/mod.rs b/src/output/mod.rs index cc76acd..6034a4c 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -9,6 +9,7 @@ pub mod markets; pub mod profiles; pub mod series; pub mod sports; +pub mod stream; pub mod tags; use polymarket_client_sdk::types::Decimal; diff --git a/src/output/stream.rs b/src/output/stream.rs new file mode 100644 index 0000000..8174615 --- /dev/null +++ b/src/output/stream.rs @@ -0,0 +1,113 @@ +//! Output formatters for WebSocket stream events. +//! +//! Works with the native [`crate::ws::WsEvent`] type — no SDK dependency. + +use anyhow::Result; +use serde_json::Value; + +use super::OutputFormat; +use crate::ws::WsEvent; + +pub fn print_event(event: &WsEvent, output: &OutputFormat) -> Result<()> { + match output { + OutputFormat::Json => { + // The `event_type` field is consumed by serde into the struct + // and excluded from the flattened `payload` Value. Re-inject it + // so every NDJSON line is self-describing for downstream consumers. + let mut obj = event.payload.clone(); + if let Some(map) = obj.as_object_mut() { + map.insert( + "event_type".to_string(), + serde_json::Value::String(event.event_type.clone()), + ); + } + println!("{}", serde_json::to_string(&obj)?); + } + OutputFormat::Table => { + print_table_line(event); + } + } + Ok(()) +} + +fn print_table_line(event: &WsEvent) { + let p = &event.payload; + let asset = truncate_id(&str_field(p, "asset_id")); + + match event.event_type.as_str() { + "book" => { + let bids = p.get("bids").and_then(Value::as_array).map_or(0, Vec::len); + let asks = p.get("asks").and_then(Value::as_array).map_or(0, Vec::len); + let best_bid = p + .get("bids") + .and_then(Value::as_array) + .and_then(|a| a.first()) + .and_then(|l| l.get("price")) + .map_or_else(|| "-".into(), |v| v.as_str().unwrap_or("-").to_string()); + let best_ask = p + .get("asks") + .and_then(Value::as_array) + .and_then(|a| a.first()) + .and_then(|l| l.get("price")) + .map_or_else(|| "-".into(), |v| v.as_str().unwrap_or("-").to_string()); + println!("BOOK | Asset: {asset} | Bid: {best_bid:<6} | Ask: {best_ask:<6} | Levels: {bids}/{asks}"); + } + "price_change" => { + if let Some(changes) = p.get("price_changes").and_then(Value::as_array) { + for pc in changes { + let id = truncate_id(&str_field(pc, "asset_id")); + let price = str_field(pc, "price"); + let side = str_field(pc, "side").to_uppercase(); + println!("PRICE | Asset: {id} | Price: {price:<6} | Side: {side:<4}"); + } + } + } + "last_trade_price" => { + let price = str_field(p, "price"); + println!("TRADE | Asset: {asset} | Price: {price:<6}"); + } + "midpoint" => { + let mid = str_field(p, "midpoint"); + println!("MID | Asset: {asset} | Price: {mid:<6}"); + } + other => { + let prefix = other.to_uppercase(); + let safe_prefix: String = prefix.chars().take(5).collect(); + println!("{safe_prefix:<5} | {}", serde_json::to_string(p).unwrap_or_default()); + } + } +} + +fn str_field(v: &Value, key: &str) -> String { + v.get(key) + .map_or_else(|| "-".into(), |val| val.as_str().unwrap_or(&val.to_string()).to_string()) +} + +fn truncate_id(s: &str) -> String { + if s.chars().count() <= 12 { + s.to_string() + } else { + let prefix: String = s.chars().take(6).collect(); + let suffix: String = s.chars().rev().take(4).collect::>().into_iter().rev().collect(); + format!("{prefix}…{suffix}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn truncate_short_id_unchanged() { + assert_eq!(truncate_id("12345"), "12345"); + } + + #[test] + fn truncate_long_id_abbreviated() { + let long_id = "106585164761922456203746651621390029417453862034640469075081961934906147433548"; + let result = truncate_id(long_id); + assert!(result.contains('…'), "Expected ellipsis in: {result}"); + assert!(result.starts_with("106585")); + assert!(result.ends_with("3548")); + } +} diff --git a/src/ws.rs b/src/ws.rs new file mode 100644 index 0000000..93dee94 --- /dev/null +++ b/src/ws.rs @@ -0,0 +1,237 @@ +//! Native WebSocket client for the Polymarket CLOB real-time API. +//! +//! Connects directly to `wss://ws-subscriptions-clob.polymarket.com/ws/market` +//! using `tokio-tungstenite`. No SDK dependency — just raw WebSocket frames. + +use std::str; + +use anyhow::{anyhow, Context, Result}; +use futures_util::{Stream, SinkExt as _, StreamExt as _}; +use rustls::crypto::ring::default_provider; +use serde::{Deserialize, Serialize}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +const WS_MARKET_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; + +// --------------------------------------------------------------------------- +// Subscription protocol +// --------------------------------------------------------------------------- + +#[derive(Serialize)] +struct SubscribeRequest<'a> { + r#type: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + operation: Option<&'static str>, + #[serde(rename = "assets_ids")] + asset_ids: &'a [String], + markets: &'a [String], + #[serde(skip_serializing_if = "Option::is_none")] + initial_dump: Option, +} + +impl<'a> SubscribeRequest<'a> { + fn market(asset_ids: &'a [String]) -> Self { + Self { + r#type: "market", + operation: Some("subscribe"), + asset_ids, + markets: &[], + initial_dump: Some(true), + } + } +} + +// --------------------------------------------------------------------------- +// Inbound message types (only what the CLI needs) +// --------------------------------------------------------------------------- + +/// Raw JSON event from the WebSocket — we keep it loosely typed so the CLI +/// never breaks when the upstream adds new fields. +#[derive(Debug, Clone, Deserialize)] +pub struct WsEvent { + pub event_type: String, + #[serde(flatten)] + pub payload: serde_json::Value, +} + +// --------------------------------------------------------------------------- +// Connection +// --------------------------------------------------------------------------- + +/// Connect to the market channel, subscribe to the given asset IDs, and yield +/// each raw [`WsEvent`] to the caller. +/// +/// The returned stream is live: it stays open until the server closes it, the +/// caller drops the stream, or a fatal error occurs. +pub async fn subscribe_market( + asset_ids: &[String], +) -> Result>> { + // Install the default crypto provider for rustls (ignores errors if already installed) + let _ = default_provider().install_default(); + + let (ws_stream, _response) = connect_async(WS_MARKET_URL) + .await + .context("Failed to connect to Polymarket WebSocket")?; + + let (mut sink, stream) = ws_stream.split(); + + // Send subscription + let request = SubscribeRequest::market(asset_ids); + let payload = serde_json::to_string(&request)?; + sink.send(Message::Text(payload.into())).await?; + + // Map incoming frames → WsEvent(s), flattening arrays into individual items + Ok(stream.flat_map(move |frame_result| { + let events: Vec> = match frame_result { + Err(e) => vec![Err(anyhow!("WebSocket error: {e}"))], + Ok(frame) => match &frame { + Message::Text(t) => parse_events(t.as_ref()), + Message::Binary(b) => match str::from_utf8(b) { + Ok(s) => parse_events(s), + Err(_) => vec![], + }, + Message::Close(_) => vec![Err(anyhow!("WebSocket closed by server"))], + _ => vec![], + }, + }; + futures_util::stream::iter(events) + })) +} + +/// Parse a JSON text frame into zero or more [`WsEvent`] results. +/// +/// The server may send single objects or arrays — we handle both and +/// return **all** events so that array frames are never silently truncated. +/// +/// Non-event messages (subscription confirmations, heartbeats, error +/// responses) that lack `event_type` are silently skipped instead of +/// returning errors, which would otherwise crash the stream via `?`. +fn parse_events(text: &str) -> Vec> { + let trimmed = text.trim(); + if trimmed.is_empty() { + return vec![]; + } + + // Single object — skip gracefully if it lacks `event_type` + if trimmed.starts_with('{') { + // Quick pre-check: only attempt WsEvent parsing when the key exists. + let obj: serde_json::Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(e) => return vec![Err(anyhow!("Failed to parse WS JSON: {e}"))], + }; + if !obj.get("event_type").is_some_and(|v| v.is_string()) { + // Not an event (e.g. subscription ack, heartbeat) — skip. + return vec![]; + } + return match serde_json::from_value::(obj) { + Ok(event) => vec![Ok(event)], + Err(e) => vec![Err(anyhow!("Failed to parse WS event: {e}"))], + }; + } + + // Array — yield only elements that carry `event_type` + if trimmed.starts_with('[') { + let arr: Vec = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(e) => return vec![Err(anyhow!("Failed to parse WS array: {e}"))], + }; + return arr + .into_iter() + .filter(|v| v.get("event_type").is_some_and(|et| et.is_string())) + .map(|v| { + serde_json::from_value::(v) + .map_err(|e| anyhow!("Failed to parse WS event in array: {e}")) + }) + .collect(); + } + + vec![] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_single_event() { + let json = r#"{"event_type":"book","asset_id":"123","market":"0x01","timestamp":"1234567890","bids":[],"asks":[]}"#; + let results = parse_events(json); + assert_eq!(results.len(), 1); + let event = results.into_iter().next().unwrap().unwrap(); + assert_eq!(event.event_type, "book"); + } + + #[test] + fn parse_array_single_event() { + let json = r#"[{"event_type":"price_change","market":"0x01","timestamp":"123","price_changes":[]}]"#; + let results = parse_events(json); + assert_eq!(results.len(), 1); + let event = results.into_iter().next().unwrap().unwrap(); + assert_eq!(event.event_type, "price_change"); + } + + #[test] + fn parse_array_yields_all_events() { + let json = r#"[ + {"event_type":"price_change","market":"0x01","timestamp":"1","price_changes":[]}, + {"event_type":"book","market":"0x02","timestamp":"2","bids":[],"asks":[]}, + {"event_type":"midpoint","market":"0x03","timestamp":"3","midpoint":"0.5"} + ]"#; + let results = parse_events(json); + assert_eq!(results.len(), 3, "All events in the array must be yielded"); + let types: Vec = results + .into_iter() + .map(|r| r.unwrap().event_type) + .collect(); + assert_eq!(types, vec!["price_change", "book", "midpoint"]); + } + + #[test] + fn parse_empty_returns_empty_vec() { + assert!(parse_events("").is_empty()); + assert!(parse_events(" ").is_empty()); + } + + #[test] + fn subscribe_request_serialises_correctly() { + let ids = vec!["123".to_string(), "456".to_string()]; + let req = SubscribeRequest::market(&ids); + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains(r#""type":"market"#)); + assert!(json.contains(r#""operation":"subscribe"#)); + assert!(json.contains(r#""assets_ids":["123","456"]"#)); + assert!(json.contains(r#""initial_dump":true"#)); + } + + #[test] + fn parse_non_event_object_is_skipped() { + // Subscription confirmations, heartbeats, etc. lack `event_type` + let json = r#"{"type":"subscription","channel":"market","status":"ok"}"#; + assert!(parse_events(json).is_empty()); + } + + #[test] + fn parse_array_skips_non_event_elements() { + // Array with a mix of events and non-events + let json = r#"[ + {"type":"heartbeat","timestamp":"123"}, + {"event_type":"book","market":"0x01","timestamp":"2","bids":[],"asks":[]}, + {"status":"ok"} + ]"#; + let results = parse_events(json); + assert_eq!(results.len(), 1, "Only the element with event_type should be yielded"); + assert_eq!(results[0].as_ref().unwrap().event_type, "book"); + } + + #[test] + fn parse_non_json_text_returns_empty() { + assert!(parse_events("hello world").is_empty()); + } + + #[test] + fn parse_empty_array_returns_empty() { + let results = parse_events("[]"); + assert!(results.is_empty()); + } +} diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index 41d3d11..9cde55e 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -28,6 +28,7 @@ fn help_lists_all_top_level_commands() { .and(predicate::str::contains("data")) .and(predicate::str::contains("bridge")) .and(predicate::str::contains("wallet")) + .and(predicate::str::contains("stream")) .and(predicate::str::contains("status")), ); } @@ -486,3 +487,58 @@ fn wallet_address_succeeds_or_fails_gracefully() { // Either succeeds or fails with an error message — not a panic assert!(output.status.success() || !output.stderr.is_empty()); } + +#[test] +fn stream_help_lists_subcommands() { + polymarket() + .args(["stream", "--help"]) + .assert() + .success() + .stdout( + predicate::str::contains("orderbook") + .and(predicate::str::contains("prices")) + .and(predicate::str::contains("last-trade")) + .and(predicate::str::contains("midpoints")), + ); +} + +#[test] +fn stream_orderbook_requires_token_ids() { + polymarket() + .args(["stream", "orderbook"]) + .assert() + .failure(); +} + +#[test] +fn stream_prices_requires_token_ids() { + polymarket() + .args(["stream", "prices"]) + .assert() + .failure(); +} + +#[test] +fn stream_last_trade_requires_token_ids() { + polymarket() + .args(["stream", "last-trade"]) + .assert() + .failure(); +} + +#[test] +fn stream_midpoints_requires_token_ids() { + polymarket() + .args(["stream", "midpoints"]) + .assert() + .failure(); +} + +#[test] +fn stream_max_events_flag_appears_in_help() { + polymarket() + .args(["stream", "--help"]) + .assert() + .success() + .stdout(predicate::str::contains("--max-events")); +}