Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bins/torii-introspect-bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "torii-introspect-bin"
version = "0.1.5"
version = "0.1.6"
edition = "2021"
description = "Dojo introspect indexer for PostgreSQL or SQLite"

Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait EventFetcher {
to_block: u64,
) -> PFResult<(Vec<BlockContext>, Vec<StarknetEvent>)>;
}

impl EventFetcher for Connection {
fn get_events(&self, from_block: u64, to_block: u64) -> PFResult<Vec<StarknetEvent>> {
let total_events = self.get_number_of_events_for_blocks(from_block, to_block)?;
Expand All @@ -50,6 +51,7 @@ impl EventFetcher for Connection {
}
}
}

Ok(events)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::connect;
#[cfg(feature = "etl")]
use crate::extractor::PathfinderExtractor;
use crate::fetcher::EventFetcher;

const DB_PATH: &str = "/mnt/store/mainnet.sqlite";

#[test]
#[cfg(feature = "etl")]
#[ignore = "requires /mnt/store/mainnet.sqlite snapshot"]
fn test_emitted_events() {
let mut extractor =
Expand Down
1 change: 1 addition & 0 deletions crates/torii-controllers-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ torii-sql.workspace = true
anyhow.workspace = true
async-trait.workspace = true
chrono.workspace = true
hex.workspace = true
metrics.workspace = true
reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
serde.workspace = true
Expand Down
136 changes: 129 additions & 7 deletions crates/torii-controllers-sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct ControllerNode {

#[derive(Debug, Clone, Deserialize)]
struct ControllerEdge {
node: ControllerNode,
node: Option<ControllerNode>,
}

#[derive(Debug, Clone, Deserialize, Default)]
Expand Down Expand Up @@ -93,7 +93,7 @@ impl TryFrom<ControllerNode> for StoredController {
value.address
)
})?;
let normalized = format!("{felt_addr:#066x}");
let normalized = normalize_controller_address(&felt_addr);
Ok(Self {
id: normalized.clone(),
address: normalized,
Expand All @@ -104,6 +104,10 @@ impl TryFrom<ControllerNode> for StoredController {
}
}

fn normalize_controller_address(address: &Felt) -> String {
format!("0x{}", hex::encode(address.to_be_bytes_vec()))
}

struct ControllersStore {
pool: Pool<Any>,
backend: DbBackend,
Expand Down Expand Up @@ -328,13 +332,22 @@ query {{
match result {
Ok(response) if response.status().is_success() => {
let body: ControllersResponse = response.json().await?;
if let Some(errors) = body.errors {
if let Some(errors) = &body.errors {
let msg = errors
.into_iter()
.map(|error| error.message)
.iter()
.map(|error| error.message.clone())
.collect::<Vec<_>>()
.join("; ");
return Err(anyhow!("controller GraphQL query failed: {msg}"));
if body.data.is_none() {
return Err(anyhow!("controller GraphQL query failed: {msg}"));
}

tracing::warn!(
target: "torii::sinks::controllers",
error_count = errors.len(),
errors = %msg,
"Controller GraphQL response returned partial errors; skipping bad edges"
);
}

return Ok(body
Expand All @@ -345,7 +358,15 @@ query {{
.controllers
.edges
.into_iter()
.map(|edge| edge.node)
.filter_map(|edge| {
if edge.node.is_none() {
tracing::warn!(
target: "torii::sinks::controllers",
"Skipping controller edge with null node"
);
}
edge.node
})
.collect());
}
Ok(response) if attempts < MAX_RETRIES => {
Expand Down Expand Up @@ -717,6 +738,107 @@ mod tests {
assert_eq!(username, "user_one");
}

#[tokio::test]
async fn controllers_sink_skips_null_nodes() {
let api_url = spawn_graphql_server(json!({
"data": {
"controllers": {
"edges": [
{ "node": null },
{
"node": {
"address": "0x123",
"createdAt": "2024-03-20T12:00:00Z",
"account": { "username": "test_user" }
}
}
]
}
}
}))
.await
.unwrap();

let sink = ControllersSink::new(":memory:", Some(1), Some(api_url))
.await
.unwrap();
sink.store.initialize().await.unwrap();

sink.process(&[], &make_batch(1_710_936_000, 1_710_936_100))
.await
.unwrap();

let count: i64 = query_scalar(&format!("SELECT COUNT(*) FROM {CONTROLLERS_TABLE}"))
.fetch_one(&sink.store.pool)
.await
.unwrap();
assert_eq!(count, 1);
}

#[tokio::test]
async fn controllers_sink_accepts_partial_graphql_errors_when_data_exists() {
let api_url = spawn_graphql_server(json!({
"errors": [
{
"message": "ent: account not found",
"path": ["controllers", "edges", 0, "node", "account"]
}
],
"data": {
"controllers": {
"edges": [
{ "node": null },
{
"node": {
"address": "0x123",
"createdAt": "2024-03-20T12:00:00Z",
"account": { "username": "test_user" }
}
}
]
}
}
}))
.await
.unwrap();

let sink = ControllersSink::new(":memory:", Some(1), Some(api_url))
.await
.unwrap();
sink.store.initialize().await.unwrap();

sink.process(&[], &make_batch(1_710_936_000, 1_710_936_100))
.await
.unwrap();

let username: String = query_scalar(&format!(
"SELECT username FROM {CONTROLLERS_TABLE} WHERE address = ?1"
))
.bind("0x0000000000000000000000000000000000000000000000000000000000000123")
.fetch_one(&sink.store.pool)
.await
.unwrap();
assert_eq!(username, "test_user");
}

#[test]
fn stored_controller_normalizes_addresses_to_fixed_hex() {
let controller = StoredController::try_from(ControllerNode {
address: "0x123".to_string(),
created_at: "2024-03-20T12:00:00Z".to_string(),
account: ControllerAccount {
username: "test_user".to_string(),
},
})
.unwrap();

assert_eq!(
controller.address,
"0x0000000000000000000000000000000000000000000000000000000000000123"
);
assert_eq!(controller.id, controller.address);
}

#[tokio::test]
async fn initialize_runs_full_sync_when_table_is_empty() {
let api_url = spawn_graphql_server(json!({
Expand Down
Loading