From 379f427ef1809e00c84924c5bc76a647904778b1 Mon Sep 17 00:00:00 2001 From: Nimrod Gutman Date: Wed, 4 Feb 2026 10:57:57 +0200 Subject: [PATCH] feat: add benchmark generator and examples --- README.md | 41 ++++ examples/bench_big_object.rs | 323 ++++++++++++++++++++++++++++++++ examples/generate_big_object.rs | 138 ++++++++++++++ 3 files changed, 502 insertions(+) create mode 100644 examples/bench_big_object.rs create mode 100644 examples/generate_big_object.rs diff --git a/README.md b/README.md index 846caad..051b195 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,47 @@ All fallible operations return `AsyncJsonStreamReaderError`: Minimum supported Rust version is **1.74**. +## Benchmark (Serde vs asyncjsonstream) + +The examples folder includes a generator and benchmark for a single large JSON object with a +`rows` array. This comparison highlights the memory savings when you **stream and skip** large +fields instead of deserializing full objects. + +### Generate a 5GB fixture + +```bash +cargo run --release --example generate_big_object -- \ + --path /tmp/big_object.json \ + --target-bytes 5368709120 \ + --payload-bytes 1024 +``` + +### Run benchmarks (macOS) + +```bash +/usr/bin/time -l cargo run --release --example bench_big_object -- \ + --path /tmp/big_object.json --mode async + +/usr/bin/time -l cargo run --release --example bench_big_object -- \ + --path /tmp/big_object.json --mode async-light + +/usr/bin/time -l cargo run --release --example bench_big_object -- \ + --path /tmp/big_object.json --mode serde +``` + +`async` deserializes each row into a `serde_json::Value` (higher memory). `async-light` only +reads `id` and skips other fields using tokens (low memory). + +### Results (MacBook Pro, macOS, 5GB file, payload 1KB) + +| Mode | Rows | Elapsed (ms) | Max RSS (bytes) | Peak footprint (bytes) | +|--------------|----------|--------------|-----------------|------------------------| +| async | 4,979,433 | 7,432 | 3,320,676,352 | 5,382,197,400 | +| async-light | 4,979,433 | 10,340 | 2,916,352 | 2,146,616 | +| serde | 4,979,433 | 6,662 | 10,902,372,352 | 14,253,713,704 | + +Checksums matched across modes, confirming identical `id` aggregation. + ## License Licensed under either of: diff --git a/examples/bench_big_object.rs b/examples/bench_big_object.rs new file mode 100644 index 0000000..144c275 --- /dev/null +++ b/examples/bench_big_object.rs @@ -0,0 +1,323 @@ +use asyncjsonstream::{AsyncJsonStreamReader, JsonToken}; +use serde_json::Value; +use std::env; +use std::fs; +use std::path::PathBuf; +use std::time::Instant; +use tokio::io::BufReader; + +fn usage() { + eprintln!( + "Usage: bench_big_object --path [--mode serde|async|async-light|both] [--repeat ]" + ); +} + +fn parse_u64(value: &str, name: &str) -> u64 { + value + .parse() + .unwrap_or_else(|_| panic!("Invalid {name}: {value}")) +} + +fn parse_mode(value: &str) -> Mode { + match value { + "serde" => Mode::Serde, + "async" => Mode::Async, + "async-light" => Mode::AsyncLight, + "both" => Mode::Both, + _ => panic!("Invalid mode: {value}"), + } +} + +#[derive(Clone, Copy)] +enum Mode { + Serde, + Async, + AsyncLight, + Both, +} + +fn bench_serde(path: &PathBuf) -> Result<(u64, u64, u128), Box> { + let start = Instant::now(); + let data = fs::read(path)?; + let parsed: Value = serde_json::from_slice(&data)?; + let rows = parsed + .get("rows") + .and_then(|v| v.as_array()) + .ok_or_else(|| "missing rows array".to_string())?; + + let mut checksum: u64 = 0; + for row in rows { + if let Some(id) = row.get("id").and_then(|v| v.as_u64()) { + checksum = checksum.wrapping_add(id); + } + } + + Ok((rows.len() as u64, checksum, start.elapsed().as_millis())) +} + +async fn bench_async(path: &PathBuf) -> Result<(u64, u64, u128), Box> { + let start = Instant::now(); + let file = tokio::fs::File::open(path).await?; + let reader = BufReader::new(file); + let mut reader = AsyncJsonStreamReader::new(reader); + + let mut rows: u64 = 0; + let mut checksum: u64 = 0; + + while let Some(key) = reader.next_object_entry().await? { + if key == "rows" { + while reader.start_array_item().await? { + let obj = reader.deserialize_object().await?; + if let Some(id) = obj.get("id").and_then(|v| v.as_u64()) { + checksum = checksum.wrapping_add(id); + } + rows += 1; + } + } + } + + Ok((rows, checksum, start.elapsed().as_millis())) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Container { + Object, + Array, +} + +async fn consume_value( + reader: &mut AsyncJsonStreamReader, +) -> Result<(), Box> +where + R: tokio::io::AsyncRead + Unpin, +{ + let token = reader + .next_token() + .await? + .ok_or_else(|| "unexpected EOF while consuming value".to_string())?; + consume_value_from_token(reader, token).await +} + +async fn consume_value_from_token( + reader: &mut AsyncJsonStreamReader, + mut token: JsonToken, +) -> Result<(), Box> +where + R: tokio::io::AsyncRead + Unpin, +{ + let mut stack: Vec = Vec::new(); + + loop { + match token { + JsonToken::StartObject => stack.push(Container::Object), + JsonToken::StartArray => stack.push(Container::Array), + JsonToken::EndObject => match stack.pop() { + Some(Container::Object) => {} + _ => return Err("unexpected EndObject".into()), + }, + JsonToken::EndArray => match stack.pop() { + Some(Container::Array) => {} + _ => return Err("unexpected EndArray".into()), + }, + JsonToken::EndObjectOrListItem => {} + JsonToken::Key(_) => {} + JsonToken::String(_) + | JsonToken::Number(_) + | JsonToken::Boolean(_) + | JsonToken::Null => {} + } + + if stack.is_empty() { + match token { + JsonToken::String(_) + | JsonToken::Number(_) + | JsonToken::Boolean(_) + | JsonToken::Null + | JsonToken::EndObject + | JsonToken::EndArray => break, + _ => {} + } + } + + token = reader + .next_token() + .await? + .ok_or_else(|| "unexpected EOF while consuming value".to_string())?; + } + + Ok(()) +} + +async fn bench_async_light(path: &PathBuf) -> Result<(u64, u64, u128), Box> { + let start = Instant::now(); + let file = tokio::fs::File::open(path).await?; + let reader = BufReader::new(file); + let mut reader = AsyncJsonStreamReader::new(reader); + + let mut rows: u64 = 0; + let mut checksum: u64 = 0; + + let token = reader + .next_token() + .await? + .ok_or_else(|| "empty input".to_string())?; + if token != JsonToken::StartObject { + return Err("expected root object".into()); + } + + loop { + let token = reader + .next_token() + .await? + .ok_or_else(|| "unexpected EOF in root object".to_string())?; + match token { + JsonToken::Key(key) => { + if key == "rows" { + let next = reader + .next_token() + .await? + .ok_or_else(|| "unexpected EOF before rows array".to_string())?; + if next != JsonToken::StartArray { + return Err("expected rows array".into()); + } + + loop { + let token = reader + .next_token() + .await? + .ok_or_else(|| "unexpected EOF in rows array".to_string())?; + match token { + JsonToken::EndArray => break, + JsonToken::EndObjectOrListItem => continue, + JsonToken::StartObject => loop { + let token = reader.next_token().await?.ok_or_else(|| { + "unexpected EOF while reading row".to_string() + })?; + match token { + JsonToken::EndObject => { + rows += 1; + break; + } + JsonToken::EndObjectOrListItem => continue, + JsonToken::Key(field) => { + if field == "id" { + let token = + reader.next_token().await?.ok_or_else(|| { + "unexpected EOF reading id".to_string() + })?; + match token { + JsonToken::Number(n) => { + if let Ok(id) = n.parse::() { + checksum = checksum.wrapping_add(id); + } + } + JsonToken::String(s) => { + if let Ok(id) = s.parse::() { + checksum = checksum.wrapping_add(id); + } + } + JsonToken::Null => {} + other => { + return Err(format!( + "unexpected id token: {other:?}" + ) + .into()); + } + } + } else { + consume_value(&mut reader).await?; + } + } + other => { + return Err( + format!("unexpected token in row: {other:?}").into() + ); + } + } + }, + other => consume_value_from_token(&mut reader, other).await?, + } + } + } else { + consume_value(&mut reader).await?; + } + } + JsonToken::EndObject => break, + JsonToken::EndObjectOrListItem => continue, + other => { + return Err(format!("unexpected token in root: {other:?}").into()); + } + } + } + + Ok((rows, checksum, start.elapsed().as_millis())) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut path: Option = None; + let mut mode = Mode::Both; + let mut repeat: u64 = 1; + + let args: Vec = env::args().skip(1).collect(); + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--path" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --path"); + } + path = Some(PathBuf::from(&args[i])); + } + "--mode" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --mode"); + } + mode = parse_mode(&args[i]); + } + "--repeat" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --repeat"); + } + repeat = parse_u64(&args[i], "repeat"); + } + "--help" | "-h" => { + usage(); + return Ok(()); + } + other => { + usage(); + panic!("Unknown argument: {other}"); + } + } + i += 1; + } + + let path = path.unwrap_or_else(|| { + usage(); + panic!("--path is required"); + }); + + for run in 1..=repeat { + if matches!(mode, Mode::Serde | Mode::Both) { + let (rows, checksum, ms) = bench_serde(&path)?; + println!("run={run} mode=serde rows={rows} checksum={checksum} elapsed_ms={ms}"); + } + if matches!(mode, Mode::Async | Mode::Both) { + let (rows, checksum, ms) = bench_async(&path).await?; + println!("run={run} mode=async rows={rows} checksum={checksum} elapsed_ms={ms}"); + } + if matches!(mode, Mode::AsyncLight) { + let (rows, checksum, ms) = bench_async_light(&path).await?; + println!("run={run} mode=async-light rows={rows} checksum={checksum} elapsed_ms={ms}"); + } + } + + Ok(()) +} diff --git a/examples/generate_big_object.rs b/examples/generate_big_object.rs new file mode 100644 index 0000000..84eb1d7 --- /dev/null +++ b/examples/generate_big_object.rs @@ -0,0 +1,138 @@ +use std::env; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::path::PathBuf; +use std::time::Instant; + +fn usage() { + eprintln!( + "Usage: generate_big_object --path [--target-bytes ] [--payload-bytes ] [--rows ] [--rows-per-flush ]" + ); +} + +fn parse_u64(value: &str, name: &str) -> u64 { + value + .parse() + .unwrap_or_else(|_| panic!("Invalid {name}: {value}")) +} + +fn main() -> std::io::Result<()> { + let mut path = PathBuf::from("big_object.json"); + let mut target_bytes: u64 = 5 * 1024 * 1024 * 1024; + let mut payload_bytes: usize = 1024; + let mut rows: Option = None; + let mut rows_per_flush: usize = 8192; + + let args: Vec = env::args().skip(1).collect(); + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--path" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --path"); + } + path = PathBuf::from(&args[i]); + } + "--target-bytes" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --target-bytes"); + } + target_bytes = parse_u64(&args[i], "target-bytes"); + } + "--payload-bytes" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --payload-bytes"); + } + payload_bytes = parse_u64(&args[i], "payload-bytes") as usize; + } + "--rows" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --rows"); + } + rows = Some(parse_u64(&args[i], "rows")); + } + "--rows-per-flush" => { + i += 1; + if i >= args.len() { + usage(); + panic!("Missing value for --rows-per-flush"); + } + rows_per_flush = parse_u64(&args[i], "rows-per-flush") as usize; + } + "--help" | "-h" => { + usage(); + return Ok(()); + } + other => { + usage(); + panic!("Unknown argument: {other}"); + } + } + i += 1; + } + + let file = File::create(&path)?; + let mut writer = BufWriter::with_capacity(4 * 1024 * 1024, file); + let payload = "x".repeat(payload_bytes); + + let start = Instant::now(); + let mut bytes_written: u64 = 0; + let mut row_count: u64 = 0; + + let header = + format!("{{\"meta\":{{\"payload_bytes\":{payload_bytes},\"schema_version\":1}},\"rows\":["); + writer.write_all(header.as_bytes())?; + bytes_written += header.len() as u64; + + loop { + if let Some(limit) = rows { + if row_count >= limit { + break; + } + } else if bytes_written >= target_bytes { + break; + } + + if row_count > 0 { + writer.write_all(b",")?; + bytes_written += 1; + } + + let id = row_count; + let value = (row_count % 10_000) as f64 / 100.0; + let flag = if row_count % 2 == 0 { "true" } else { "false" }; + let row = format!( + "{{\"id\":{id},\"value\":{value:.2},\"flag\":{flag},\"payload\":\"{payload}\"}}" + ); + writer.write_all(row.as_bytes())?; + bytes_written += row.len() as u64; + row_count += 1; + + if rows_per_flush > 0 && (row_count as usize) % rows_per_flush == 0 { + writer.flush()?; + } + } + + writer.write_all(b"]}")?; + bytes_written += 2; + writer.flush()?; + + let elapsed = start.elapsed(); + println!( + "Generated {} rows to {} ({} bytes) in {:.2?}", + row_count, + path.display(), + bytes_written, + elapsed + ); + + Ok(()) +}