Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7,890 changes: 6,578 additions & 1,312 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,26 @@ path = "examples/http_only_sink/main.rs"
name = "test_block_extractor"
path = "examples/test_block_extractor/main.rs"

[[example]]
name = "test_katana_db_extractor"
path = "examples/test_katana_db_extractor.rs"

[[example]]
name = "bench_katana_db_extractor"
path = "examples/bench_katana_db_extractor.rs"

[[example]]
name = "probe_block_events"
path = "examples/probe_block_events.rs"

[[example]]
name = "probe_block_detail"
path = "examples/probe_block_detail.rs"

[[example]]
name = "probe_extract"
path = "examples/probe_extract.rs"

[dependencies]
# Core async runtime
tokio = { version = "1.41", features = ["full"] }
Expand Down Expand Up @@ -80,6 +100,9 @@ clap = { version = "4.5", features = ["derive"] }

# ETL Pipeline dependencies
starknet = "0.17"
katana-db = { git = "https://github.com/dojoengine/katana.git", rev = "7e6fef8" }
katana-provider = { git = "https://github.com/dojoengine/katana.git", rev = "7e6fef8" }
katana-primitives = { git = "https://github.com/dojoengine/katana.git", rev = "7e6fef8" }
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "postgres", "any"] }
async-trait = "0.1"
anyhow = "1.0"
Expand Down
123 changes: 123 additions & 0 deletions examples/bench_katana_db_extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//! Benchmark KatanaDbExtractor throughput against a full Katana database.
//!
//! Usage: cargo run --release --example bench_katana_db_extractor -- <db_path> [batch_size] [from_block]
//!
//! Prints per-batch stats and a final summary with average rates.

use std::env;
use std::time::{Duration, Instant};
use torii::etl::engine_db::{EngineDb, EngineDbConfig};
use torii::etl::extractor::{Extractor, KatanaDbConfig, KatanaDbExtractor};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let db_path = env::args()
.nth(1)
.expect("Usage: bench_katana_db_extractor <db_path> [batch_size] [from_block]");
let batch_size: u64 = env::args()
.nth(2)

Check warning on line 18 in examples/bench_katana_db_extractor.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/bench_katana_db_extractor.rs
.and_then(|s| s.parse().ok())
.unwrap_or(1000);
let from_block: u64 = env::args()
.nth(3)
.and_then(|s| s.parse().ok())
.unwrap_or(0);

println!("Opening Katana DB at: {db_path}");
println!("Batch size: {batch_size}");
println!("From block: {from_block}");
println!();

let config = KatanaDbConfig {
db_path: db_path.clone(),
from_block,
to_block: None, // run through the entire DB
batch_size,
..Default::default()
};

let mut extractor = KatanaDbExtractor::new(config)?;
let engine_db = EngineDb::new(EngineDbConfig {
path: "sqlite::memory:".to_string(),
})
.await?;

let mut total_events: u64 = 0;
let mut total_blocks: u64 = 0;
let mut total_txs: u64 = 0;
let mut total_declared: u64 = 0;
let mut total_deployed: u64 = 0;
let mut batch_count: u64 = 0;

let start = Instant::now();
let mut last_report = Instant::now();

loop {
let batch_start = Instant::now();
let batch = extractor.extract(None, &engine_db).await?;

if batch.is_empty() {
// Extractor either finished or caught up with chain head.
// For benchmarking we stop in both cases.
break;
}

if !batch.is_empty() {
let batch_elapsed = batch_start.elapsed();
let max_block = batch.blocks.keys().max().copied().unwrap_or(0);
let min_block = batch.blocks.keys().min().copied().unwrap_or(0);
let n_events = batch.events.len() as u64;
let n_txs = batch.transactions.len() as u64;
let n_blocks = batch.blocks.len() as u64;

total_events += n_events;
total_blocks += n_blocks;
total_txs += n_txs;
total_declared += batch.declared_classes.len() as u64;
total_deployed += batch.deployed_contracts.len() as u64;
batch_count += 1;

// Print progress every 5 seconds
if last_report.elapsed() >= Duration::from_secs(5) {
let overall = start.elapsed();
let overall_secs = overall.as_secs_f64();
println!(
"[{:>8.1}s] blocks {min_block:>7}-{max_block:<7} | batch: {n_events:>6} evts, {n_txs:>5} txs in {:>6.1}ms | cumulative: {total_events:>9} evts ({:>8.0} evt/s), {total_txs:>8} txs ({:>8.0} tx/s), {total_blocks:>7} blocks ({:>7.0} blk/s)",
overall_secs,
batch_elapsed.as_secs_f64() * 1000.0,
total_events as f64 / overall_secs,
total_txs as f64 / overall_secs,
total_blocks as f64 / overall_secs,
);
last_report = Instant::now();
}

if let Some(cursor) = &batch.cursor {
extractor.commit_cursor(cursor, &engine_db).await?;
}
}
}

let elapsed = start.elapsed();
let secs = elapsed.as_secs_f64();

println!();
println!("==============================");
println!(" BENCHMARK RESULTS ");
println!("==============================");
println!("Total time: {:.2}s", secs);
println!("Total blocks: {total_blocks}");
println!("Total txs: {total_txs}");
println!("Total events: {total_events}");
println!("Total declared: {total_declared}");
println!("Total deployed: {total_deployed}");
println!("Batches: {batch_count}");
println!("------------------------------");
println!("Blocks/sec: {:.0}", total_blocks as f64 / secs);

Check warning on line 116 in examples/bench_katana_db_extractor.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/bench_katana_db_extractor.rs
println!("Transactions/sec: {:.0}", total_txs as f64 / secs);
println!("Events/sec: {:.0}", total_events as f64 / secs);
println!("Avg batch time: {:.1}ms", (secs * 1000.0) / batch_count as f64);
println!("==============================");

Ok(())
}
52 changes: 52 additions & 0 deletions examples/probe_block_detail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Probe individual block queries to find which provider call hangs.
//!

Check warning on line 2 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs
//! Usage: cargo run --release --example probe_block_detail -- <db_path> <block_num>

use std::env;
use std::time::Instant;
use katana_provider::api::block::{BlockHashProvider, BlockNumberProvider, HeaderProvider};
use katana_provider::api::state_update::StateUpdateProvider;
use katana_provider::api::transaction::{ReceiptProvider, TransactionProvider};
use katana_provider::{DbProviderFactory, ProviderFactory};

Check warning on line 10 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

fn main() -> anyhow::Result<()> {
let db_path = env::args().nth(1).expect("Usage: probe_block_detail <db_path> <block_num>");
let block_num: u64 = env::args().nth(2).and_then(|s| s.parse().ok()).unwrap_or(869604);

let db = katana_db::Db::open_ro(&db_path)?;
let factory = DbProviderFactory::new(db);
let provider = factory.provider();

println!("Probing block {block_num} step by step...");

Check warning on line 21 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs
let t = Instant::now();
let head = provider.latest_number()?;
println!(" latest_number() = {head} [{:.1}ms]", t.elapsed().as_secs_f64() * 1000.0);

let t = Instant::now();
let header = provider.header_by_number(block_num)?;
println!(" header_by_number() = {:?} [{:.1}ms]", header.is_some(), t.elapsed().as_secs_f64() * 1000.0);

Check warning on line 28 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

let t = Instant::now();
let hash = provider.block_hash_by_num(block_num)?;
println!(" block_hash_by_num() = {:?} [{:.1}ms]", hash.is_some(), t.elapsed().as_secs_f64() * 1000.0);

Check warning on line 32 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

let t = Instant::now();
let txs = provider.transactions_by_block(block_num.into())?;
println!(" transactions_by_block() = {} txs [{:.1}ms]", txs.as_ref().map_or(0, |v| v.len()), t.elapsed().as_secs_f64() * 1000.0);

Check warning on line 36 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

let t = Instant::now();
let receipts = provider.receipts_by_block(block_num.into())?;
println!(" receipts_by_block() = {} receipts [{:.1}ms]", receipts.as_ref().map_or(0, |v| v.len()), t.elapsed().as_secs_f64() * 1000.0);

Check warning on line 40 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

let t = Instant::now();
let deployed = provider.deployed_contracts(block_num.into())?;
println!(" deployed_contracts() = {:?} [{:.1}ms]", deployed.as_ref().map(|v| v.len()), t.elapsed().as_secs_f64() * 1000.0);

Check warning on line 44 in examples/probe_block_detail.rs

View workflow job for this annotation

GitHub Actions / Format & Lint

Diff in /home/runner/work/torii-core/torii-core/examples/probe_block_detail.rs

let t = Instant::now();
let declared = provider.declared_classes(block_num.into())?;
println!(" declared_classes() = {:?} [{:.1}ms]", declared.as_ref().map(|v| v.len()), t.elapsed().as_secs_f64() * 1000.0);

println!("Done!");
Ok(())
}
52 changes: 52 additions & 0 deletions examples/probe_block_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Probe event counts per block in a range.
//!
//! Usage: cargo run --release --example probe_block_events -- <db_path> <from> <to>

use std::env;
use katana_provider::api::block::BlockNumberProvider;
use katana_provider::api::transaction::ReceiptProvider;
use katana_provider::{DbProviderFactory, ProviderFactory};

/// Prints, for every block in a range, how many events and receipts it holds.
///
/// Positional command-line arguments:
/// 1. `db_path` (required): database opened with `katana_db::Db::open_ro`.
/// 2. `from` (optional): first block; defaults to 869960.
/// 3. `to` (optional): last block (inclusive); defaults to 870010 and is
///    clamped to the provider's latest block number before iterating.
///
/// Blocks with more than 1000 events get an extra "DENSE" marker and the
/// largest per-receipt event count.
///
/// # Errors
///
/// Propagates errors from opening the database and from provider calls.
///
/// # Panics
///
/// Panics with a usage message when `db_path` is not supplied.
fn main() -> anyhow::Result<()> {
    let mut args = env::args().skip(1);
    let db_path = args
        .next()
        .expect("Usage: probe_block_events <db_path> <from> <to>");
    let from: u64 = args.next().and_then(|s| s.parse().ok()).unwrap_or(869960);
    let to: u64 = args.next().and_then(|s| s.parse().ok()).unwrap_or(870010);

    let provider = DbProviderFactory::new(katana_db::Db::open_ro(&db_path)?).provider();

    let chain_head = provider.latest_number()?;
    println!("Chain head: {chain_head}");
    // The requested (unclamped) upper bound is what gets reported here.
    println!("Probing blocks {from}-{to}");
    println!();

    let last_block = to.min(chain_head);
    for block_num in from..=last_block {
        let receipts = provider
            .receipts_by_block(block_num.into())?
            .unwrap_or_default();

        // One pass over the receipts: running event total and per-receipt peak.
        let tx_count = receipts.len();
        let (total_events, max_events_in_tx) = receipts
            .iter()
            .map(|receipt| receipt.events().len())
            .fold((0usize, 0usize), |(sum, peak), n| (sum + n, peak.max(n)));

        let is_dense = total_events > 1000;
        if !is_dense {
            println!(
                "Block {block_num}: {total_events:>10} events, {tx_count:>5} txs"
            );
            continue;
        }
        println!(
            "Block {block_num}: {total_events:>10} events, {tx_count:>5} txs, max {max_events_in_tx} events/tx *** DENSE ***"
        );
    }

    Ok(())
}
61 changes: 61 additions & 0 deletions examples/probe_extract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! Minimal reproduction of extract() hang.
//!
//! Usage: cargo run --release --example probe_extract -- <db_path> [from_block]

use std::time::Instant;
use torii::etl::engine_db::{EngineDb, EngineDbConfig};
use torii::etl::extractor::{Extractor, KatanaDbConfig, KatanaDbExtractor};

/// Drives `KatanaDbExtractor::extract` in small batches (batch size 5) and logs
/// the time taken by each step to stderr.
///
/// Positional command-line arguments:
/// 1. `db_path` (required): stored in `KatanaDbConfig::db_path`.
/// 2. `from_block` (optional): defaults to 869600 when missing or not a valid `u64`.
///
/// The loop stops at the first empty batch.
///
/// # Errors
///
/// Propagates errors from creating the `EngineDb` or the extractor, and from
/// `extract` / `commit_cursor`.
///
/// # Panics
///
/// Panics with a usage message when `db_path` is not supplied.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db_path = std::env::args()
        .nth(1)
        .expect("Usage: probe_extract <db_path> [from_block]");
    let from: u64 = std::env::args()
        .nth(2)
        .and_then(|s| s.parse().ok())
        .unwrap_or(869600);

    eprintln!("Step 1: Creating EngineDb...");
    let t = Instant::now();
    let engine_db = EngineDb::new(EngineDbConfig {
        path: "sqlite::memory:".to_string(),
    })
    .await?;
    eprintln!(" Done [{:.1}ms]", t.elapsed().as_secs_f64() * 1000.0);

    eprintln!("Step 2: Creating KatanaDbExtractor...");
    let t = Instant::now();
    let config = KatanaDbConfig {
        db_path,
        from_block: from,
        to_block: None,
        batch_size: 5,
        ..Default::default()
    };
    let mut extractor = KatanaDbExtractor::new(config)?;
    eprintln!(" Done [{:.1}ms]", t.elapsed().as_secs_f64() * 1000.0);

    eprintln!("Step 3: Extracting in loop until finished...");
    let start = Instant::now();
    let mut batch_count = 0u64;
    let mut total_events = 0u64;
    loop {
        let t = Instant::now();
        let batch = extractor.extract(None, &engine_db).await?;
        let ms = t.elapsed().as_secs_f64() * 1000.0;

        // Stop on any empty batch. Requiring `is_finished()` as well would keep
        // this loop spinning (and counting empty batches) whenever the extractor
        // returns an empty batch without reporting itself finished.
        // NOTE(review): presumably that happens once the extractor catches up
        // with the chain head while `to_block` is `None`; confirm against the
        // extractor implementation.
        if batch.is_empty() {
            break;
        }

        batch_count += 1;
        total_events += batch.events.len() as u64;

        if let Some(cursor) = &batch.cursor {
            extractor.commit_cursor(cursor, &engine_db).await?;
        }

        let max_block = batch.blocks.keys().max().copied().unwrap_or(0);
        eprintln!(
            " batch {batch_count}: block {max_block}, {} events in {ms:.1}ms (total: {total_events})",
            batch.events.len()
        );
    }
    let elapsed = start.elapsed();
    eprintln!(
        "Done! {batch_count} batches, {total_events} events in {:.1}s",
        elapsed.as_secs_f64()
    );

    Ok(())
}
Loading
Loading