Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,47 @@ All fallible operations return `AsyncJsonStreamReaderError`:

Minimum supported Rust version is **1.74**.

## Benchmark (Serde vs asyncjsonstream)

The examples folder includes a generator and benchmark for a single large JSON object with a
`rows` array. This comparison highlights the memory savings when you **stream and skip** large
fields instead of deserializing full objects.

### Generate a 5GB fixture

```bash
cargo run --release --example generate_big_object -- \
--path /tmp/big_object.json \
--target-bytes 5368709120 \
--payload-bytes 1024
```

### Run benchmarks (macOS)

```bash
/usr/bin/time -l cargo run --release --example bench_big_object -- \
--path /tmp/big_object.json --mode async

/usr/bin/time -l cargo run --release --example bench_big_object -- \
--path /tmp/big_object.json --mode async-light

/usr/bin/time -l cargo run --release --example bench_big_object -- \
--path /tmp/big_object.json --mode serde
```

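`serde` reads the entire file into memory and parses it into a single `serde_json::Value`
(highest memory). `async` streams the file and deserializes each row into a `serde_json::Value`
(higher memory than `async-light`). `async-light` streams the file, reads only each row's `id`,
and skips all other fields token by token (low memory).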

### Results (MacBook Pro, macOS, 5GB file, payload 1KB)

| Mode | Rows | Elapsed (ms) | Max RSS (bytes) | Peak footprint (bytes) |
|--------------|----------|--------------|-----------------|------------------------|
| async | 4,979,433 | 7,432 | 3,320,676,352 | 5,382,197,400 |
| async-light | 4,979,433 | 10,340 | 2,916,352 | 2,146,616 |
| serde | 4,979,433 | 6,662 | 10,902,372,352 | 14,253,713,704 |

Checksums matched across modes, confirming identical `id` aggregation.

## License

Licensed under either of:
Expand Down
323 changes: 323 additions & 0 deletions examples/bench_big_object.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
use asyncjsonstream::{AsyncJsonStreamReader, JsonToken};
use serde_json::Value;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use tokio::io::BufReader;

/// Prints the one-line command synopsis to standard error.
fn usage() {
    const SYNOPSIS: &str =
        "Usage: bench_big_object --path <file> [--mode serde|async|async-light|both] [--repeat <n>]";
    eprintln!("{SYNOPSIS}");
}

/// Parses `value` as an unsigned 64-bit integer.
///
/// `name` is the human-readable name of the option being parsed; it is only used in the
/// panic message.
///
/// # Panics
///
/// Panics with `Invalid <name>: <value>` when `value` is not a valid `u64`.
fn parse_u64(value: &str, name: &str) -> u64 {
    match value.parse::<u64>() {
        Ok(number) => number,
        Err(_) => panic!("Invalid {name}: {value}"),
    }
}

/// Maps the textual `--mode` argument onto a [`Mode`].
///
/// Accepted spellings are `serde`, `async`, `async-light` and `both`; matching is exact.
///
/// # Panics
///
/// Panics with `Invalid mode: <value>` for any other input.
fn parse_mode(value: &str) -> Mode {
    const KNOWN_MODES: [(&str, Mode); 4] = [
        ("serde", Mode::Serde),
        ("async", Mode::Async),
        ("async-light", Mode::AsyncLight),
        ("both", Mode::Both),
    ];

    KNOWN_MODES
        .iter()
        .find(|(spelling, _)| *spelling == value)
        .map(|&(_, mode)| mode)
        .unwrap_or_else(|| panic!("Invalid mode: {value}"))
}

/// Which benchmark(s) a run of this example executes.
#[derive(Copy, Clone)]
enum Mode {
    /// Run only the whole-file `serde_json` benchmark.
    Serde,
    /// Run only the streaming benchmark that deserializes every row.
    Async,
    /// Run only the token-level streaming benchmark that reads just `id`.
    AsyncLight,
    /// Run the `serde` and `async` benchmarks (this does not include `async-light`).
    Both,
}

fn bench_serde(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
let start = Instant::now();
let data = fs::read(path)?;
let parsed: Value = serde_json::from_slice(&data)?;
let rows = parsed
.get("rows")
.and_then(|v| v.as_array())
.ok_or_else(|| "missing rows array".to_string())?;

let mut checksum: u64 = 0;
for row in rows {
if let Some(id) = row.get("id").and_then(|v| v.as_u64()) {
checksum = checksum.wrapping_add(id);
}
}

Ok((rows.len() as u64, checksum, start.elapsed().as_millis()))
}

/// Benchmarks the streaming reader while still materializing every row.
///
/// Walks the entries of the root object; for the `rows` entry, each array item is
/// deserialized as an object and its `id` (when present and a `u64`) is added to the checksum
/// with wrapping addition. Entries with any other key are not touched here.
///
/// Returns `(row_count, checksum, elapsed_ms)`, where the elapsed time includes opening the
/// file.
///
/// # Errors
///
/// Fails if the file cannot be opened or the reader reports an error.
async fn bench_async(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
    let timer = Instant::now();

    let source = BufReader::new(tokio::fs::File::open(path).await?);
    let mut reader = AsyncJsonStreamReader::new(source);

    let mut row_count = 0u64;
    let mut checksum = 0u64;

    loop {
        let key = match reader.next_object_entry().await? {
            Some(key) => key,
            None => break,
        };
        if key != "rows" {
            continue;
        }

        while reader.start_array_item().await? {
            let row = reader.deserialize_object().await?;
            // A missing or non-u64 id adds zero, which leaves the wrapping sum unchanged.
            let id = row.get("id").and_then(|field| field.as_u64()).unwrap_or(0);
            checksum = checksum.wrapping_add(id);
            row_count += 1;
        }
    }

    Ok((row_count, checksum, timer.elapsed().as_millis()))
}

/// Kind of JSON container that is currently open while a value is being skipped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Container {
    /// Opened by a `StartObject` token; must be closed by `EndObject`.
    Object,
    /// Opened by a `StartArray` token; must be closed by `EndArray`.
    Array,
}

/// Reads the next token from `reader` and discards the complete JSON value it begins.
///
/// # Errors
///
/// Fails with `unexpected EOF while consuming value` if no token is available, and forwards
/// any error from the reader or from [`consume_value_from_token`].
async fn consume_value<R>(
    reader: &mut AsyncJsonStreamReader<R>,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    match reader.next_token().await? {
        Some(first) => consume_value_from_token(reader, first).await,
        None => Err("unexpected EOF while consuming value".to_string().into()),
    }
}

/// Discards one complete JSON value whose first token, `token`, has already been read.
///
/// Open containers are tracked on a stack. The function returns once a scalar or a closing
/// token has been processed while no container remains open. A `Key` or
/// `EndObjectOrListItem` token seen at depth zero does not end the value; reading simply
/// continues with the next token.
///
/// # Errors
///
/// Fails with `unexpected EndObject` / `unexpected EndArray` when a closing token does not
/// match the innermost open container, with `unexpected EOF while consuming value` when the
/// input ends early, and forwards any error from the reader.
async fn consume_value_from_token<R>(
    reader: &mut AsyncJsonStreamReader<R>,
    mut token: JsonToken,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut open: Vec<Container> = Vec::new();

    loop {
        // Update the nesting stack for the current token.
        match token {
            JsonToken::StartObject => open.push(Container::Object),
            JsonToken::StartArray => open.push(Container::Array),
            JsonToken::EndObject => {
                if !matches!(open.pop(), Some(Container::Object)) {
                    return Err("unexpected EndObject".into());
                }
            }
            JsonToken::EndArray => {
                if !matches!(open.pop(), Some(Container::Array)) {
                    return Err("unexpected EndArray".into());
                }
            }
            JsonToken::EndObjectOrListItem
            | JsonToken::Key(_)
            | JsonToken::String(_)
            | JsonToken::Number(_)
            | JsonToken::Boolean(_)
            | JsonToken::Null => {}
        }

        // Only a scalar or a closing token can complete the value.
        let completes_value = matches!(
            token,
            JsonToken::String(_)
                | JsonToken::Number(_)
                | JsonToken::Boolean(_)
                | JsonToken::Null
                | JsonToken::EndObject
                | JsonToken::EndArray
        );
        if open.is_empty() && completes_value {
            return Ok(());
        }

        token = match reader.next_token().await? {
            Some(next) => next,
            None => return Err("unexpected EOF while consuming value".to_string().into()),
        };
    }
}

async fn bench_async_light(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
let start = Instant::now();
let file = tokio::fs::File::open(path).await?;
let reader = BufReader::new(file);
let mut reader = AsyncJsonStreamReader::new(reader);

let mut rows: u64 = 0;
let mut checksum: u64 = 0;

let token = reader
.next_token()
.await?
.ok_or_else(|| "empty input".to_string())?;
if token != JsonToken::StartObject {
return Err("expected root object".into());
}

loop {
let token = reader
.next_token()
.await?
.ok_or_else(|| "unexpected EOF in root object".to_string())?;
match token {
JsonToken::Key(key) => {
if key == "rows" {
let next = reader
.next_token()
.await?
.ok_or_else(|| "unexpected EOF before rows array".to_string())?;
if next != JsonToken::StartArray {
return Err("expected rows array".into());
}

loop {
let token = reader
.next_token()
.await?
.ok_or_else(|| "unexpected EOF in rows array".to_string())?;
match token {
JsonToken::EndArray => break,
JsonToken::EndObjectOrListItem => continue,
JsonToken::StartObject => loop {
let token = reader.next_token().await?.ok_or_else(|| {
"unexpected EOF while reading row".to_string()
})?;
match token {
JsonToken::EndObject => {
rows += 1;
break;
}
JsonToken::EndObjectOrListItem => continue,
JsonToken::Key(field) => {
if field == "id" {
let token =
reader.next_token().await?.ok_or_else(|| {
"unexpected EOF reading id".to_string()
})?;
match token {
JsonToken::Number(n) => {
if let Ok(id) = n.parse::<u64>() {
checksum = checksum.wrapping_add(id);
}
}
JsonToken::String(s) => {
if let Ok(id) = s.parse::<u64>() {
checksum = checksum.wrapping_add(id);
}
}
JsonToken::Null => {}
other => {
return Err(format!(
"unexpected id token: {other:?}"
)
.into());
}
}
} else {
consume_value(&mut reader).await?;
}
}
other => {
return Err(
format!("unexpected token in row: {other:?}").into()
);
}
}
},
other => consume_value_from_token(&mut reader, other).await?,
}
}
} else {
consume_value(&mut reader).await?;
}
}
JsonToken::EndObject => break,
JsonToken::EndObjectOrListItem => continue,
other => {
return Err(format!("unexpected token in root: {other:?}").into());
}
}
}

Ok((rows, checksum, start.elapsed().as_millis()))
}

/// Entry point: parses the command line, then runs the selected benchmark(s) `--repeat` times.
///
/// Recognized arguments are `--path <file>` (required), `--mode <name>` (defaults to `both`),
/// `--repeat <n>` (defaults to 1) and `--help` / `-h`. Each benchmark run prints one line with
/// the run number, mode, row count, checksum and elapsed milliseconds.
///
/// # Panics
///
/// Prints the usage text and panics on an unknown argument, on an option with no value, or
/// when `--path` is absent. Invalid `--mode` / `--repeat` values panic inside their parsers.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut path: Option<PathBuf> = None;
    let mut mode = Mode::Both;
    let mut repeat: u64 = 1;

    let args: Vec<String> = env::args().skip(1).collect();
    let mut remaining = args.iter();
    while let Some(flag) = remaining.next() {
        match flag.as_str() {
            "--path" => match remaining.next() {
                Some(value) => path = Some(PathBuf::from(value)),
                None => {
                    usage();
                    panic!("Missing value for --path");
                }
            },
            "--mode" => match remaining.next() {
                Some(value) => mode = parse_mode(value),
                None => {
                    usage();
                    panic!("Missing value for --mode");
                }
            },
            "--repeat" => match remaining.next() {
                Some(value) => repeat = parse_u64(value, "repeat"),
                None => {
                    usage();
                    panic!("Missing value for --repeat");
                }
            },
            "--help" | "-h" => {
                usage();
                return Ok(());
            }
            other => {
                usage();
                panic!("Unknown argument: {other}");
            }
        }
    }

    let path = match path {
        Some(chosen) => chosen,
        None => {
            usage();
            panic!("--path is required");
        }
    };

    // `both` covers the serde and async benchmarks; async-light runs only when selected.
    let wants_serde = matches!(mode, Mode::Serde | Mode::Both);
    let wants_async = matches!(mode, Mode::Async | Mode::Both);
    let wants_async_light = matches!(mode, Mode::AsyncLight);

    for run in 1..=repeat {
        if wants_serde {
            let (rows, checksum, ms) = bench_serde(&path)?;
            println!("run={run} mode=serde rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
        if wants_async {
            let (rows, checksum, ms) = bench_async(&path).await?;
            println!("run={run} mode=async rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
        if wants_async_light {
            let (rows, checksum, ms) = bench_async_light(&path).await?;
            println!("run={run} mode=async-light rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
    }

    Ok(())
}
Loading