diff --git a/Cargo.lock b/Cargo.lock index 7d8eddb..0e4c899 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4902,6 +4902,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", + "hex", "metrics", "reqwest", "serde", @@ -5132,7 +5133,7 @@ dependencies = [ [[package]] name = "torii-introspect-bin" -version = "0.1.5" +version = "0.1.6" dependencies = [ "anyhow", "clap", diff --git a/bins/torii-introspect-bin/Cargo.toml b/bins/torii-introspect-bin/Cargo.toml index 62049cc..17b3633 100644 --- a/bins/torii-introspect-bin/Cargo.toml +++ b/bins/torii-introspect-bin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "torii-introspect-bin" -version = "0.1.5" +version = "0.1.6" edition = "2021" description = "Dojo introspect indexer for PostgreSQL or SQLite" diff --git a/crates/pathfinder/src/fetcher.rs b/crates/pathfinder/src/fetcher.rs index fa2d167..f5182cc 100644 --- a/crates/pathfinder/src/fetcher.rs +++ b/crates/pathfinder/src/fetcher.rs @@ -24,6 +24,7 @@ pub trait EventFetcher { to_block: u64, ) -> PFResult<(Vec, Vec)>; } + impl EventFetcher for Connection { fn get_events(&self, from_block: u64, to_block: u64) -> PFResult> { let total_events = self.get_number_of_events_for_blocks(from_block, to_block)?; @@ -50,6 +51,7 @@ impl EventFetcher for Connection { } } } + Ok(events) } diff --git a/crates/pathfinder/src/test.rs b/crates/pathfinder/src/test.rs index 0176c67..8451ce6 100644 --- a/crates/pathfinder/src/test.rs +++ b/crates/pathfinder/src/test.rs @@ -1,10 +1,12 @@ use crate::connect; +#[cfg(feature = "etl")] use crate::extractor::PathfinderExtractor; use crate::fetcher::EventFetcher; const DB_PATH: &str = "/mnt/store/mainnet.sqlite"; #[test] +#[cfg(feature = "etl")] #[ignore = "requires /mnt/store/mainnet.sqlite snapshot"] fn test_emitted_events() { let mut extractor = diff --git a/crates/torii-controllers-sink/Cargo.toml b/crates/torii-controllers-sink/Cargo.toml index 5435985..6396be7 100644 --- a/crates/torii-controllers-sink/Cargo.toml +++ b/crates/torii-controllers-sink/Cargo.toml @@ -11,6 +11,7 @@ torii-sql.workspace = true anyhow.workspace = true async-trait.workspace = true chrono.workspace = true +hex.workspace = true metrics.workspace = true reqwest = { version = "0.12", features = ["json", "rustls-tls"] } serde.workspace = true diff --git a/crates/torii-controllers-sink/src/lib.rs b/crates/torii-controllers-sink/src/lib.rs index 1df3705..a4f5d05 100644 --- a/crates/torii-controllers-sink/src/lib.rs +++ b/crates/torii-controllers-sink/src/lib.rs @@ -42,7 +42,7 @@ struct ControllerNode { #[derive(Debug, Clone, Deserialize)] struct ControllerEdge { - node: ControllerNode, + node: Option, } #[derive(Debug, Clone, Deserialize, Default)] @@ -93,7 +93,7 @@ impl TryFrom for StoredController { value.address ) })?; - let normalized = format!("{felt_addr:#066x}"); + let normalized = normalize_controller_address(&felt_addr); Ok(Self { id: normalized.clone(), address: normalized, @@ -104,6 +104,10 @@ impl TryFrom for StoredController { } } +fn normalize_controller_address(address: &Felt) -> String { + format!("0x{}", hex::encode(address.to_be_bytes_vec())) +} + struct ControllersStore { pool: Pool, backend: DbBackend, @@ -328,13 +332,22 @@ query {{ match result { Ok(response) if response.status().is_success() => { let body: ControllersResponse = response.json().await?; - if let Some(errors) = body.errors { + if let Some(errors) = &body.errors { let msg = errors - .into_iter() - .map(|error| error.message) + .iter() + .map(|error| error.message.clone()) .collect::>() .join("; "); - return Err(anyhow!("controller GraphQL query failed: {msg}")); + if body.data.is_none() { + return Err(anyhow!("controller GraphQL query failed: {msg}")); + } + + tracing::warn!( + target: "torii::sinks::controllers", + error_count = errors.len(), + errors = %msg, + "Controller GraphQL response returned partial errors; skipping bad edges" + ); } return Ok(body @@ -345,7 +358,15 @@ query {{ .controllers .edges .into_iter() - .map(|edge| edge.node) + .filter_map(|edge| { + if edge.node.is_none() { + tracing::warn!( + target: "torii::sinks::controllers", + "Skipping controller edge with null node" + ); + } + edge.node + }) .collect()); } Ok(response) if attempts < MAX_RETRIES => { @@ -717,6 +738,107 @@ mod tests { assert_eq!(username, "user_one"); } + #[tokio::test] + async fn controllers_sink_skips_null_nodes() { + let api_url = spawn_graphql_server(json!({ + "data": { + "controllers": { + "edges": [ + { "node": null }, + { + "node": { + "address": "0x123", + "createdAt": "2024-03-20T12:00:00Z", + "account": { "username": "test_user" } + } + } + ] + } + } + })) + .await + .unwrap(); + + let sink = ControllersSink::new(":memory:", Some(1), Some(api_url)) + .await + .unwrap(); + sink.store.initialize().await.unwrap(); + + sink.process(&[], &make_batch(1_710_936_000, 1_710_936_100)) + .await + .unwrap(); + + let count: i64 = query_scalar(&format!("SELECT COUNT(*) FROM {CONTROLLERS_TABLE}")) + .fetch_one(&sink.store.pool) + .await + .unwrap(); + assert_eq!(count, 1); + } + + #[tokio::test] + async fn controllers_sink_accepts_partial_graphql_errors_when_data_exists() { + let api_url = spawn_graphql_server(json!({ + "errors": [ + { + "message": "ent: account not found", + "path": ["controllers", "edges", 0, "node", "account"] + } + ], + "data": { + "controllers": { + "edges": [ + { "node": null }, + { + "node": { + "address": "0x123", + "createdAt": "2024-03-20T12:00:00Z", + "account": { "username": "test_user" } + } + } + ] + } + } + })) + .await + .unwrap(); + + let sink = ControllersSink::new(":memory:", Some(1), Some(api_url)) + .await + .unwrap(); + sink.store.initialize().await.unwrap(); + + sink.process(&[], &make_batch(1_710_936_000, 1_710_936_100)) + .await + .unwrap(); + + let username: String = query_scalar(&format!( + "SELECT username FROM {CONTROLLERS_TABLE} WHERE address = ?1" + )) + .bind("0x0000000000000000000000000000000000000000000000000000000000000123") + .fetch_one(&sink.store.pool) + .await + .unwrap(); + assert_eq!(username, "test_user"); + } + + #[test] + fn stored_controller_normalizes_addresses_to_fixed_hex() { + let controller = StoredController::try_from(ControllerNode { + address: "0x123".to_string(), + created_at: "2024-03-20T12:00:00Z".to_string(), + account: ControllerAccount { + username: "test_user".to_string(), + }, + }) + .unwrap(); + + assert_eq!( + controller.address, + "0x0000000000000000000000000000000000000000000000000000000000000123" + ); + assert_eq!(controller.id, controller.address); + } + #[tokio::test] async fn initialize_runs_full_sync_when_table_is_empty() { let api_url = spawn_graphql_server(json!({