From efe1dc23f7ebda22f3a50e1b07612f8671f3cb05 Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 20:44:50 +0200 Subject: [PATCH 1/6] feat(cli): native real-time websocket integration with organized output --- Cargo.lock | 139 +++++++++++++++++++++++++++- Cargo.toml | 4 + examples/ws_test.rs | 91 ++++++++++++++++++ src/commands/mod.rs | 1 + src/commands/stream.rs | 136 +++++++++++++++++++++++++++ src/main.rs | 4 + src/output/mod.rs | 1 + src/output/stream.rs | 104 +++++++++++++++++++++ src/ws.rs | 194 +++++++++++++++++++++++++++++++++++++++ tests/cli_integration.rs | 56 +++++++++++ 10 files changed, 726 insertions(+), 4 deletions(-) create mode 100644 examples/ws_test.rs create mode 100644 src/commands/stream.rs create mode 100644 src/output/stream.rs create mode 100644 src/ws.rs diff --git a/Cargo.lock b/Cargo.lock index 26a3032..4805b18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,6 +1050,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "blst" version = "0.3.16" @@ -1440,6 +1449,17 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctrlc" +version = "3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" +dependencies = [ + "dispatch2", + "nix 0.31.2", + "windows-sys 0.61.2", +] + [[package]] name = "darling" version = "0.21.3" @@ -1524,6 +1544,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "der" version = "0.7.10" @@ -1626,6 +1652,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "dispatch2" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1798,7 +1836,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2221,7 +2259,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.6", ] [[package]] @@ -2685,6 +2723,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -2771,6 +2821,21 @@ dependencies = [ "smallvec", ] +[[package]] +name = "objc2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "once_cell" version = "1.21.3" @@ -2973,16 +3038,20 @@ dependencies = [ "assert_cmd", "chrono", "clap", + "ctrlc", "dirs", + "futures-util", "polymarket-client-sdk", "predicates", "rust_decimal", "rust_decimal_macros", + "rustls", "rustyline", "serde", "serde_json", "tabled", "tokio", + "tokio-tungstenite", ] [[package]] @@ -3457,7 +3526,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.6", ] [[package]] @@ -3674,6 +3743,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -3775,7 +3845,7 @@ dependencies = [ "libc", "log", "memchr", - "nix", + "nix 0.29.0", "radix_trie", "unicode-segmentation", "unicode-width", @@ -4056,6 +4126,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.10.9" @@ -4471,6 +4552,22 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -4596,6 +4693,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 2.0.18", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -4669,6 +4785,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -4893,6 +5015,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + [[package]] name = "webpki-roots" version = "1.0.6" diff --git a/Cargo.toml b/Cargo.toml index a01bf05..efc2d36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,10 @@ anyhow = "1" chrono = "0.4" dirs = "6" rustyline = "15" +tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } +rustls = { version = "0.23", features = ["ring"] } +futures-util = "0.3" +ctrlc = "3" [dev-dependencies] assert_cmd = "2" diff --git a/examples/ws_test.rs b/examples/ws_test.rs new file mode 100644 index 0000000..8dab4c7 --- /dev/null +++ b/examples/ws_test.rs @@ -0,0 +1,91 @@ +//! Minimal standalone WebSocket test — connects to Polymarket and dumps raw frames. +//! Run with: cargo run --example ws_test + +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use rustls::crypto::ring::default_provider; +use tokio::time::sleep; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +const WS_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; + +#[tokio::main] +async fn main() { + let _ = default_provider().install_default(); + + println!("Connecting to {WS_URL}..."); + let (ws, resp) = connect_async(WS_URL).await.expect("Failed to connect"); + println!("Connected! Status: {}", resp.status()); + + let (mut sink, mut stream) = ws.split(); + + // Subscribe to known active market tokens + let sub = serde_json::json!({ + "type": "market", + "operation": "subscribe", + "assets_ids": ["38397507750621893057346880033441136112987238933685677349709401910643842844855", "95949957895141858444199258452803633110472396604599808168788254125381075552218"], + "markets": [], + "initial_dump": true + }); + let payload = serde_json::to_string(&sub).unwrap(); + println!("Sending: {payload}"); + sink.send(Message::Text(payload.into())).await.unwrap(); + + println!("Waiting for frames (up to 30 seconds)..."); + let mut count = 0; + let timeout = sleep(Duration::from_secs(30)); + tokio::pin!(timeout); // macro — must remain fully qualified + + loop { + tokio::select! { + msg = stream.next() => { + match msg { + Some(Ok(Message::Text(t))) => { + let preview: String = if t.chars().count() > 200 { + format!("{}...", t.chars().take(200).collect::()) + } else { + t.to_string() + }; + println!("[TEXT #{count}] {preview}"); + count += 1; + } + Some(Ok(Message::Binary(b))) => { + let s = String::from_utf8_lossy(&b); + let preview: String = if s.chars().count() > 200 { + format!("{}...", s.chars().take(200).collect::()) + } else { + s.to_string() + }; + println!("[BIN #{count}] {preview}"); + count += 1; + } + Some(Ok(Message::Ping(p))) => println!("[PING] {} bytes", p.len()), + Some(Ok(Message::Pong(p))) => println!("[PONG] {} bytes", p.len()), + Some(Ok(Message::Close(c))) => { + println!("[CLOSE] {c:?}"); + break; + } + Some(Ok(Message::Frame(f))) => println!("[FRAME] {f:?}"), + Some(Err(e)) => { + eprintln!("[ERROR] {e}"); + break; + } + None => { + println!("Stream ended."); + break; + } + } + if count >= 10 { + println!("Got 10 frames, exiting."); + break; + } + } + _ = &mut timeout => { + println!("Timeout after 30 seconds. Received {count} frames total."); + break; + } + } + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 671c0ee..939c3eb 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -12,6 +12,7 @@ pub mod profiles; pub mod series; pub mod setup; pub mod sports; +pub mod stream; pub mod tags; pub mod upgrade; pub mod wallet; diff --git a/src/commands/stream.rs b/src/commands/stream.rs new file mode 100644 index 0000000..a7fdf35 --- /dev/null +++ b/src/commands/stream.rs @@ -0,0 +1,136 @@ +//! `polymarket stream` — real-time WebSocket data streamed to the terminal. +//! +//! Uses a native WebSocket connection (via `tokio-tungstenite`) to the +//! Polymarket CLOB WebSocket API. No third-party SDK integration. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use anyhow::{Context as _, Result}; +use clap::{Args, Subcommand}; +use futures_util::StreamExt as _; + +use crate::output::OutputFormat; +use crate::output::stream as stream_output; +use crate::ws; + +#[derive(Args)] +pub struct StreamArgs { + #[command(subcommand)] + pub command: StreamCommand, + + /// Maximum number of events to receive before exiting (default: unlimited) + #[arg(long, global = true)] + pub max_events: Option, +} + +#[derive(Subcommand)] +pub enum StreamCommand { + /// Stream real-time orderbook snapshots (bids/asks) + Orderbook { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream real-time price changes + Prices { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream last trade price updates + LastTrade { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, + /// Stream calculated midpoint prices + Midpoints { + /// Token/asset IDs (comma-separated) + token_ids: String, + }, +} + +/// Create a shutdown signal tied to Ctrl+C. +fn shutdown_flag() -> Arc { + let flag = Arc::new(AtomicBool::new(false)); + let f = Arc::clone(&flag); + ctrlc::set_handler(move || { + f.store(true, Ordering::SeqCst); + }) + .expect("failed to register Ctrl+C handler"); + flag +} + +fn split_ids(csv: &str) -> Vec { + csv.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() +} + +pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { + let shutdown = shutdown_flag(); + let max = args.max_events; + + let (asset_ids, filter) = match &args.command { + StreamCommand::Orderbook { token_ids } => (split_ids(token_ids), "book"), + StreamCommand::Prices { token_ids } => (split_ids(token_ids), "price_change"), + StreamCommand::LastTrade { token_ids } => (split_ids(token_ids), "last_trade_price"), + StreamCommand::Midpoints { token_ids } => (split_ids(token_ids), "midpoint"), + }; + + anyhow::ensure!(!asset_ids.is_empty(), "At least one token ID is required"); + + let mut stream = Box::pin(ws::subscribe_market(&asset_ids).await?); + + let mut count: u64 = 0; + while let Some(result) = stream.next().await { + if shutdown.load(Ordering::SeqCst) { + break; + } + + let event = result.context("WebSocket stream error")?; + + // Filter to the requested event type + if event.event_type != filter { + continue; + } + + stream_output::print_event(&event, &output)?; + + count += 1; + if max.is_some_and(|m| count >= m) { + break; + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn split_ids_basic() { + assert_eq!(split_ids("a,b,c"), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_trims_whitespace() { + assert_eq!(split_ids(" a , b , c "), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_filters_empty() { + assert_eq!(split_ids("a,,b, ,c"), vec!["a", "b", "c"]); + } + + #[test] + fn split_ids_single() { + assert_eq!(split_ids("abc"), vec!["abc"]); + } + + #[test] + fn split_ids_empty_string() { + assert!(split_ids("").is_empty()); + } +} diff --git a/src/main.rs b/src/main.rs index 61af087..7049ffd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod commands; mod config; mod output; mod shell; +mod ws; use std::process::ExitCode; @@ -60,6 +61,8 @@ enum Commands { Bridge(commands::bridge::BridgeArgs), /// Manage wallet and authentication Wallet(commands::wallet::WalletArgs), + /// Stream real-time WebSocket data (orderbook, prices, trades) + Stream(commands::stream::StreamArgs), /// Check API health status Status, /// Update to the latest version @@ -184,6 +187,7 @@ pub(crate) async fn run(cli: Cli) -> anyhow::Result<()> { Commands::Wallet(args) => { commands::wallet::execute(args, &cli.output, cli.private_key.as_deref()) } + Commands::Stream(args) => commands::stream::execute(args, cli.output).await, Commands::Upgrade => commands::upgrade::execute(), Commands::Status => { let status = polymarket_client_sdk::gamma::Client::default() diff --git a/src/output/mod.rs b/src/output/mod.rs index cc76acd..6034a4c 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -9,6 +9,7 @@ pub mod markets; pub mod profiles; pub mod series; pub mod sports; +pub mod stream; pub mod tags; use polymarket_client_sdk::types::Decimal; diff --git a/src/output/stream.rs b/src/output/stream.rs new file mode 100644 index 0000000..b2b86b9 --- /dev/null +++ b/src/output/stream.rs @@ -0,0 +1,104 @@ +//! Output formatters for WebSocket stream events. +//! +//! Works with the native [`crate::ws::WsEvent`] type — no SDK dependency. + +use anyhow::Result; +use serde_json::Value; + +use super::OutputFormat; +use crate::ws::WsEvent; + +pub fn print_event(event: &WsEvent, output: &OutputFormat) -> Result<()> { + match output { + OutputFormat::Json => { + // Emit the raw payload as a single-line JSON object (NDJSON). + println!("{}", serde_json::to_string(&event.payload)?); + } + OutputFormat::Table => { + print_table_line(event); + } + } + Ok(()) +} + +fn print_table_line(event: &WsEvent) { + let p = &event.payload; + let asset = truncate_id(&str_field(p, "asset_id")); + + match event.event_type.as_str() { + "book" => { + let bids = p.get("bids").and_then(Value::as_array).map_or(0, Vec::len); + let asks = p.get("asks").and_then(Value::as_array).map_or(0, Vec::len); + let best_bid = p + .get("bids") + .and_then(Value::as_array) + .and_then(|a| a.first()) + .and_then(|l| l.get("price")) + .map_or_else(|| "-".into(), |v| v.to_string()); + let best_ask = p + .get("asks") + .and_then(Value::as_array) + .and_then(|a| a.first()) + .and_then(|l| l.get("price")) + .map_or_else(|| "-".into(), |v| v.to_string()); + println!("BOOK | Asset: {asset} | Bid: {best_bid:<6} | Ask: {best_ask:<6} | Levels: {bids}/{asks}"); + } + "price_change" => { + if let Some(changes) = p.get("price_changes").and_then(Value::as_array) { + for pc in changes { + let id = truncate_id(&str_field(pc, "asset_id")); + let price = str_field(pc, "price"); + let side = str_field(pc, "side").to_uppercase(); + println!("PRICE | Asset: {id} | Price: {price:<6} | Side: {side:<4}"); + } + } + } + "last_trade_price" => { + let price = str_field(p, "price"); + println!("TRADE | Asset: {asset} | Price: {price:<6}"); + } + "midpoint" => { + let mid = str_field(p, "midpoint"); + println!("MID | Asset: {asset} | Price: {mid:<6}"); + } + other => { + let prefix = other.to_uppercase(); + let safe_prefix: String = prefix.chars().take(5).collect(); + println!("{safe_prefix:<5} | {}", serde_json::to_string(p).unwrap_or_default()); + } + } +} + +fn str_field(v: &Value, key: &str) -> String { + v.get(key) + .map_or_else(|| "-".into(), |val| val.as_str().unwrap_or(&val.to_string()).to_string()) +} + +fn truncate_id(s: &str) -> String { + if s.chars().count() <= 12 { + s.to_string() + } else { + let prefix: String = s.chars().take(6).collect(); + let suffix: String = s.chars().rev().take(4).collect::>().into_iter().rev().collect(); + format!("{prefix}…{suffix}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn truncate_short_id_unchanged() { + assert_eq!(truncate_id("12345"), "12345"); + } + + #[test] + fn truncate_long_id_abbreviated() { + let long_id = "106585164761922456203746651621390029417453862034640469075081961934906147433548"; + let result = truncate_id(long_id); + assert!(result.contains('…'), "Expected ellipsis in: {result}"); + assert!(result.starts_with("106585")); + assert!(result.ends_with("3548")); + } +} diff --git a/src/ws.rs b/src/ws.rs new file mode 100644 index 0000000..d5acb7b --- /dev/null +++ b/src/ws.rs @@ -0,0 +1,194 @@ +//! Native WebSocket client for the Polymarket CLOB real-time API. +//! +//! Connects directly to `wss://ws-subscriptions-clob.polymarket.com/ws/market` +//! using `tokio-tungstenite`. No SDK dependency — just raw WebSocket frames. + +use std::str; + +use anyhow::{anyhow, Context, Result}; +use futures_util::{Stream, SinkExt as _, StreamExt as _}; +use rustls::crypto::ring::default_provider; +use serde::{Deserialize, Serialize}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +const WS_MARKET_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market"; + +// --------------------------------------------------------------------------- +// Subscription protocol +// --------------------------------------------------------------------------- + +#[derive(Serialize)] +struct SubscribeRequest<'a> { + r#type: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + operation: Option<&'static str>, + #[serde(rename = "assets_ids")] + asset_ids: &'a [String], + markets: &'a [String], + #[serde(skip_serializing_if = "Option::is_none")] + initial_dump: Option, +} + +impl<'a> SubscribeRequest<'a> { + fn market(asset_ids: &'a [String]) -> Self { + Self { + r#type: "market", + operation: Some("subscribe"), + asset_ids, + markets: &[], + initial_dump: Some(true), + } + } +} + +// --------------------------------------------------------------------------- +// Inbound message types (only what the CLI needs) +// --------------------------------------------------------------------------- + +/// Raw JSON event from the WebSocket — we keep it loosely typed so the CLI +/// never breaks when the upstream adds new fields. +#[derive(Debug, Clone, Deserialize)] +pub struct WsEvent { + pub event_type: String, + #[serde(flatten)] + pub payload: serde_json::Value, +} + +// --------------------------------------------------------------------------- +// Connection +// --------------------------------------------------------------------------- + +/// Connect to the market channel, subscribe to the given asset IDs, and yield +/// each raw [`WsEvent`] to the caller. +/// +/// The returned stream is live: it stays open until the server closes it, the +/// caller drops the stream, or a fatal error occurs. +pub async fn subscribe_market( + asset_ids: &[String], +) -> Result>> { + // Install the default crypto provider for rustls (ignores errors if already installed) + let _ = default_provider().install_default(); + + let (ws_stream, _response) = connect_async(WS_MARKET_URL) + .await + .context("Failed to connect to Polymarket WebSocket")?; + + let (mut sink, stream) = ws_stream.split(); + + // Send subscription + let request = SubscribeRequest::market(asset_ids); + let payload = serde_json::to_string(&request)?; + sink.send(Message::Text(payload.into())).await?; + + // Map incoming frames → WsEvent(s) + Ok(stream.filter_map(move |frame_result| async move { + let frame = match frame_result { + Ok(f) => f, + Err(e) => return Some(Err(anyhow!("WebSocket error: {e}"))), + }; + + let text = match &frame { + Message::Text(t) => t.as_ref(), + Message::Binary(b) => { + return match str::from_utf8(b) { + Ok(s) => parse_events(s), + Err(_) => None, + } + } + Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => return None, + Message::Close(_) => return Some(Err(anyhow!("WebSocket closed by server"))), + }; + + parse_events(text) + })) +} + +/// Parse a JSON text frame into a single `WsEvent` result. +/// +/// The server may send single objects or arrays — we handle both. +fn parse_events(text: &str) -> Option> { + let trimmed = text.trim(); + if trimmed.is_empty() { + return None; + } + + // Single object + if trimmed.starts_with('{') { + return match serde_json::from_str::(trimmed) { + Ok(event) => Some(Ok(event)), + Err(e) => Some(Err(anyhow!("Failed to parse WS message: {e}"))), + }; + } + + // Array — yield the first parseable event (the stream adapter is per-frame) + if trimmed.starts_with('[') { + return match serde_json::from_str::>(trimmed) { + Ok(events) => events.into_iter().next().map(Ok), + Err(e) => Some(Err(anyhow!("Failed to parse WS array message: {e}"))), + }; + } + + None +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_single_event() { + let json = r#"{"event_type":"book","asset_id":"123","market":"0x01","timestamp":"1234567890","bids":[],"asks":[]}"#; + let result = parse_events(json); + assert!(result.is_some()); + let event = result.unwrap().unwrap(); + assert_eq!(event.event_type, "book"); + } + + #[test] + fn parse_array_events() { + let json = r#"[{"event_type":"price_change","market":"0x01","timestamp":"123","price_changes":[]}]"#; + let result = parse_events(json); + assert!(result.is_some()); + let event = result.unwrap().unwrap(); + assert_eq!(event.event_type, "price_change"); + } + + #[test] + fn parse_empty_returns_none() { + assert!(parse_events("").is_none()); + assert!(parse_events(" ").is_none()); + } + + #[test] + fn subscribe_request_serialises_correctly() { + let ids = vec!["123".to_string(), "456".to_string()]; + let req = SubscribeRequest::market(&ids); + let json = serde_json::to_string(&req).unwrap(); + assert!(json.contains(r#""type":"market"#)); + assert!(json.contains(r#""operation":"subscribe"#)); + assert!(json.contains(r#""assets_ids":["123","456"]"#)); + assert!(json.contains(r#""initial_dump":true"#)); + } + + #[test] + fn parse_malformed_array_returns_error() { + let json = r#"[{"not_an_event": true}]"#; + let result = parse_events(json); + assert!(result.is_some()); + assert!(result.unwrap().is_err()); + } + + #[test] + fn parse_non_json_text_returns_none() { + assert!(parse_events("hello world").is_none()); + } + + #[test] + fn parse_empty_array_returns_none() { + let result = parse_events("[]"); + assert!(result.is_some_and(|r| r.is_ok()) == false); + // Empty array parses fine but has no elements → None from .next() + assert!(parse_events("[]").is_none()); + } +} diff --git a/tests/cli_integration.rs b/tests/cli_integration.rs index 41d3d11..9cde55e 100644 --- a/tests/cli_integration.rs +++ b/tests/cli_integration.rs @@ -28,6 +28,7 @@ fn help_lists_all_top_level_commands() { .and(predicate::str::contains("data")) .and(predicate::str::contains("bridge")) .and(predicate::str::contains("wallet")) + .and(predicate::str::contains("stream")) .and(predicate::str::contains("status")), ); } @@ -486,3 +487,58 @@ fn wallet_address_succeeds_or_fails_gracefully() { // Either succeeds or fails with an error message — not a panic assert!(output.status.success() || !output.stderr.is_empty()); } + +#[test] +fn stream_help_lists_subcommands() { + polymarket() + .args(["stream", "--help"]) + .assert() + .success() + .stdout( + predicate::str::contains("orderbook") + .and(predicate::str::contains("prices")) + .and(predicate::str::contains("last-trade")) + .and(predicate::str::contains("midpoints")), + ); +} + +#[test] +fn stream_orderbook_requires_token_ids() { + polymarket() + .args(["stream", "orderbook"]) + .assert() + .failure(); +} + +#[test] +fn stream_prices_requires_token_ids() { + polymarket() + .args(["stream", "prices"]) + .assert() + .failure(); +} + +#[test] +fn stream_last_trade_requires_token_ids() { + polymarket() + .args(["stream", "last-trade"]) + .assert() + .failure(); +} + +#[test] +fn stream_midpoints_requires_token_ids() { + polymarket() + .args(["stream", "midpoints"]) + .assert() + .failure(); +} + +#[test] +fn stream_max_events_flag_appears_in_help() { + polymarket() + .args(["stream", "--help"]) + .assert() + .success() + .stdout(predicate::str::contains("--max-events")); +} From 1f44fc36ab994b308ee2ea93a882c8243d12dd87 Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 22:23:15 +0200 Subject: [PATCH 2/6] fix(ws): resolve array truncation and ctrl+c blocking issues --- Cargo.lock | 73 +++++----------------------- Cargo.toml | 3 +- src/commands/stream.rs | 49 ++++++++----------- src/ws.rs | 106 ++++++++++++++++++++++------------------- 4 files changed, 92 insertions(+), 139 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4805b18..ad8d83e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1050,15 +1050,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "block2" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" -dependencies = [ - "objc2", -] - [[package]] name = "blst" version = "0.3.16" @@ -1449,17 +1440,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctrlc" -version = "3.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" -dependencies = [ - "dispatch2", - "nix 0.31.2", - "windows-sys 0.61.2", -] - [[package]] name = "darling" version = "0.21.3" @@ -1652,18 +1632,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "dispatch2" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" -dependencies = [ - "bitflags", - "block2", - "libc", - "objc2", -] - [[package]] name = "displaydoc" version = "0.2.5" @@ -2723,18 +2691,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nix" -version = "0.31.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" -dependencies = [ - "bitflags", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -2821,21 +2777,6 @@ dependencies = [ "smallvec", ] -[[package]] -name = "objc2" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f" -dependencies = [ - "objc2-encode", -] - -[[package]] -name = "objc2-encode" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" - [[package]] name = "once_cell" version = "1.21.3" @@ -3038,7 +2979,6 @@ dependencies = [ "assert_cmd", "chrono", "clap", - "ctrlc", "dirs", "futures-util", "polymarket-client-sdk", @@ -3845,7 +3785,7 @@ dependencies = [ "libc", "log", "memchr", - "nix 0.29.0", + "nix", "radix_trie", "unicode-segmentation", "unicode-width", @@ -4174,6 +4114,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4514,6 +4464,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index efc2d36..a172aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ path = "src/main.rs" polymarket-client-sdk = { version = "0.4", features = ["gamma", "data", "bridge", "clob", "ctf"] } alloy = { version = "1.6.3", default-features = false, features = ["providers", "sol-types", "contract", "reqwest", "reqwest-rustls-tls", "signer-local", "signers"] } clap = { version = "4", features = ["derive"] } -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] } serde_json = "1" serde = { version = "1", features = ["derive"] } tabled = "0.17" @@ -29,7 +29,6 @@ rustyline = "15" tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } rustls = { version = "0.23", features = ["ring"] } futures-util = "0.3" -ctrlc = "3" [dev-dependencies] assert_cmd = "2" diff --git a/src/commands/stream.rs b/src/commands/stream.rs index a7fdf35..b6332f6 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -3,9 +3,6 @@ //! Uses a native WebSocket connection (via `tokio-tungstenite`) to the //! Polymarket CLOB WebSocket API. No third-party SDK integration. -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - use anyhow::{Context as _, Result}; use clap::{Args, Subcommand}; use futures_util::StreamExt as _; @@ -48,17 +45,6 @@ pub enum StreamCommand { }, } -/// Create a shutdown signal tied to Ctrl+C. -fn shutdown_flag() -> Arc { - let flag = Arc::new(AtomicBool::new(false)); - let f = Arc::clone(&flag); - ctrlc::set_handler(move || { - f.store(true, Ordering::SeqCst); - }) - .expect("failed to register Ctrl+C handler"); - flag -} - fn split_ids(csv: &str) -> Vec { csv.split(',') .map(|s| s.trim().to_string()) @@ -67,7 +53,6 @@ fn split_ids(csv: &str) -> Vec { } pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { - let shutdown = shutdown_flag(); let max = args.max_events; let (asset_ids, filter) = match &args.command { @@ -82,23 +67,31 @@ pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { let mut stream = Box::pin(ws::subscribe_market(&asset_ids).await?); let mut count: u64 = 0; - while let Some(result) = stream.next().await { - if shutdown.load(Ordering::SeqCst) { - break; - } + loop { + tokio::select! { + biased; - let event = result.context("WebSocket stream error")?; + _ = tokio::signal::ctrl_c() => { + break; + } - // Filter to the requested event type - if event.event_type != filter { - continue; - } + Some(result) = stream.next() => { + let event = result.context("WebSocket stream error")?; + + // Filter to the requested event type + if event.event_type != filter { + continue; + } + + stream_output::print_event(&event, &output)?; - stream_output::print_event(&event, &output)?; + count += 1; + if max.is_some_and(|m| count >= m) { + break; + } + } - count += 1; - if max.is_some_and(|m| count >= m) { - break; + else => break, // stream ended } } diff --git a/src/ws.rs b/src/ws.rs index d5acb7b..6dcd86d 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -81,55 +81,51 @@ pub async fn subscribe_market( let payload = serde_json::to_string(&request)?; sink.send(Message::Text(payload.into())).await?; - // Map incoming frames → WsEvent(s) - Ok(stream.filter_map(move |frame_result| async move { - let frame = match frame_result { - Ok(f) => f, - Err(e) => return Some(Err(anyhow!("WebSocket error: {e}"))), - }; - - let text = match &frame { - Message::Text(t) => t.as_ref(), - Message::Binary(b) => { - return match str::from_utf8(b) { + // Map incoming frames → WsEvent(s), flattening arrays into individual items + Ok(stream.flat_map(move |frame_result| { + let events: Vec> = match frame_result { + Err(e) => vec![Err(anyhow!("WebSocket error: {e}"))], + Ok(frame) => match &frame { + Message::Text(t) => parse_events(t.as_ref()), + Message::Binary(b) => match str::from_utf8(b) { Ok(s) => parse_events(s), - Err(_) => None, - } - } - Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => return None, - Message::Close(_) => return Some(Err(anyhow!("WebSocket closed by server"))), + Err(_) => vec![], + }, + Message::Close(_) => vec![Err(anyhow!("WebSocket closed by server"))], + _ => vec![], + }, }; - - parse_events(text) + futures_util::stream::iter(events) })) } -/// Parse a JSON text frame into a single `WsEvent` result. +/// Parse a JSON text frame into zero or more [`WsEvent`] results. /// -/// The server may send single objects or arrays — we handle both. -fn parse_events(text: &str) -> Option> { +/// The server may send single objects or arrays — we handle both and +/// return **all** events so that array frames are never silently truncated. +fn parse_events(text: &str) -> Vec> { let trimmed = text.trim(); if trimmed.is_empty() { - return None; + return vec![]; } // Single object if trimmed.starts_with('{') { return match serde_json::from_str::(trimmed) { - Ok(event) => Some(Ok(event)), - Err(e) => Some(Err(anyhow!("Failed to parse WS message: {e}"))), + Ok(event) => vec![Ok(event)], + Err(e) => vec![Err(anyhow!("Failed to parse WS message: {e}"))], }; } - // Array — yield the first parseable event (the stream adapter is per-frame) + // Array — yield every event in the array if trimmed.starts_with('[') { return match serde_json::from_str::>(trimmed) { - Ok(events) => events.into_iter().next().map(Ok), - Err(e) => Some(Err(anyhow!("Failed to parse WS array message: {e}"))), + Ok(events) => events.into_iter().map(Ok).collect(), + Err(e) => vec![Err(anyhow!("Failed to parse WS array message: {e}"))], }; } - None + vec![] } #[cfg(test)] @@ -139,25 +135,41 @@ mod tests { #[test] fn parse_single_event() { let json = r#"{"event_type":"book","asset_id":"123","market":"0x01","timestamp":"1234567890","bids":[],"asks":[]}"#; - let result = parse_events(json); - assert!(result.is_some()); - let event = result.unwrap().unwrap(); + let results = parse_events(json); + assert_eq!(results.len(), 1); + let event = results.into_iter().next().unwrap().unwrap(); assert_eq!(event.event_type, "book"); } #[test] - fn parse_array_events() { + fn parse_array_single_event() { let json = r#"[{"event_type":"price_change","market":"0x01","timestamp":"123","price_changes":[]}]"#; - let result = parse_events(json); - assert!(result.is_some()); - let event = result.unwrap().unwrap(); + let results = parse_events(json); + assert_eq!(results.len(), 1); + let event = results.into_iter().next().unwrap().unwrap(); assert_eq!(event.event_type, "price_change"); } #[test] - fn parse_empty_returns_none() { - assert!(parse_events("").is_none()); - assert!(parse_events(" ").is_none()); + fn parse_array_yields_all_events() { + let json = r#"[ + {"event_type":"price_change","market":"0x01","timestamp":"1","price_changes":[]}, + {"event_type":"book","market":"0x02","timestamp":"2","bids":[],"asks":[]}, + {"event_type":"midpoint","market":"0x03","timestamp":"3","midpoint":"0.5"} + ]"#; + let results = parse_events(json); + assert_eq!(results.len(), 3, "All events in the array must be yielded"); + let types: Vec = results + .into_iter() + .map(|r| r.unwrap().event_type) + .collect(); + assert_eq!(types, vec!["price_change", "book", "midpoint"]); + } + + #[test] + fn parse_empty_returns_empty_vec() { + assert!(parse_events("").is_empty()); + assert!(parse_events(" ").is_empty()); } #[test] @@ -174,21 +186,19 @@ mod tests { #[test] fn parse_malformed_array_returns_error() { let json = r#"[{"not_an_event": true}]"#; - let result = parse_events(json); - assert!(result.is_some()); - assert!(result.unwrap().is_err()); + let results = parse_events(json); + assert_eq!(results.len(), 1); + assert!(results.into_iter().next().unwrap().is_err()); } #[test] - fn parse_non_json_text_returns_none() { - assert!(parse_events("hello world").is_none()); + fn parse_non_json_text_returns_empty() { + assert!(parse_events("hello world").is_empty()); } #[test] - fn parse_empty_array_returns_none() { - let result = parse_events("[]"); - assert!(result.is_some_and(|r| r.is_ok()) == false); - // Empty array parses fine but has no elements → None from .next() - assert!(parse_events("[]").is_none()); + fn parse_empty_array_returns_empty() { + let results = parse_events("[]"); + assert!(results.is_empty()); } } From 22cdf27bc1bb08f619e8f83467649b6c2e8e0293 Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 22:36:15 +0200 Subject: [PATCH 3/6] fix(ws): resolve max-events off-by-one and json event-type dropping --- src/commands/stream.rs | 7 ++++--- src/output/stream.rs | 13 +++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/commands/stream.rs b/src/commands/stream.rs index b6332f6..bb37ea2 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -83,12 +83,13 @@ pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { continue; } - stream_output::print_event(&event, &output)?; - - count += 1; + // Check event limit before printing if max.is_some_and(|m| count >= m) { break; } + + stream_output::print_event(&event, &output)?; + count += 1; } else => break, // stream ended diff --git a/src/output/stream.rs b/src/output/stream.rs index b2b86b9..4e4fb93 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -11,8 +11,17 @@ use crate::ws::WsEvent; pub fn print_event(event: &WsEvent, output: &OutputFormat) -> Result<()> { match output { OutputFormat::Json => { - // Emit the raw payload as a single-line JSON object (NDJSON). - println!("{}", serde_json::to_string(&event.payload)?); + // The `event_type` field is consumed by serde into the struct + // and excluded from the flattened `payload` Value. Re-inject it + // so every NDJSON line is self-describing for downstream consumers. + let mut obj = event.payload.clone(); + if let Some(map) = obj.as_object_mut() { + map.insert( + "event_type".to_string(), + serde_json::Value::String(event.event_type.clone()), + ); + } + println!("{}", serde_json::to_string(&obj)?); } OutputFormat::Table => { print_table_line(event); From f5f14fde645e1c76b511124c0357b2504e238d33 Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 23:05:07 +0200 Subject: [PATCH 4/6] fix(cli): prevent stream termination hang on disconnect --- src/commands/stream.rs | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/commands/stream.rs b/src/commands/stream.rs index bb37ea2..ad478d9 100644 --- a/src/commands/stream.rs +++ b/src/commands/stream.rs @@ -75,24 +75,25 @@ pub async fn execute(args: StreamArgs, output: OutputFormat) -> Result<()> { break; } - Some(result) = stream.next() => { - let event = result.context("WebSocket stream error")?; - - // Filter to the requested event type - if event.event_type != filter { - continue; - } - - // Check event limit before printing - if max.is_some_and(|m| count >= m) { - break; + frame = stream.next() => match frame { + Some(result) => { + let event = result.context("WebSocket stream error")?; + + // Filter to the requested event type + if event.event_type != filter { + continue; + } + + // Check event limit before printing + if max.is_some_and(|m| count >= m) { + break; + } + + stream_output::print_event(&event, &output)?; + count += 1; } - - stream_output::print_event(&event, &output)?; - count += 1; + None => break, // stream ended } - - else => break, // stream ended } } From b16e17e67d7ea19a63fd1521958cda7156d2b949 Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 23:18:20 +0200 Subject: [PATCH 5/6] fix(ws): gracefully skip non-event server messages without crashing --- src/ws.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/src/ws.rs b/src/ws.rs index 6dcd86d..93dee94 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -103,26 +103,47 @@ pub async fn subscribe_market( /// /// The server may send single objects or arrays — we handle both and /// return **all** events so that array frames are never silently truncated. +/// +/// Non-event messages (subscription confirmations, heartbeats, error +/// responses) that lack `event_type` are silently skipped instead of +/// returning errors, which would otherwise crash the stream via `?`. fn parse_events(text: &str) -> Vec> { let trimmed = text.trim(); if trimmed.is_empty() { return vec![]; } - // Single object + // Single object — skip gracefully if it lacks `event_type` if trimmed.starts_with('{') { - return match serde_json::from_str::(trimmed) { + // Quick pre-check: only attempt WsEvent parsing when the key exists. + let obj: serde_json::Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(e) => return vec![Err(anyhow!("Failed to parse WS JSON: {e}"))], + }; + if !obj.get("event_type").is_some_and(|v| v.is_string()) { + // Not an event (e.g. subscription ack, heartbeat) — skip. + return vec![]; + } + return match serde_json::from_value::(obj) { Ok(event) => vec![Ok(event)], - Err(e) => vec![Err(anyhow!("Failed to parse WS message: {e}"))], + Err(e) => vec![Err(anyhow!("Failed to parse WS event: {e}"))], }; } - // Array — yield every event in the array + // Array — yield only elements that carry `event_type` if trimmed.starts_with('[') { - return match serde_json::from_str::>(trimmed) { - Ok(events) => events.into_iter().map(Ok).collect(), - Err(e) => vec![Err(anyhow!("Failed to parse WS array message: {e}"))], + let arr: Vec = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(e) => return vec![Err(anyhow!("Failed to parse WS array: {e}"))], }; + return arr + .into_iter() + .filter(|v| v.get("event_type").is_some_and(|et| et.is_string())) + .map(|v| { + serde_json::from_value::(v) + .map_err(|e| anyhow!("Failed to parse WS event in array: {e}")) + }) + .collect(); } vec![] @@ -184,11 +205,23 @@ mod tests { } #[test] - fn parse_malformed_array_returns_error() { - let json = r#"[{"not_an_event": true}]"#; + fn parse_non_event_object_is_skipped() { + // Subscription confirmations, heartbeats, etc. lack `event_type` + let json = r#"{"type":"subscription","channel":"market","status":"ok"}"#; + assert!(parse_events(json).is_empty()); + } + + #[test] + fn parse_array_skips_non_event_elements() { + // Array with a mix of events and non-events + let json = r#"[ + {"type":"heartbeat","timestamp":"123"}, + {"event_type":"book","market":"0x01","timestamp":"2","bids":[],"asks":[]}, + {"status":"ok"} + ]"#; let results = parse_events(json); - assert_eq!(results.len(), 1); - assert!(results.into_iter().next().unwrap().is_err()); + assert_eq!(results.len(), 1, "Only the element with event_type should be yielded"); + assert_eq!(results[0].as_ref().unwrap().event_type, "book"); } #[test] From 1de653c802c54f227a6e432e22da5e4b8d6f055f Mon Sep 17 00:00:00 2001 From: konor Date: Sun, 8 Mar 2026 23:30:31 +0200 Subject: [PATCH 6/6] fix(cli): remove JSON quotes from orderbook price output --- src/output/stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/output/stream.rs b/src/output/stream.rs index 4e4fb93..8174615 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -43,13 +43,13 @@ fn print_table_line(event: &WsEvent) { .and_then(Value::as_array) .and_then(|a| a.first()) .and_then(|l| l.get("price")) - .map_or_else(|| "-".into(), |v| v.to_string()); + .map_or_else(|| "-".into(), |v| v.as_str().unwrap_or("-").to_string()); let best_ask = p .get("asks") .and_then(Value::as_array) .and_then(|a| a.first()) .and_then(|l| l.get("price")) - .map_or_else(|| "-".into(), |v| v.to_string()); + .map_or_else(|| "-".into(), |v| v.as_str().unwrap_or("-").to_string()); println!("BOOK | Asset: {asset} | Bid: {best_bid:<6} | Ask: {best_ask:<6} | Levels: {bids}/{asks}"); } "price_change" => {