diff --git a/Cargo.lock b/Cargo.lock index 28ea17b6..91d0ac60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1031,6 +1031,7 @@ dependencies = [ [[package]] name = "dojo-introspect" version = "0.1.0" +source = "git+https://github.com/dojoengine/dojo-introspect?rev=aadc3c9#aadc3c980706596a4a083413813a0a3ab01fded7" dependencies = [ "async-trait", "introspect-rust-macros", @@ -2011,6 +2012,7 @@ dependencies = [ [[package]] name = "introspect-events" version = "0.1.2" +source = "git+https://github.com/cartridge-gg/introspect?rev=34e93c1#34e93c10c867c53c622cce03abb6431c9dae0ef5" dependencies = [ "cainome-cairo-serde", "introspect-types", @@ -2022,6 +2024,7 @@ dependencies = [ [[package]] name = "introspect-rust-macros" version = "0.1.0" +source = "git+https://github.com/cartridge-gg/introspect?rev=34e93c1#34e93c10c867c53c622cce03abb6431c9dae0ef5" dependencies = [ "paste", "proc-macro2", @@ -2033,6 +2036,7 @@ dependencies = [ [[package]] name = "introspect-types" version = "0.1.2" +source = "git+https://github.com/cartridge-gg/introspect?rev=34e93c1#34e93c10c867c53c622cce03abb6431c9dae0ef5" dependencies = [ "blake3", "convert_case", @@ -5173,6 +5177,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "torii-governance" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "axum", + "chrono", + "futures", + "metrics", + "prost 0.13.5", + "prost-types 0.13.5", + "serde", + "serde_json", + "sqlx", + "starknet", + "tokio", + "tonic", + "tonic-build", + "torii", + "torii-common", + "torii-erc20", + "tracing", +] + [[package]] name = "torii-introspect" version = "0.1.0" @@ -5448,6 +5478,7 @@ dependencies = [ "torii-erc1155", "torii-erc20", "torii-erc721", + "torii-governance", "torii-runtime-common", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 4c14aeb6..1b0f8435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/torii-ecs-sink", "crates/torii-entities-historical-sink", "crates/torii-erc20", + "crates/torii-governance", "crates/torii-erc721", "crates/torii-erc1155", "crates/introspect", @@ -140,6 +141,7 @@ torii-arcade-sink.path = "crates/arcade-sink" torii-ecs-sink.path = "crates/torii-ecs-sink" torii-entities-historical-sink.path = "crates/torii-entities-historical-sink" torii-erc20.path = "crates/torii-erc20" +torii-governance.path = "crates/torii-governance" torii-erc721.path = "crates/torii-erc721" torii-erc1155.path = "crates/torii-erc1155" torii-dojo.path = "./crates/dojo" diff --git a/bins/torii-introspect-bin/src/main.rs b/bins/torii-introspect-bin/src/main.rs index ea102237..703eec3f 100644 --- a/bins/torii-introspect-bin/src/main.rs +++ b/bins/torii-introspect-bin/src/main.rs @@ -1070,10 +1070,12 @@ mod tests { TokenDbSetup { engine_url: "./torii-data/engine.db".to_string(), erc20_url: "./torii-data/erc20.db".to_string(), + governance_url: "./torii-data/governance.db".to_string(), erc721_url: "./torii-data/erc721.db".to_string(), erc1155_url: "./torii-data/erc1155.db".to_string(), engine_backend: DatabaseBackend::Sqlite, erc20_backend: DatabaseBackend::Sqlite, + governance_backend: DatabaseBackend::Sqlite, erc721_backend: DatabaseBackend::Sqlite, erc1155_backend: DatabaseBackend::Sqlite, } diff --git a/bins/torii-tokens/Cargo.toml b/bins/torii-tokens/Cargo.toml index b6a98319..c272e566 100644 --- a/bins/torii-tokens/Cargo.toml +++ b/bins/torii-tokens/Cargo.toml @@ -15,6 +15,7 @@ torii-common = { path = "../../crates/torii-common" } torii-config-common.workspace = true torii-runtime-common.workspace = true torii-erc20 = { path = "../../crates/torii-erc20" } +torii-governance = { path = "../../crates/torii-governance" } torii-erc721 = { path = "../../crates/torii-erc721" } torii-erc1155 = { path = "../../crates/torii-erc1155" } diff --git a/bins/torii-tokens/src/main.rs b/bins/torii-tokens/src/main.rs index 3d49c1ff..25d008b5 100644 --- a/bins/torii-tokens/src/main.rs +++ b/bins/torii-tokens/src/main.rs @@ -69,6 +69,11 @@ use torii_erc20::{ Erc20Decoder, Erc20MetadataCommandHandler, Erc20Rule, Erc20Service, Erc20Sink, Erc20Storage, FILE_DESCRIPTOR_SET as ERC20_DESCRIPTOR_SET, }; +use torii_governance::proto::governance_server::GovernanceServer; +use torii_governance::{ + bootstrap_governance_registry, GovernanceDecoder, GovernanceRule, GovernanceService, + GovernanceSink, GovernanceStorage, FILE_DESCRIPTOR_SET as GOVERNANCE_DESCRIPTOR_SET, +}; use torii_erc721::proto::erc721_server::Erc721Server; use torii_erc721::{ @@ -85,13 +90,15 @@ use torii_erc1155::{ async fn contracts_from_registry( engine_db: &torii::etl::EngineDb, -) -> Result<(Vec, Vec, Vec)> { +) -> Result<(Vec, Vec, Vec, Vec)> { let mappings = engine_db.get_all_contract_decoders().await?; let erc20_id = DecoderId::new("erc20"); + let governance_id = DecoderId::new("governance"); let erc721_id = DecoderId::new("erc721"); let erc1155_id = DecoderId::new("erc1155"); let mut erc20 = Vec::new(); + let mut governance = Vec::new(); let mut erc721 = Vec::new(); let mut erc1155 = Vec::new(); @@ -99,6 +106,9 @@ async fn contracts_from_registry( if decoder_ids.contains(&erc20_id) { erc20.push(contract); } + if decoder_ids.contains(&governance_id) { + governance.push(contract); + } if decoder_ids.contains(&erc721_id) { erc721.push(contract); } @@ -107,7 +117,7 @@ async fn contracts_from_registry( } } - Ok((erc20, erc721, erc1155)) + Ok((erc20, governance, erc721, erc1155)) } fn extend_unique(target: &mut Vec, additions: Vec) { @@ -278,6 +288,7 @@ async fn run_indexer(config: Config) -> Result<()> { )); let mut all_erc20_addresses: Vec = Vec::new(); + let mut all_governance_addresses: Vec = Vec::new(); let mut all_erc721_addresses: Vec = Vec::new(); let mut all_erc1155_addresses: Vec = Vec::new(); @@ -335,6 +346,11 @@ async fn run_indexer(config: Config) -> Result<()> { db_setup.erc20_backend, db_setup.erc20_url ); + tracing::info!( + "Governance storage backend: {:?} ({})", + db_setup.governance_backend, + db_setup.governance_url + ); tracing::info!( "ERC721 storage backend: {:?} ({})", db_setup.erc721_backend, @@ -355,6 +371,7 @@ async fn run_indexer(config: Config) -> Result<()> { ContractRegistry::new(provider.clone(), engine_db.clone()) .with_rpc_parallelism(config.rpc_parallelism) .with_rule(Box::new(Erc20Rule::new())) + .with_rule(Box::new(GovernanceRule::new())) .with_rule(Box::new(Erc721Rule::new())) .with_rule(Box::new(Erc1155Rule::new())), ); @@ -365,33 +382,60 @@ async fn run_indexer(config: Config) -> Result<()> { tracing::info!("Loaded {} contract mappings from database", loaded_count); } + let governance_bootstrap = + bootstrap_governance_registry(provider.clone(), engine_db.clone(), registry.shared_cache()) + .await?; + extend_unique( + &mut all_governance_addresses, + governance_bootstrap + .governors + .iter() + .map(|entry| entry.governor) + .collect(), + ); + extend_unique( + &mut all_erc20_addresses, + governance_bootstrap.added_votes_tokens.clone(), + ); + if config.mode == ExtractionMode::Event { - let (reg_erc20, reg_erc721, reg_erc1155) = contracts_from_registry(&engine_db).await?; + let (reg_erc20, reg_governance, reg_erc721, reg_erc1155) = + contracts_from_registry(&engine_db).await?; let before = ( all_erc20_addresses.len(), + all_governance_addresses.len(), all_erc721_addresses.len(), all_erc1155_addresses.len(), ); extend_unique(&mut all_erc20_addresses, reg_erc20); + extend_unique(&mut all_governance_addresses, reg_governance); extend_unique(&mut all_erc721_addresses, reg_erc721); extend_unique(&mut all_erc1155_addresses, reg_erc1155); let added_from_registry = ( all_erc20_addresses.len().saturating_sub(before.0), - all_erc721_addresses.len().saturating_sub(before.1), - all_erc1155_addresses.len().saturating_sub(before.2), + all_governance_addresses.len().saturating_sub(before.1), + all_erc721_addresses.len().saturating_sub(before.2), + all_erc1155_addresses.len().saturating_sub(before.3), ); - if added_from_registry.0 + added_from_registry.1 + added_from_registry.2 > 0 { + if added_from_registry.0 + + added_from_registry.1 + + added_from_registry.2 + + added_from_registry.3 + > 0 + { tracing::info!( target: "torii_tokens", erc20 = added_from_registry.0, - erc721 = added_from_registry.1, - erc1155 = added_from_registry.2, + governance = added_from_registry.1, + erc721 = added_from_registry.2, + erc1155 = added_from_registry.3, "Loaded event-mode contracts from registry" ); } if all_erc20_addresses.is_empty() + && all_governance_addresses.is_empty() && all_erc721_addresses.is_empty() && all_erc1155_addresses.is_empty() { @@ -399,20 +443,38 @@ async fn run_indexer(config: Config) -> Result<()> { bootstrap_registry_for_event_mode(provider.clone(), &engine_db, ®istry, &config) .await?; if identified > 0 { - let (boot_erc20, boot_erc721, boot_erc1155) = + let bootstrap = bootstrap_governance_registry( + provider.clone(), + engine_db.clone(), + registry.shared_cache(), + ) + .await?; + extend_unique( + &mut all_governance_addresses, + bootstrap + .governors + .iter() + .map(|entry| entry.governor) + .collect(), + ); + extend_unique(&mut all_erc20_addresses, bootstrap.added_votes_tokens); + + let (boot_erc20, boot_governance, boot_erc721, boot_erc1155) = contracts_from_registry(&engine_db).await?; extend_unique(&mut all_erc20_addresses, boot_erc20); + extend_unique(&mut all_governance_addresses, boot_governance); extend_unique(&mut all_erc721_addresses, boot_erc721); extend_unique(&mut all_erc1155_addresses, boot_erc1155); } } if all_erc20_addresses.is_empty() + && all_governance_addresses.is_empty() && all_erc721_addresses.is_empty() && all_erc1155_addresses.is_empty() { anyhow::bail!( - "Event mode could not resolve any contracts after registry lookup/bootstrap. Provide explicit --erc20/--erc721/--erc1155 or widen bootstrap with --event-bootstrap-blocks." + "Event mode could not resolve any contracts after registry lookup/bootstrap. Provide explicit token targets or widen bootstrap with --event-bootstrap-blocks." ); } } @@ -451,6 +513,14 @@ async fn run_indexer(config: Config) -> Result<()> { }); } + for addr in &all_governance_addresses { + event_configs.push(ContractEventConfig { + address: *addr, + from_block: config.from_block, + to_block, + }); + } + for addr in &all_erc721_addresses { event_configs.push(ContractEventConfig { address: *addr, @@ -521,18 +591,21 @@ async fn run_indexer(config: Config) -> Result<()> { .command_bus_queue_size(config.metadata_queue_capacity) .engine_database_url(db_setup.engine_url.clone()) .with_extractor(extractor) - .with_contract_identifier(registry); + .with_contract_identifier(registry.clone()); let mut enabled_types: Vec<&str> = Vec::new(); let mut erc20_grpc_service: Option = None; + let mut governance_grpc_service: Option = None; let mut erc721_grpc_service: Option = None; let mut erc1155_grpc_service: Option = None; let mut token_uri_services = Vec::new(); + let mut erc20_storage_for_governance: Option> = None; // Global extraction modes create all token infra for runtime auto-discovery. let is_global_mode = config.mode == ExtractionMode::BlockRange || config.mode == ExtractionMode::GlobalEvent; let create_erc20 = is_global_mode || !all_erc20_addresses.is_empty(); + let create_governance = is_global_mode || !all_governance_addresses.is_empty(); let create_erc721 = is_global_mode || !all_erc721_addresses.is_empty(); let create_erc1155 = is_global_mode || !all_erc1155_addresses.is_empty(); @@ -546,6 +619,7 @@ async fn run_indexer(config: Config) -> Result<()> { torii_config = torii_config.add_decoder(decoder); let grpc_service = Erc20Service::new(storage.clone()); + erc20_storage_for_governance = Some(storage.clone()); torii_config = torii_config.with_command_handler(Box::new(Erc20MetadataCommandHandler::new( provider.clone(), @@ -583,6 +657,50 @@ async fn run_indexer(config: Config) -> Result<()> { } } + if create_governance { + enabled_types.push("Governance"); + + let storage = Arc::new(GovernanceStorage::new(&db_setup.governance_url).await?); + tracing::info!( + "Governance database initialized: {}", + db_setup.governance_url + ); + + let decoder = Arc::new(GovernanceDecoder::new()); + torii_config = torii_config.add_decoder(decoder); + + let grpc_service = GovernanceService::new(storage.clone()); + let mut sink = GovernanceSink::new( + storage, + provider.clone(), + engine_db.clone(), + registry.shared_cache(), + ) + .with_grpc_service(grpc_service.clone()); + if let Some(erc20_storage) = &erc20_storage_for_governance { + sink = sink.with_erc20_storage(erc20_storage.clone()); + } + torii_config = torii_config.add_sink_boxed(Box::new(sink)); + + governance_grpc_service = Some(grpc_service); + reflection_builder = + reflection_builder.register_encoded_file_descriptor_set(GOVERNANCE_DESCRIPTOR_SET); + + let governance_decoder_id = DecoderId::new("governance"); + for address in &all_governance_addresses { + torii_config = torii_config.map_contract(*address, vec![governance_decoder_id]); + } + + if all_governance_addresses.is_empty() { + tracing::info!("Governance configured for auto-discovery"); + } else { + tracing::info!( + "Governance configured with {} known governors", + all_governance_addresses.len() + ); + } + } + if create_erc721 { enabled_types.push("ERC721"); @@ -699,43 +817,83 @@ async fn run_indexer(config: Config) -> Result<()> { let erc20_server = erc20_grpc_service .map(|service| Erc20Server::new(service).accept_compressed(CompressionEncoding::Gzip)); + let governance_server = governance_grpc_service + .map(|service| GovernanceServer::new(service).accept_compressed(CompressionEncoding::Gzip)); let erc721_server = erc721_grpc_service .map(|service| Erc721Server::new(service).accept_compressed(CompressionEncoding::Gzip)); let erc1155_server = erc1155_grpc_service .map(|service| Erc1155Server::new(service).accept_compressed(CompressionEncoding::Gzip)); let mut grpc_builder = tonic::transport::Server::builder(); - let grpc_router = match (erc20_server, erc721_server, erc1155_server) { - (Some(erc20), Some(erc721), Some(erc1155)) => grpc_builder + let grpc_router = match ( + erc20_server, + governance_server, + erc721_server, + erc1155_server, + ) { + (Some(erc20), Some(governance), Some(erc721), Some(erc1155)) => grpc_builder .add_service(tonic_web::enable(erc20)) + .add_service(tonic_web::enable(governance)) .add_service(tonic_web::enable(erc721)) .add_service(tonic_web::enable(erc1155)) .add_service(reflection.clone()), - (Some(erc20), Some(erc721), None) => grpc_builder + (Some(erc20), Some(governance), Some(erc721), None) => grpc_builder .add_service(tonic_web::enable(erc20)) + .add_service(tonic_web::enable(governance)) .add_service(tonic_web::enable(erc721)) .add_service(reflection.clone()), - (Some(erc20), None, Some(erc1155)) => grpc_builder + (Some(erc20), Some(governance), None, Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(erc20)) + .add_service(tonic_web::enable(governance)) + .add_service(tonic_web::enable(erc1155)) + .add_service(reflection.clone()), + (Some(erc20), None, Some(erc721), Some(erc1155)) => grpc_builder .add_service(tonic_web::enable(erc20)) + .add_service(tonic_web::enable(erc721)) .add_service(tonic_web::enable(erc1155)) .add_service(reflection.clone()), - (None, Some(erc721), Some(erc1155)) => grpc_builder + (None, Some(governance), Some(erc721), Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(governance)) .add_service(tonic_web::enable(erc721)) .add_service(tonic_web::enable(erc1155)) .add_service(reflection.clone()), - (Some(erc20), None, None) => grpc_builder + (Some(erc20), Some(governance), None, None) => grpc_builder .add_service(tonic_web::enable(erc20)) + .add_service(tonic_web::enable(governance)) .add_service(reflection.clone()), - (None, Some(erc721), None) => grpc_builder + (Some(erc20), None, Some(erc721), None) => grpc_builder + .add_service(tonic_web::enable(erc20)) .add_service(tonic_web::enable(erc721)) .add_service(reflection.clone()), - (None, None, Some(erc1155)) => grpc_builder + (Some(erc20), None, None, Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(erc20)) .add_service(tonic_web::enable(erc1155)) .add_service(reflection.clone()), - (None, None, None) => { - // No token services, just reflection - grpc_builder.add_service(reflection) - } + (None, Some(governance), Some(erc721), None) => grpc_builder + .add_service(tonic_web::enable(governance)) + .add_service(tonic_web::enable(erc721)) + .add_service(reflection.clone()), + (None, Some(governance), None, Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(governance)) + .add_service(tonic_web::enable(erc1155)) + .add_service(reflection.clone()), + (None, None, Some(erc721), Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(erc721)) + .add_service(tonic_web::enable(erc1155)) + .add_service(reflection.clone()), + (Some(erc20), None, None, None) => grpc_builder + .add_service(tonic_web::enable(erc20)) + .add_service(reflection.clone()), + (None, Some(governance), None, None) => grpc_builder + .add_service(tonic_web::enable(governance)) + .add_service(reflection.clone()), + (None, None, Some(erc721), None) => grpc_builder + .add_service(tonic_web::enable(erc721)) + .add_service(reflection.clone()), + (None, None, None, Some(erc1155)) => grpc_builder + .add_service(tonic_web::enable(erc1155)) + .add_service(reflection.clone()), + (None, None, None, None) => grpc_builder.add_service(reflection), }; let torii_config = torii_config @@ -760,6 +918,11 @@ async fn run_indexer(config: Config) -> Result<()> { if create_erc20 { tracing::info!(" - torii.sinks.erc20.Erc20 (ERC20 queries and subscriptions)"); } + if create_governance { + tracing::info!( + " - torii.sinks.governance.Governance (governance queries and subscriptions)" + ); + } if create_erc721 { tracing::info!(" - torii.sinks.erc721.Erc721 (ERC721 queries and subscriptions)"); } diff --git a/crates/torii-governance/Cargo.toml b/crates/torii-governance/Cargo.toml new file mode 100644 index 00000000..31c47edc --- /dev/null +++ b/crates/torii-governance/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "torii-governance" +version = "0.1.0" +edition = "2021" +description = "Governance indexer library for Torii" + +[dependencies] +torii = { path = "../.." } +torii-common = { path = "../torii-common" } +torii-erc20 = { path = "../torii-erc20" } + +tokio = { version = "1", features = ["full"] } +async-trait = "0.1" +starknet = "0.17" +sqlx = { version = "0.8.6", features = ["runtime-tokio", "sqlite", "postgres", "any"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +prost = "0.13" +prost-types = "0.13" +tonic = "0.12" +chrono = "0.4" +async-stream = "0.3" +futures = "0.3" +axum = "0.7" +anyhow = "1.0" +tracing = "0.1" +metrics = "0.24" + +[build-dependencies] +tonic-build = "0.12" + +[lints] +workspace = true diff --git a/crates/torii-governance/build.rs b/crates/torii-governance/build.rs new file mode 100644 index 00000000..2da0589f --- /dev/null +++ b/crates/torii-governance/build.rs @@ -0,0 +1,14 @@ +fn main() -> Result<(), Box> { + std::fs::create_dir_all("src/generated")?; + + tonic_build::configure() + .build_server(true) + .build_client(false) + .out_dir("src/generated") + .file_descriptor_set_path("src/generated/governance_descriptor.bin") + .compile_protos(&["proto/governance.proto"], &["proto"])?; + + println!("cargo:rerun-if-changed=proto/governance.proto"); + + Ok(()) +} diff --git a/crates/torii-governance/proto/governance.proto b/crates/torii-governance/proto/governance.proto new file mode 100644 index 00000000..e35a06f3 --- /dev/null +++ b/crates/torii-governance/proto/governance.proto @@ -0,0 +1,199 @@ +syntax = "proto3"; + +package torii.sinks.governance; + +message Cursor { + int64 id = 1; +} + +enum VotingPowerSource { + VOTING_POWER_SOURCE_UNSPECIFIED = 0; + VOTING_POWER_SOURCE_DELEGATE_VOTES_CHANGED = 1; + VOTING_POWER_SOURCE_RPC_GET_VOTES = 2; + VOTING_POWER_SOURCE_ERC20_BALANCE = 3; +} + +message Governor { + bytes address = 1; + optional bytes votes_token = 2; + optional string name = 3; + optional string version = 4; + bool src5_supported = 5; + bool governor_interface_supported = 6; +} + +message Proposal { + bytes governor = 1; + bytes proposal_id = 2; + optional bytes proposer = 3; + optional string description = 4; + uint64 snapshot = 5; + uint64 deadline = 6; + string status = 7; + uint64 created_block = 8; + bytes created_tx_hash = 9; + int64 created_timestamp = 10; + optional uint64 queued_block = 11; + optional uint64 executed_block = 12; + optional uint64 canceled_block = 13; +} + +message Vote { + bytes governor = 1; + bytes proposal_id = 2; + bytes voter = 3; + uint32 support = 4; + bytes weight = 5; + optional string reason = 6; + bytes params = 7; + uint64 block_number = 8; + bytes tx_hash = 9; + int64 timestamp = 10; +} + +message Delegation { + bytes votes_token = 1; + bytes delegator = 2; + bytes from_delegate = 3; + bytes to_delegate = 4; + uint64 block_number = 5; + bytes tx_hash = 6; + int64 timestamp = 7; +} + +message VotingPower { + bytes votes_token = 1; + bytes account = 2; + optional bytes delegate = 3; + bytes voting_power = 4; + VotingPowerSource source = 5; + uint64 last_block = 6; + bytes last_tx_hash = 7; +} + +message GovernorFilter { + repeated bytes addresses = 1; +} + +message ProposalFilter { + optional bytes governor = 1; + optional bytes proposer = 2; + optional string status = 3; +} + +message VoteFilter { + optional bytes governor = 1; + optional bytes proposal_id = 2; + optional bytes voter = 3; +} + +message DelegationFilter { + optional bytes votes_token = 1; + optional bytes delegator = 2; + optional bytes delegatee = 3; +} + +message VotingPowerFilter { + optional bytes votes_token = 1; + optional bytes account = 2; +} + +message GetGovernorsRequest { + GovernorFilter filter = 1; +} + +message GetGovernorsResponse { + repeated Governor governors = 1; +} + +message GetProposalsRequest { + ProposalFilter filter = 1; + optional Cursor cursor = 2; + uint32 limit = 3; +} + +message GetProposalsResponse { + repeated Proposal proposals = 1; + optional Cursor next_cursor = 2; +} + +message GetProposalRequest { + bytes governor = 1; + bytes proposal_id = 2; +} + +message GetProposalResponse { + optional Proposal proposal = 1; +} + +message GetVotesRequest { + VoteFilter filter = 1; + optional Cursor cursor = 2; + uint32 limit = 3; +} + +message GetVotesResponse { + repeated Vote votes = 1; + optional Cursor next_cursor = 2; +} + +message GetDelegationsRequest { + DelegationFilter filter = 1; + optional Cursor cursor = 2; + uint32 limit = 3; +} + +message GetDelegationsResponse { + repeated Delegation delegations = 1; + optional Cursor next_cursor = 2; +} + +message GetVotingPowerRequest { + VotingPowerFilter filter = 1; +} + +message GetVotingPowerResponse { + repeated VotingPower entries = 1; +} + +message SubscribeProposalsRequest { + string client_id = 1; + ProposalFilter filter = 2; +} + +message SubscribeVotesRequest { + string client_id = 1; + VoteFilter filter = 2; +} + +message SubscribeDelegationsRequest { + string client_id = 1; + DelegationFilter filter = 2; +} + +message ProposalUpdate { + Proposal proposal = 1; + int64 timestamp = 2; +} + +message VoteUpdate { + Vote vote = 1; + int64 timestamp = 2; +} + +message DelegationUpdate { + Delegation delegation = 1; + int64 timestamp = 2; +} + +service Governance { + rpc GetGovernors(GetGovernorsRequest) returns (GetGovernorsResponse); + rpc GetProposals(GetProposalsRequest) returns (GetProposalsResponse); + rpc GetProposal(GetProposalRequest) returns (GetProposalResponse); + rpc GetVotes(GetVotesRequest) returns (GetVotesResponse); + rpc GetDelegations(GetDelegationsRequest) returns (GetDelegationsResponse); + rpc GetVotingPower(GetVotingPowerRequest) returns (GetVotingPowerResponse); + rpc SubscribeProposals(SubscribeProposalsRequest) returns (stream ProposalUpdate); + rpc SubscribeVotes(SubscribeVotesRequest) returns (stream VoteUpdate); + rpc SubscribeDelegations(SubscribeDelegationsRequest) returns (stream DelegationUpdate); +} diff --git a/crates/torii-governance/src/decoder.rs b/crates/torii-governance/src/decoder.rs new file mode 100644 index 00000000..4fa2b2be --- /dev/null +++ b/crates/torii-governance/src/decoder.rs @@ -0,0 +1,426 @@ +use anyhow::Result; +use async_trait::async_trait; +use starknet::core::codec::Decode; +use starknet::core::types::{ByteArray, EmittedEvent, Felt, U256}; +use starknet::core::utils::parse_cairo_short_string; +use starknet::macros::selector; +use std::any::Any; +use std::collections::HashMap; +use torii::etl::{Decoder, Envelope, TypedBody}; + +fn payload(event: &EmittedEvent) -> Vec { + event + .keys + .iter() + .skip(1) + .copied() + .chain(event.data.iter().copied()) + .collect() +} + +fn parse_u256(parts: &[Felt]) -> U256 { + match parts { + [] => U256::from(0u64), + [single] => { + let bytes = single.to_bytes_be(); + U256::from(u128::from_be_bytes(bytes[16..32].try_into().unwrap())) + } + [low, high, ..] => U256::from_words( + u128::from_be_bytes(low.to_bytes_be()[16..32].try_into().unwrap()), + u128::from_be_bytes(high.to_bytes_be()[16..32].try_into().unwrap()), + ), + } +} + +fn parse_string(parts: &[Felt]) -> Option { + if parts.is_empty() { + return None; + } + if parts.len() == 1 { + return parse_cairo_short_string(&parts[0]).ok(); + } + if let Ok(byte_array) = ByteArray::decode(parts) { + return String::try_from(byte_array).ok(); + } + parse_cairo_short_string(&parts[0]).ok() +} + +macro_rules! typed_body { + ($name:ident, $type_id:literal) => { + impl TypedBody for $name { + fn envelope_type_id(&self) -> torii::etl::envelope::TypeId { + torii::etl::envelope::TypeId::new($type_id) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + } + }; +} + +#[derive(Debug, Clone)] +pub struct ProposalCreated { + pub governor: Felt, + pub proposal_id: Felt, + pub proposer: Felt, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(ProposalCreated, "governance.proposal_created"); + +#[derive(Debug, Clone)] +pub struct ProposalQueued { + pub governor: Felt, + pub proposal_id: Felt, + pub eta_seconds: Option, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(ProposalQueued, "governance.proposal_queued"); + +#[derive(Debug, Clone)] +pub struct ProposalExecuted { + pub governor: Felt, + pub proposal_id: Felt, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(ProposalExecuted, "governance.proposal_executed"); + +#[derive(Debug, Clone)] +pub struct ProposalCanceled { + pub governor: Felt, + pub proposal_id: Felt, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(ProposalCanceled, "governance.proposal_canceled"); + +#[derive(Debug, Clone)] +pub struct VoteCast { + pub governor: Felt, + pub proposal_id: Felt, + pub voter: Felt, + pub support: u8, + pub weight: U256, + pub reason: Option, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(VoteCast, "governance.vote_cast"); + +#[derive(Debug, Clone)] +pub struct VoteCastWithParams { + pub governor: Felt, + pub proposal_id: Felt, + pub voter: Felt, + pub support: u8, + pub weight: U256, + pub reason: Option, + pub params: Vec, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(VoteCastWithParams, "governance.vote_cast_with_params"); + +#[derive(Debug, Clone)] +pub struct DelegateChanged { + pub votes_token: Felt, + pub delegator: Felt, + pub from_delegate: Felt, + pub to_delegate: Felt, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(DelegateChanged, "governance.delegate_changed"); + +#[derive(Debug, Clone)] +pub struct DelegateVotesChanged { + pub votes_token: Felt, + pub delegate: Felt, + pub previous_votes: U256, + pub new_votes: U256, + pub block_number: u64, + pub transaction_hash: Felt, +} +typed_body!(DelegateVotesChanged, "governance.delegate_votes_changed"); + +pub struct GovernanceDecoder; + +impl GovernanceDecoder { + pub fn new() -> Self { + Self + } + + fn envelope(event: &EmittedEvent, kind: &str, body: Box) -> Envelope { + let mut metadata = HashMap::new(); + metadata.insert("contract".to_string(), format!("{:#x}", event.from_address)); + metadata.insert( + "block_number".to_string(), + event.block_number.unwrap_or(0).to_string(), + ); + metadata.insert( + "tx_hash".to_string(), + format!("{:#x}", event.transaction_hash), + ); + Envelope::new( + format!( + "governance_{}_{}_{}", + kind, + event.block_number.unwrap_or(0), + event.transaction_hash + ), + body, + metadata, + ) + } + + fn decode_proposal_created(event: &EmittedEvent) -> Option { + let payload = payload(event); + if payload.len() < 2 { + return None; + } + Some(Self::envelope( + event, + "proposal_created", + Box::new(ProposalCreated { + governor: event.from_address, + proposal_id: payload[0], + proposer: payload[1], + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + )) + } + + fn decode_proposal_queued(event: &EmittedEvent) -> Option { + let payload = payload(event); + let eta_seconds = payload.get(1).and_then(|value| (*value).try_into().ok()); + payload.first().copied().map(|proposal_id| { + Self::envelope( + event, + "proposal_queued", + Box::new(ProposalQueued { + governor: event.from_address, + proposal_id, + eta_seconds, + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + ) + }) + } + + fn decode_single_proposal( + event: &EmittedEvent, + kind: &str, + canceled: bool, + ) -> Option { + let proposal_id = payload(event).first().copied()?; + Some(Self::envelope( + event, + kind, + if canceled { + Box::new(ProposalCanceled { + governor: event.from_address, + proposal_id, + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }) + } else { + Box::new(ProposalExecuted { + governor: event.from_address, + proposal_id, + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }) + }, + )) + } + + fn decode_vote_cast(event: &EmittedEvent, with_params: bool) -> Option { + let payload = payload(event); + if payload.len() < 5 { + return None; + } + let voter = payload[0]; + let proposal_id = payload[1]; + let support: u8 = payload[2].try_into().ok()?; + let weight = parse_u256(&payload[3..5.min(payload.len())]); + let tail = if payload.len() > 5 { + &payload[5..] + } else { + &[] + }; + if with_params { + Some(Self::envelope( + event, + "vote_cast_with_params", + Box::new(VoteCastWithParams { + governor: event.from_address, + proposal_id, + voter, + support, + weight, + reason: parse_string(tail), + params: tail.to_vec(), + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + )) + } else { + Some(Self::envelope( + event, + "vote_cast", + Box::new(VoteCast { + governor: event.from_address, + proposal_id, + voter, + support, + weight, + reason: parse_string(tail), + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + )) + } + } + + fn decode_delegate_changed(event: &EmittedEvent) -> Option { + let payload = payload(event); + if payload.len() < 3 { + return None; + } + Some(Self::envelope( + event, + "delegate_changed", + Box::new(DelegateChanged { + votes_token: event.from_address, + delegator: payload[0], + from_delegate: payload[1], + to_delegate: payload[2], + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + )) + } + + fn decode_delegate_votes_changed(event: &EmittedEvent) -> Option { + let payload = payload(event); + if payload.len() < 5 { + return None; + } + Some(Self::envelope( + event, + "delegate_votes_changed", + Box::new(DelegateVotesChanged { + votes_token: event.from_address, + delegate: payload[0], + previous_votes: parse_u256(&payload[1..3]), + new_votes: parse_u256(&payload[3..5]), + block_number: event.block_number.unwrap_or(0), + transaction_hash: event.transaction_hash, + }), + )) + } +} + +impl Default for GovernanceDecoder { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Decoder for GovernanceDecoder { + fn decoder_name(&self) -> &'static str { + "governance" + } + + async fn decode_event(&self, event: &EmittedEvent) -> Result> { + if event.keys.is_empty() { + return Ok(Vec::new()); + } + let selector = event.keys[0]; + let envelope = if selector == selector!("ProposalCreated") { + Self::decode_proposal_created(event) + } else if selector == selector!("ProposalQueued") { + Self::decode_proposal_queued(event) + } else if selector == selector!("ProposalExecuted") { + Self::decode_single_proposal(event, "proposal_executed", false) + } else if selector == selector!("ProposalCanceled") { + Self::decode_single_proposal(event, "proposal_canceled", true) + } else if selector == selector!("VoteCast") { + Self::decode_vote_cast(event, false) + } else if selector == selector!("VoteCastWithParams") { + Self::decode_vote_cast(event, true) + } else if selector == selector!("DelegateChanged") { + Self::decode_delegate_changed(event) + } else if selector == selector!("DelegateVotesChanged") { + Self::decode_delegate_votes_changed(event) + } else { + None + }; + + Ok(envelope.into_iter().collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn decodes_proposal_created() { + let decoder = GovernanceDecoder::new(); + let event = EmittedEvent { + from_address: Felt::from(1u64), + keys: vec![selector!("ProposalCreated")], + data: vec![Felt::from(2u64), Felt::from(3u64)], + block_hash: None, + block_number: Some(10), + transaction_hash: Felt::from(4u64), + }; + let envelopes = decoder.decode_event(&event).await.unwrap(); + let body = envelopes[0] + .body + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(body.proposal_id, Felt::from(2u64)); + assert_eq!(body.proposer, Felt::from(3u64)); + } + + #[tokio::test] + async fn decodes_delegate_votes_changed() { + let decoder = GovernanceDecoder::new(); + let event = EmittedEvent { + from_address: Felt::from(9u64), + keys: vec![selector!("DelegateVotesChanged")], + data: vec![ + Felt::from(5u64), + Felt::from(7u64), + Felt::ZERO, + Felt::from(8u64), + Felt::ZERO, + ], + block_hash: None, + block_number: Some(42), + transaction_hash: Felt::from(11u64), + }; + let envelopes = decoder.decode_event(&event).await.unwrap(); + let body = envelopes[0] + .body + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(body.delegate, Felt::from(5u64)); + assert_eq!(body.previous_votes, U256::from(7u64)); + assert_eq!(body.new_votes, U256::from(8u64)); + } +} diff --git a/crates/torii-governance/src/discovery.rs b/crates/torii-governance/src/discovery.rs new file mode 100644 index 00000000..aaf5ff3d --- /dev/null +++ b/crates/torii-governance/src/discovery.rs @@ -0,0 +1,293 @@ +use anyhow::{anyhow, Result}; +use starknet::core::codec::Decode; +use starknet::core::types::{BlockId, BlockTag, ByteArray, Felt, FunctionCall, U256}; +use starknet::core::utils::parse_cairo_short_string; +use starknet::macros::selector; +use starknet::providers::jsonrpc::{HttpTransport, JsonRpcClient}; +use starknet::providers::Provider; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::RwLock; +use torii::etl::decoder::DecoderId; +use torii::etl::EngineDb; + +pub const ISRC5_ID: Felt = + Felt::from_hex_unchecked("0x3f918d17e5ee77373b56385708f855659a07f75997f365cf87748628532a055"); +pub const IGOVERNOR_ID: Felt = + Felt::from_hex_unchecked("0x1100a1f8546595b5bd75a6cd8fcc5b015370655e66f275963321c5cd0357ac9"); + +pub fn governance_decoder_id() -> DecoderId { + DecoderId::new("governance") +} + +#[derive(Debug, Clone)] +pub struct GovernanceMetadata { + pub governor: Felt, + pub votes_token: Felt, + pub name: Option, + pub version: Option, + pub src5_supported: bool, + pub governor_interface_supported: bool, +} + +#[derive(Debug, Clone, Default)] +pub struct GovernanceBootstrap { + pub governors: Vec, + pub added_votes_tokens: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct ProposalMetadata { + pub proposer: Option, + pub snapshot: Option, + pub deadline: Option, +} + +fn decode_string_result(result: &[Felt]) -> Option { + if result.is_empty() { + return None; + } + + if result.len() == 1 { + return parse_cairo_short_string(&result[0]) + .ok() + .filter(|value| !value.is_empty()); + } + + if let Ok(byte_array) = ByteArray::decode(result) { + if let Ok(value) = String::try_from(byte_array) { + if !value.is_empty() { + return Some(value); + } + } + } + + parse_cairo_short_string(&result[0]) + .ok() + .filter(|value| !value.is_empty()) +} + +async fn call_contract( + provider: &JsonRpcClient, + contract_address: Felt, + entry_point_selector: Felt, + calldata: Vec, +) -> Result> { + provider + .call( + FunctionCall { + contract_address, + entry_point_selector, + calldata, + }, + BlockId::Tag(BlockTag::Latest), + ) + .await + .map_err(|error| anyhow!(error)) +} + +fn parse_u64_result(result: &[Felt]) -> Option { + result.first().and_then(|value| (*value).try_into().ok()) +} + +fn parse_felt_result(result: &[Felt]) -> Option { + result.first().copied() +} + +pub async fn fetch_current_votes( + provider: &JsonRpcClient, + votes_token: Felt, + account: Felt, +) -> Result { + let response = + call_contract(provider, votes_token, selector!("get_votes"), vec![account]).await?; + Ok(match response.as_slice() { + [] => U256::from(0u64), + [single] => { + let bytes = single.to_bytes_be(); + U256::from(u128::from_be_bytes(bytes[16..32].try_into().unwrap())) + } + [low, high, ..] => U256::from_words( + u128::from_be_bytes(low.to_bytes_be()[16..32].try_into().unwrap()), + u128::from_be_bytes(high.to_bytes_be()[16..32].try_into().unwrap()), + ), + }) +} + +pub async fn fetch_proposal_metadata( + provider: &JsonRpcClient, + governor: Felt, + proposal_id: Felt, +) -> ProposalMetadata { + let proposer = call_contract( + provider, + governor, + selector!("proposal_proposer"), + vec![proposal_id], + ) + .await + .ok() + .and_then(|result| parse_felt_result(&result)); + let snapshot = call_contract( + provider, + governor, + selector!("proposal_snapshot"), + vec![proposal_id], + ) + .await + .ok() + .and_then(|result| parse_u64_result(&result)); + let deadline = call_contract( + provider, + governor, + selector!("proposal_deadline"), + vec![proposal_id], + ) + .await + .ok() + .and_then(|result| parse_u64_result(&result)); + + ProposalMetadata { + proposer, + snapshot, + deadline, + } +} + +async fn supports_interface( + provider: &JsonRpcClient, + contract: Felt, + interface_id: Felt, +) -> Result { + let response = call_contract( + provider, + contract, + selector!("supports_interface"), + vec![interface_id], + ) + .await?; + + Ok(response.first().copied().unwrap_or(Felt::ZERO) != Felt::ZERO) +} + +async fn fetch_votes_token( + provider: &JsonRpcClient, + governor: Felt, +) -> Result { + let response = call_contract(provider, governor, selector!("token"), Vec::new()).await?; + response + .first() + .copied() + .ok_or_else(|| anyhow!("token() returned no address")) +} + +pub async fn fetch_governance_metadata( + provider: &JsonRpcClient, + governor: Felt, +) -> Result { + let src5_supported = supports_interface(provider, governor, ISRC5_ID) + .await + .unwrap_or(false); + let governor_interface_supported = supports_interface(provider, governor, IGOVERNOR_ID) + .await + .unwrap_or(false); + + if !src5_supported || !governor_interface_supported { + return Err(anyhow!("governor does not support required interfaces")); + } + + let votes_token = fetch_votes_token(provider, governor).await?; + let name = call_contract(provider, governor, selector!("name"), Vec::new()) + .await + .ok() + .and_then(|value| decode_string_result(&value)); + let version = call_contract(provider, governor, selector!("version"), Vec::new()) + .await + .ok() + .and_then(|value| decode_string_result(&value)); + + Ok(GovernanceMetadata { + governor, + votes_token, + name, + version, + src5_supported, + governor_interface_supported, + }) +} + +#[allow(clippy::implicit_hasher)] +pub async fn register_governor_and_votes_token( + engine_db: &EngineDb, + registry_cache: &Arc>>>, + metadata: &GovernanceMetadata, +) -> Result { + let mut added_votes_token = false; + + { + let mut cache = registry_cache.write().await; + cache.insert(metadata.governor, vec![governance_decoder_id()]); + + let token_decoders = vec![DecoderId::new("erc20"), governance_decoder_id()]; + let existing = cache + .get(&metadata.votes_token) + .cloned() + .unwrap_or_default(); + if existing != token_decoders { + cache.insert(metadata.votes_token, token_decoders); + added_votes_token = true; + } + } + + engine_db + .set_contract_decoders(metadata.governor, &[governance_decoder_id()]) + .await?; + engine_db + .set_contract_decoders( + metadata.votes_token, + &[DecoderId::new("erc20"), governance_decoder_id()], + ) + .await?; + + Ok(added_votes_token) +} + +#[allow(clippy::implicit_hasher)] +pub async fn bootstrap_governance_registry( + provider: Arc>, + engine_db: Arc, + registry_cache: Arc>>>, +) -> Result { + let mappings = engine_db.get_all_contract_decoders().await?; + let mut bootstrap = GovernanceBootstrap::default(); + let mut seen_votes_tokens = HashSet::new(); + + for (contract, decoder_ids, _) in mappings { + if !decoder_ids.contains(&governance_decoder_id()) { + continue; + } + + let metadata = match fetch_governance_metadata(provider.as_ref(), contract).await { + Ok(metadata) => metadata, + Err(error) => { + tracing::debug!( + target: "torii_governance::discovery", + governor = %format!("{contract:#x}"), + error = %error, + "Skipping governance candidate during bootstrap" + ); + continue; + } + }; + + if register_governor_and_votes_token(engine_db.as_ref(), ®istry_cache, &metadata).await? + && seen_votes_tokens.insert(metadata.votes_token) + { + bootstrap.added_votes_tokens.push(metadata.votes_token); + } + + bootstrap.governors.push(metadata); + } + + Ok(bootstrap) +} diff --git a/crates/torii-governance/src/generated/governance_descriptor.bin b/crates/torii-governance/src/generated/governance_descriptor.bin new file mode 100644 index 00000000..f8bd0b20 Binary files /dev/null and b/crates/torii-governance/src/generated/governance_descriptor.bin differ diff --git a/crates/torii-governance/src/generated/torii.sinks.governance.rs b/crates/torii-governance/src/generated/torii.sinks.governance.rs new file mode 100644 index 00000000..1337be41 --- /dev/null +++ b/crates/torii-governance/src/generated/torii.sinks.governance.rs @@ -0,0 +1,923 @@ +// This file is @generated by prost-build. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Cursor { + #[prost(int64, tag = "1")] + pub id: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Governor { + #[prost(bytes = "vec", tag = "1")] + pub address: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", optional, tag = "2")] + pub votes_token: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(string, optional, tag = "3")] + pub name: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "4")] + pub version: ::core::option::Option<::prost::alloc::string::String>, + #[prost(bool, tag = "5")] + pub src5_supported: bool, + #[prost(bool, tag = "6")] + pub governor_interface_supported: bool, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Proposal { + #[prost(bytes = "vec", tag = "1")] + pub governor: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub proposal_id: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", optional, tag = "3")] + pub proposer: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(string, optional, tag = "4")] + pub description: ::core::option::Option<::prost::alloc::string::String>, + #[prost(uint64, tag = "5")] + pub snapshot: u64, + #[prost(uint64, tag = "6")] + pub deadline: u64, + #[prost(string, tag = "7")] + pub status: ::prost::alloc::string::String, + #[prost(uint64, tag = "8")] + pub created_block: u64, + #[prost(bytes = "vec", tag = "9")] + pub created_tx_hash: ::prost::alloc::vec::Vec, + #[prost(int64, tag = "10")] + pub created_timestamp: i64, + #[prost(uint64, optional, tag = "11")] + pub queued_block: ::core::option::Option, + #[prost(uint64, optional, tag = "12")] + pub executed_block: ::core::option::Option, + #[prost(uint64, optional, tag = "13")] + pub canceled_block: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Vote { + #[prost(bytes = "vec", tag = "1")] + pub governor: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub proposal_id: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "3")] + pub voter: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "4")] + pub support: u32, + #[prost(bytes = "vec", tag = "5")] + pub weight: ::prost::alloc::vec::Vec, + #[prost(string, optional, tag = "6")] + pub reason: ::core::option::Option<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "7")] + pub params: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "8")] + pub block_number: u64, + #[prost(bytes = "vec", tag = "9")] + pub tx_hash: ::prost::alloc::vec::Vec, + #[prost(int64, tag = "10")] + pub timestamp: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Delegation { + #[prost(bytes = "vec", tag = "1")] + pub votes_token: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub delegator: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "3")] + pub from_delegate: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "4")] + pub to_delegate: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "5")] + pub block_number: u64, + #[prost(bytes = "vec", tag = "6")] + pub tx_hash: ::prost::alloc::vec::Vec, + #[prost(int64, tag = "7")] + pub timestamp: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VotingPower { + #[prost(bytes = "vec", tag = "1")] + pub votes_token: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub account: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", optional, tag = "3")] + pub delegate: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", tag = "4")] + pub voting_power: ::prost::alloc::vec::Vec, + #[prost(enumeration = "VotingPowerSource", tag = "5")] + pub source: i32, + #[prost(uint64, tag = "6")] + pub last_block: u64, + #[prost(bytes = "vec", tag = "7")] + pub last_tx_hash: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GovernorFilter { + #[prost(bytes = "vec", repeated, tag = "1")] + pub addresses: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProposalFilter { + #[prost(bytes = "vec", optional, tag = "1")] + pub governor: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "2")] + pub proposer: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(string, optional, tag = "3")] + pub status: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VoteFilter { + #[prost(bytes = "vec", optional, tag = "1")] + pub governor: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "2")] + pub proposal_id: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "3")] + pub voter: ::core::option::Option<::prost::alloc::vec::Vec>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DelegationFilter { + #[prost(bytes = "vec", optional, tag = "1")] + pub votes_token: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "2")] + pub delegator: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "3")] + pub delegatee: ::core::option::Option<::prost::alloc::vec::Vec>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VotingPowerFilter { + #[prost(bytes = "vec", optional, tag = "1")] + pub votes_token: ::core::option::Option<::prost::alloc::vec::Vec>, + #[prost(bytes = "vec", optional, tag = "2")] + pub account: ::core::option::Option<::prost::alloc::vec::Vec>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetGovernorsRequest { + #[prost(message, optional, tag = "1")] + pub filter: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetGovernorsResponse { + #[prost(message, repeated, tag = "1")] + pub governors: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetProposalsRequest { + #[prost(message, optional, tag = "1")] + pub filter: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub cursor: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub limit: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetProposalsResponse { + #[prost(message, repeated, tag = "1")] + pub proposals: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub next_cursor: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetProposalRequest { + #[prost(bytes = "vec", tag = "1")] + pub governor: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub proposal_id: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetProposalResponse { + #[prost(message, optional, tag = "1")] + pub proposal: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetVotesRequest { + #[prost(message, optional, tag = "1")] + pub filter: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub cursor: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub limit: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetVotesResponse { + #[prost(message, repeated, tag = "1")] + pub votes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub next_cursor: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetDelegationsRequest { + #[prost(message, optional, tag = "1")] + pub filter: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub cursor: ::core::option::Option, + #[prost(uint32, tag = "3")] + pub limit: u32, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetDelegationsResponse { + #[prost(message, repeated, tag = "1")] + pub delegations: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub next_cursor: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetVotingPowerRequest { + #[prost(message, optional, tag = "1")] + pub filter: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetVotingPowerResponse { + #[prost(message, repeated, tag = "1")] + pub entries: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubscribeProposalsRequest { + #[prost(string, tag = "1")] + pub client_id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub filter: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubscribeVotesRequest { + #[prost(string, tag = "1")] + pub client_id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub filter: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubscribeDelegationsRequest { + #[prost(string, tag = "1")] + pub client_id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub filter: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProposalUpdate { + #[prost(message, optional, tag = "1")] + pub proposal: ::core::option::Option, + #[prost(int64, tag = "2")] + pub timestamp: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct VoteUpdate { + #[prost(message, optional, tag = "1")] + pub vote: ::core::option::Option, + #[prost(int64, tag = "2")] + pub timestamp: i64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DelegationUpdate { + #[prost(message, optional, tag = "1")] + pub delegation: ::core::option::Option, + #[prost(int64, tag = "2")] + pub timestamp: i64, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum VotingPowerSource { + Unspecified = 0, + DelegateVotesChanged = 1, + RpcGetVotes = 2, + Erc20Balance = 3, +} +impl VotingPowerSource { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "VOTING_POWER_SOURCE_UNSPECIFIED", + Self::DelegateVotesChanged => "VOTING_POWER_SOURCE_DELEGATE_VOTES_CHANGED", + Self::RpcGetVotes => "VOTING_POWER_SOURCE_RPC_GET_VOTES", + Self::Erc20Balance => "VOTING_POWER_SOURCE_ERC20_BALANCE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "VOTING_POWER_SOURCE_UNSPECIFIED" => Some(Self::Unspecified), + "VOTING_POWER_SOURCE_DELEGATE_VOTES_CHANGED" => { + Some(Self::DelegateVotesChanged) + } + "VOTING_POWER_SOURCE_RPC_GET_VOTES" => Some(Self::RpcGetVotes), + "VOTING_POWER_SOURCE_ERC20_BALANCE" => Some(Self::Erc20Balance), + _ => None, + } + } +} +/// Generated server implementations. +pub mod governance_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with GovernanceServer. + #[async_trait] + pub trait Governance: std::marker::Send + std::marker::Sync + 'static { + async fn get_governors( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_proposals( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_proposal( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_votes( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_delegations( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_voting_power( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Server streaming response type for the SubscribeProposals method. + type SubscribeProposalsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + async fn subscribe_proposals( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Server streaming response type for the SubscribeVotes method. + type SubscribeVotesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + async fn subscribe_votes( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Server streaming response type for the SubscribeDelegations method. + type SubscribeDelegationsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + async fn subscribe_delegations( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct GovernanceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl GovernanceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for GovernanceServer + where + T: Governance, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/torii.sinks.governance.Governance/GetGovernors" => { + #[allow(non_camel_case_types)] + struct GetGovernorsSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetGovernorsSvc { + type Response = super::GetGovernorsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_governors(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetGovernorsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/GetProposals" => { + #[allow(non_camel_case_types)] + struct GetProposalsSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetProposalsSvc { + type Response = super::GetProposalsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_proposals(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetProposalsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/GetProposal" => { + #[allow(non_camel_case_types)] + struct GetProposalSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetProposalSvc { + type Response = super::GetProposalResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_proposal(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetProposalSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/GetVotes" => { + #[allow(non_camel_case_types)] + struct GetVotesSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetVotesSvc { + type Response = super::GetVotesResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_votes(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetVotesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/GetDelegations" => { + #[allow(non_camel_case_types)] + struct GetDelegationsSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetDelegationsSvc { + type Response = super::GetDelegationsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_delegations(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetDelegationsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/GetVotingPower" => { + #[allow(non_camel_case_types)] + struct GetVotingPowerSvc(pub Arc); + impl< + T: Governance, + > tonic::server::UnaryService + for GetVotingPowerSvc { + type Response = super::GetVotingPowerResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_voting_power(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetVotingPowerSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/SubscribeProposals" => { + #[allow(non_camel_case_types)] + struct SubscribeProposalsSvc(pub Arc); + impl< + T: Governance, + > tonic::server::ServerStreamingService< + super::SubscribeProposalsRequest, + > for SubscribeProposalsSvc { + type Response = super::ProposalUpdate; + type ResponseStream = T::SubscribeProposalsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::subscribe_proposals(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SubscribeProposalsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/SubscribeVotes" => { + #[allow(non_camel_case_types)] + struct SubscribeVotesSvc(pub Arc); + impl< + T: Governance, + > tonic::server::ServerStreamingService + for SubscribeVotesSvc { + type Response = super::VoteUpdate; + type ResponseStream = T::SubscribeVotesStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::subscribe_votes(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SubscribeVotesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/torii.sinks.governance.Governance/SubscribeDelegations" => { + #[allow(non_camel_case_types)] + struct SubscribeDelegationsSvc(pub Arc); + impl< + T: Governance, + > tonic::server::ServerStreamingService< + super::SubscribeDelegationsRequest, + > for SubscribeDelegationsSvc { + type Response = super::DelegationUpdate; + type ResponseStream = T::SubscribeDelegationsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::subscribe_delegations(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SubscribeDelegationsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for GovernanceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "torii.sinks.governance.Governance"; + impl tonic::server::NamedService for GovernanceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/crates/torii-governance/src/grpc_service.rs b/crates/torii-governance/src/grpc_service.rs new file mode 100644 index 00000000..f84b533d --- /dev/null +++ b/crates/torii-governance/src/grpc_service.rs @@ -0,0 +1,433 @@ +use crate::proto::{ + governance_server::Governance as GovernanceTrait, Cursor, Delegation, DelegationFilter, + DelegationUpdate, GetDelegationsRequest, GetDelegationsResponse, GetGovernorsRequest, + GetGovernorsResponse, GetProposalRequest, GetProposalResponse, GetProposalsRequest, + GetProposalsResponse, GetVotesRequest, GetVotesResponse, GetVotingPowerRequest, + GetVotingPowerResponse, Governor, Proposal, ProposalFilter, ProposalUpdate, + SubscribeDelegationsRequest, SubscribeProposalsRequest, SubscribeVotesRequest, Vote, + VoteFilter, VoteUpdate, VotingPower, VotingPowerSource, +}; +use crate::storage::{ + DelegationCursor, DelegationRecord, GovernanceStorage, GovernorRecord, ProposalCursor, + ProposalRecord, VoteCursor, VoteRecord, VotingPowerRecord, +}; +use async_stream::stream; +use async_trait::async_trait; +use futures::Stream; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::broadcast; +use tonic::{Request, Response, Status}; +use torii_common::bytes_to_felt; + +#[derive(Clone)] +pub struct GovernanceService { + storage: Arc, + proposal_tx: broadcast::Sender, + vote_tx: broadcast::Sender, + delegation_tx: broadcast::Sender, +} + +impl GovernanceService { + pub fn new(storage: Arc) -> Self { + let (proposal_tx, _) = broadcast::channel(1000); + let (vote_tx, _) = broadcast::channel(1000); + let (delegation_tx, _) = broadcast::channel(1000); + Self { + storage, + proposal_tx, + vote_tx, + delegation_tx, + } + } + + pub fn broadcast_proposal(&self, proposal: Proposal) { + let _ = self.proposal_tx.send(ProposalUpdate { + proposal: Some(proposal), + timestamp: chrono::Utc::now().timestamp(), + }); + } + + pub fn broadcast_vote(&self, vote: Vote) { + let _ = self.vote_tx.send(VoteUpdate { + vote: Some(vote), + timestamp: chrono::Utc::now().timestamp(), + }); + } + + pub fn broadcast_delegation(&self, delegation: Delegation) { + let _ = self.delegation_tx.send(DelegationUpdate { + delegation: Some(delegation), + timestamp: chrono::Utc::now().timestamp(), + }); + } + + fn governor_to_proto(record: GovernorRecord) -> Governor { + Governor { + address: record.address.to_bytes_be().to_vec(), + votes_token: record.votes_token.map(|value| value.to_bytes_be().to_vec()), + name: record.name, + version: record.version, + src5_supported: record.src5_supported, + governor_interface_supported: record.governor_interface_supported, + } + } + + fn proposal_to_proto(record: ProposalRecord) -> Proposal { + Proposal { + governor: record.governor.to_bytes_be().to_vec(), + proposal_id: record.proposal_id.to_bytes_be().to_vec(), + proposer: record.proposer.map(|value| value.to_bytes_be().to_vec()), + description: record.description, + snapshot: record.snapshot, + deadline: record.deadline, + status: record.status, + created_block: record.created_block, + created_tx_hash: record.created_tx_hash.to_bytes_be().to_vec(), + created_timestamp: record.created_timestamp.unwrap_or(0), + queued_block: record.queued_block, + executed_block: record.executed_block, + canceled_block: record.canceled_block, + } + } + + fn vote_to_proto(record: VoteRecord) -> Vote { + Vote { + governor: record.governor.to_bytes_be().to_vec(), + proposal_id: record.proposal_id.to_bytes_be().to_vec(), + voter: record.voter.to_bytes_be().to_vec(), + support: u32::from(record.support), + weight: torii_common::u256_to_bytes(record.weight), + reason: record.reason, + params: record.params, + block_number: record.block_number, + tx_hash: record.tx_hash.to_bytes_be().to_vec(), + timestamp: record.timestamp.unwrap_or(0), + } + } + + fn delegation_to_proto(record: DelegationRecord) -> Delegation { + Delegation { + votes_token: record.votes_token.to_bytes_be().to_vec(), + delegator: record.delegator.to_bytes_be().to_vec(), + from_delegate: record.from_delegate.to_bytes_be().to_vec(), + to_delegate: record.to_delegate.to_bytes_be().to_vec(), + block_number: record.block_number, + tx_hash: record.tx_hash.to_bytes_be().to_vec(), + timestamp: record.timestamp.unwrap_or(0), + } + } + + fn voting_power_to_proto(record: VotingPowerRecord) -> VotingPower { + VotingPower { + votes_token: record.votes_token.to_bytes_be().to_vec(), + account: record.account.to_bytes_be().to_vec(), + delegate: record.delegate.map(|value| value.to_bytes_be().to_vec()), + voting_power: torii_common::u256_to_bytes(record.voting_power), + source: match record.source { + crate::storage::VotingPowerSource::DelegateVotesChanged => { + VotingPowerSource::DelegateVotesChanged as i32 + } + crate::storage::VotingPowerSource::RpcGetVotes => { + VotingPowerSource::RpcGetVotes as i32 + } + crate::storage::VotingPowerSource::Erc20Balance => { + VotingPowerSource::Erc20Balance as i32 + } + }, + last_block: record.last_block, + last_tx_hash: record.last_tx_hash.to_bytes_be().to_vec(), + } + } + + fn matches_proposal_filter(proposal: &Proposal, filter: &ProposalFilter) -> bool { + if let Some(governor) = &filter.governor { + if proposal.governor != *governor { + return false; + } + } + if let Some(proposer) = &filter.proposer { + if proposal.proposer.as_ref() != Some(proposer) { + return false; + } + } + if let Some(status) = &filter.status { + if proposal.status != *status { + return false; + } + } + true + } + + fn matches_vote_filter(vote: &Vote, filter: &VoteFilter) -> bool { + if let Some(governor) = &filter.governor { + if vote.governor != *governor { + return false; + } + } + if let Some(proposal_id) = &filter.proposal_id { + if vote.proposal_id != *proposal_id { + return false; + } + } + if let Some(voter) = &filter.voter { + if vote.voter != *voter { + return false; + } + } + true + } + + fn matches_delegation_filter(delegation: &Delegation, filter: &DelegationFilter) -> bool { + if let Some(votes_token) = &filter.votes_token { + if delegation.votes_token != *votes_token { + return false; + } + } + if let Some(delegator) = &filter.delegator { + if delegation.delegator != *delegator { + return false; + } + } + if let Some(delegatee) = &filter.delegatee { + if delegation.to_delegate != *delegatee { + return false; + } + } + true + } +} + +#[async_trait] +impl GovernanceTrait for GovernanceService { + type SubscribeProposalsStream = + Pin> + Send>>; + type SubscribeVotesStream = Pin> + Send>>; + type SubscribeDelegationsStream = + Pin> + Send>>; + + async fn get_governors( + &self, + request: Request, + ) -> Result, Status> { + let addresses = request + .into_inner() + .filter + .unwrap_or_default() + .addresses + .into_iter() + .filter_map(|value| bytes_to_felt(&value)) + .collect::>(); + let governors = self + .storage + .list_governors(&addresses) + .await + .map_err(|error| Status::internal(error.to_string()))? + .into_iter() + .map(Self::governor_to_proto) + .collect(); + Ok(Response::new(GetGovernorsResponse { governors })) + } + + async fn get_proposals( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let filter = req.filter.unwrap_or_default(); + let limit = if req.limit == 0 { + 100 + } else { + req.limit.min(1000) + }; + let (proposals, next_cursor) = self + .storage + .get_proposals_filtered( + filter + .governor + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter + .proposer + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter.status.as_deref(), + req.cursor.map(|cursor| ProposalCursor { id: cursor.id }), + limit, + ) + .await + .map_err(|error| Status::internal(error.to_string()))?; + Ok(Response::new(GetProposalsResponse { + proposals: proposals.into_iter().map(Self::proposal_to_proto).collect(), + next_cursor: next_cursor.map(|cursor| Cursor { id: cursor.id }), + })) + } + + async fn get_proposal( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let governor = bytes_to_felt(&req.governor) + .ok_or_else(|| Status::invalid_argument("invalid governor"))?; + let proposal_id = bytes_to_felt(&req.proposal_id) + .ok_or_else(|| Status::invalid_argument("invalid proposal_id"))?; + let proposal = self + .storage + .get_proposal(governor, proposal_id) + .await + .map_err(|error| Status::internal(error.to_string()))? + .map(Self::proposal_to_proto); + Ok(Response::new(GetProposalResponse { proposal })) + } + + async fn get_votes( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let filter = req.filter.unwrap_or_default(); + let limit = if req.limit == 0 { + 100 + } else { + req.limit.min(1000) + }; + let (votes, next_cursor) = self + .storage + .get_votes_filtered( + filter + .governor + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter + .proposal_id + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter.voter.as_ref().and_then(|value| bytes_to_felt(value)), + req.cursor.map(|cursor| VoteCursor { id: cursor.id }), + limit, + ) + .await + .map_err(|error| Status::internal(error.to_string()))?; + Ok(Response::new(GetVotesResponse { + votes: votes.into_iter().map(Self::vote_to_proto).collect(), + next_cursor: next_cursor.map(|cursor| Cursor { id: cursor.id }), + })) + } + + async fn get_delegations( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let filter = req.filter.unwrap_or_default(); + let limit = if req.limit == 0 { + 100 + } else { + req.limit.min(1000) + }; + let (delegations, next_cursor) = self + .storage + .get_delegations_filtered( + filter + .votes_token + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter + .delegator + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter + .delegatee + .as_ref() + .and_then(|value| bytes_to_felt(value)), + req.cursor.map(|cursor| DelegationCursor { id: cursor.id }), + limit, + ) + .await + .map_err(|error| Status::internal(error.to_string()))?; + Ok(Response::new(GetDelegationsResponse { + delegations: delegations + .into_iter() + .map(Self::delegation_to_proto) + .collect(), + next_cursor: next_cursor.map(|cursor| Cursor { id: cursor.id }), + })) + } + + async fn get_voting_power( + &self, + request: Request, + ) -> Result, Status> { + let filter = request.into_inner().filter.unwrap_or_default(); + let entries = self + .storage + .get_voting_power( + filter + .votes_token + .as_ref() + .and_then(|value| bytes_to_felt(value)), + filter + .account + .as_ref() + .and_then(|value| bytes_to_felt(value)), + ) + .await + .map_err(|error| Status::internal(error.to_string()))? + .into_iter() + .map(Self::voting_power_to_proto) + .collect(); + Ok(Response::new(GetVotingPowerResponse { entries })) + } + + async fn subscribe_proposals( + &self, + request: Request, + ) -> Result, Status> { + let filter = request.into_inner().filter.unwrap_or_default(); + let mut rx = self.proposal_tx.subscribe(); + Ok(Response::new(Box::pin(stream! { + while let Ok(update) = rx.recv().await { + if let Some(proposal) = &update.proposal { + if Self::matches_proposal_filter(proposal, &filter) { + yield Ok(update); + } + } + } + }))) + } + + async fn subscribe_votes( + &self, + request: Request, + ) -> Result, Status> { + let filter = request.into_inner().filter.unwrap_or_default(); + let mut rx = self.vote_tx.subscribe(); + Ok(Response::new(Box::pin(stream! { + while let Ok(update) = rx.recv().await { + if let Some(vote) = &update.vote { + if Self::matches_vote_filter(vote, &filter) { + yield Ok(update); + } + } + } + }))) + } + + async fn subscribe_delegations( + &self, + request: Request, + ) -> Result, Status> { + let filter = request.into_inner().filter.unwrap_or_default(); + let mut rx = self.delegation_tx.subscribe(); + Ok(Response::new(Box::pin(stream! { + while let Ok(update) = rx.recv().await { + if let Some(delegation) = &update.delegation { + if Self::matches_delegation_filter(delegation, &filter) { + yield Ok(update); + } + } + } + }))) + } +} diff --git a/crates/torii-governance/src/identification.rs b/crates/torii-governance/src/identification.rs new file mode 100644 index 00000000..c6c1f4f6 --- /dev/null +++ b/crates/torii-governance/src/identification.rs @@ -0,0 +1,68 @@ +use anyhow::Result; +use starknet::core::types::Felt; +use torii::etl::decoder::DecoderId; +use torii::etl::extractor::ContractAbi; +use torii::etl::identification::IdentificationRule; + +use crate::discovery::governance_decoder_id; + +pub struct GovernanceRule; + +impl GovernanceRule { + pub fn new() -> Self { + Self + } +} + +impl Default for GovernanceRule { + fn default() -> Self { + Self::new() + } +} + +impl IdentificationRule for GovernanceRule { + fn name(&self) -> &'static str { + "governance" + } + + fn decoder_ids(&self) -> Vec { + vec![governance_decoder_id()] + } + + fn identify_by_abi( + &self, + _contract_address: Felt, + _class_hash: Felt, + abi: &ContractAbi, + ) -> Result> { + let has_supports_interface = abi.has_function("supports_interface"); + let has_token = abi.has_function("token"); + let has_state = abi.has_function("state"); + let has_snapshot = abi.has_function("proposal_snapshot"); + let has_deadline = abi.has_function("proposal_deadline"); + let has_proposal_event = abi.has_event("ProposalCreated") || abi.has_event("VoteCast"); + + if has_supports_interface + && has_token + && has_state + && has_snapshot + && has_deadline + && has_proposal_event + { + Ok(vec![governance_decoder_id()]) + } else { + Ok(Vec::new()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn governance_rule_has_expected_decoder() { + let ids = GovernanceRule::new().decoder_ids(); + assert_eq!(ids, vec![governance_decoder_id()]); + } +} diff --git a/crates/torii-governance/src/lib.rs b/crates/torii-governance/src/lib.rs new file mode 100644 index 00000000..bc7f8f61 --- /dev/null +++ b/crates/torii-governance/src/lib.rs @@ -0,0 +1,29 @@ +pub mod decoder; +pub mod discovery; +pub mod grpc_service; +pub mod identification; +pub mod sink; +pub mod storage; + +pub mod proto { + include!("generated/torii.sinks.governance.rs"); +} + +pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("generated/governance_descriptor.bin"); + +pub use decoder::{ + DelegateChanged, DelegateVotesChanged, GovernanceDecoder, ProposalCanceled, ProposalCreated, + ProposalExecuted, ProposalQueued, VoteCast, VoteCastWithParams, +}; +pub use discovery::{ + bootstrap_governance_registry, fetch_current_votes, fetch_proposal_metadata, + governance_decoder_id, register_governor_and_votes_token, GovernanceBootstrap, + GovernanceMetadata, ProposalMetadata, +}; +pub use grpc_service::GovernanceService; +pub use identification::GovernanceRule; +pub use sink::GovernanceSink; +pub use storage::{ + DelegationCursor, DelegationRecord, GovernanceStorage, GovernorRecord, ProposalCursor, + ProposalRecord, VoteCursor, VoteRecord, VotingPowerRecord, VotingPowerSource, +}; diff --git a/crates/torii-governance/src/sink.rs b/crates/torii-governance/src/sink.rs new file mode 100644 index 00000000..a8162c97 --- /dev/null +++ b/crates/torii-governance/src/sink.rs @@ -0,0 +1,560 @@ +use crate::decoder::{ + DelegateChanged, DelegateVotesChanged, ProposalCanceled, ProposalCreated, ProposalExecuted, + ProposalQueued, VoteCast, VoteCastWithParams, +}; +use crate::discovery::{ + fetch_current_votes, fetch_governance_metadata, fetch_proposal_metadata, + register_governor_and_votes_token, +}; +use crate::grpc_service::GovernanceService; +use crate::proto; +use crate::storage::{ + DelegationRecord, GovernanceStorage, GovernorRecord, ProposalRecord, VoteRecord, + VotingPowerRecord, VotingPowerSource, +}; +use async_trait::async_trait; +use axum::Router; +use prost::Message; +use prost_types::Any; +use starknet::core::types::Felt; +use starknet::providers::jsonrpc::{HttpTransport, JsonRpcClient}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{Mutex, RwLock}; +use torii::etl::decoder::DecoderId; +use torii::etl::sink::{EventBus, Sink, SinkContext, TopicInfo}; +use torii::etl::{EngineDb, Envelope, TypeId}; +use torii::grpc::UpdateType; +use torii_common::u256_to_bytes; +use torii_erc20::Erc20Storage; + +pub struct GovernanceSink { + storage: Arc, + provider: Arc>, + engine_db: Arc, + registry_cache: Arc>>>, + erc20_storage: Option>, + grpc_service: Option, + event_bus: Option>, + bootstrapped_governors: Mutex>, +} + +impl GovernanceSink { + pub fn new( + storage: Arc, + provider: Arc>, + engine_db: Arc, + registry_cache: Arc>>>, + ) -> Self { + Self { + storage, + provider, + engine_db, + registry_cache, + erc20_storage: None, + grpc_service: None, + event_bus: None, + bootstrapped_governors: Mutex::new(std::collections::HashSet::new()), + } + } + + pub fn with_grpc_service(mut self, grpc_service: GovernanceService) -> Self { + self.grpc_service = Some(grpc_service); + self + } + + pub fn with_erc20_storage(mut self, erc20_storage: Arc) -> Self { + self.erc20_storage = Some(erc20_storage); + self + } + + async fn ensure_governor_bootstrapped(&self, governor: Felt) { + { + let seen = self.bootstrapped_governors.lock().await; + if seen.contains(&governor) { + return; + } + } + + match fetch_governance_metadata(self.provider.as_ref(), governor).await { + Ok(metadata) => { + let _ = register_governor_and_votes_token( + self.engine_db.as_ref(), + &self.registry_cache, + &metadata, + ) + .await; + let _ = self + .storage + .upsert_governor(&GovernorRecord { + address: metadata.governor, + votes_token: Some(metadata.votes_token), + name: metadata.name, + version: metadata.version, + src5_supported: metadata.src5_supported, + governor_interface_supported: metadata.governor_interface_supported, + }) + .await; + self.bootstrapped_governors.lock().await.insert(governor); + } + Err(error) => { + tracing::debug!( + target: "torii_governance::sink", + governor = %format!("{governor:#x}"), + error = %error, + "Governance bootstrap skipped for contract" + ); + } + } + } + + async fn refresh_voting_power( + &self, + votes_token: Felt, + account: Felt, + delegate: Option, + block_number: u64, + tx_hash: Felt, + ) { + if let Ok(voting_power) = + fetch_current_votes(self.provider.as_ref(), votes_token, account).await + { + let _ = self + .storage + .upsert_voting_power(&VotingPowerRecord { + votes_token, + account, + delegate, + voting_power, + source: VotingPowerSource::RpcGetVotes, + last_block: block_number, + last_tx_hash: tx_hash, + }) + .await; + return; + } + + if let Some(erc20_storage) = &self.erc20_storage { + if let Ok(Some(balance)) = erc20_storage.get_balance(votes_token, account).await { + let _ = self + .storage + .upsert_voting_power(&VotingPowerRecord { + votes_token, + account, + delegate, + voting_power: balance, + source: VotingPowerSource::Erc20Balance, + last_block: block_number, + last_tx_hash: tx_hash, + }) + .await; + } + } + } + + async fn hydrate_proposal_record( + &self, + governor: Felt, + proposal_id: Felt, + existing: Option<&ProposalRecord>, + fallback_proposer: Option, + ) -> (Option, u64, u64) { + let metadata = fetch_proposal_metadata(self.provider.as_ref(), governor, proposal_id).await; + let proposer = metadata + .proposer + .or(fallback_proposer) + .or(existing.and_then(|value| value.proposer)); + let snapshot = metadata + .snapshot + .or(existing + .map(|value| value.snapshot) + .filter(|value| *value != 0)) + .unwrap_or(0); + let deadline = metadata + .deadline + .or(existing + .map(|value| value.deadline) + .filter(|value| *value != 0)) + .unwrap_or(0); + + (proposer, snapshot, deadline) + } + + fn proposal_proto(record: &ProposalRecord) -> proto::Proposal { + proto::Proposal { + governor: record.governor.to_bytes_be().to_vec(), + proposal_id: record.proposal_id.to_bytes_be().to_vec(), + proposer: record.proposer.map(|value| value.to_bytes_be().to_vec()), + description: record.description.clone(), + snapshot: record.snapshot, + deadline: record.deadline, + status: record.status.clone(), + created_block: record.created_block, + created_tx_hash: record.created_tx_hash.to_bytes_be().to_vec(), + created_timestamp: record.created_timestamp.unwrap_or(0), + queued_block: record.queued_block, + executed_block: record.executed_block, + canceled_block: record.canceled_block, + } + } + + fn vote_proto(record: &VoteRecord) -> proto::Vote { + proto::Vote { + governor: record.governor.to_bytes_be().to_vec(), + proposal_id: record.proposal_id.to_bytes_be().to_vec(), + voter: record.voter.to_bytes_be().to_vec(), + support: u32::from(record.support), + weight: u256_to_bytes(record.weight), + reason: record.reason.clone(), + params: record.params.clone(), + block_number: record.block_number, + tx_hash: record.tx_hash.to_bytes_be().to_vec(), + timestamp: record.timestamp.unwrap_or(0), + } + } + + fn delegation_proto(record: &DelegationRecord) -> proto::Delegation { + proto::Delegation { + votes_token: record.votes_token.to_bytes_be().to_vec(), + delegator: record.delegator.to_bytes_be().to_vec(), + from_delegate: record.from_delegate.to_bytes_be().to_vec(), + to_delegate: record.to_delegate.to_bytes_be().to_vec(), + block_number: record.block_number, + tx_hash: record.tx_hash.to_bytes_be().to_vec(), + timestamp: record.timestamp.unwrap_or(0), + } + } + + fn publish_proposal(&self, proposal: &proto::Proposal) { + if let Some(event_bus) = &self.event_bus { + let mut buf = Vec::new(); + if proposal.encode(&mut buf).is_ok() { + event_bus.publish_protobuf( + "governance.proposals", + "governance.proposal", + &Any { + type_url: "type.googleapis.com/torii.sinks.governance.Proposal".to_string(), + value: buf, + }, + proposal, + UpdateType::Created, + |_proposal: &proto::Proposal, _filters| true, + ); + } + } + if let Some(service) = &self.grpc_service { + service.broadcast_proposal(proposal.clone()); + } + } + + fn publish_vote(&self, vote: &proto::Vote) { + if let Some(service) = &self.grpc_service { + service.broadcast_vote(vote.clone()); + } + } + + fn publish_delegation(&self, delegation: &proto::Delegation) { + if let Some(service) = &self.grpc_service { + service.broadcast_delegation(delegation.clone()); + } + } +} + +#[async_trait] +impl Sink for GovernanceSink { + fn name(&self) -> &'static str { + "governance-sink" + } + + fn interested_types(&self) -> Vec { + vec![ + TypeId::new("governance.proposal_created"), + TypeId::new("governance.proposal_queued"), + TypeId::new("governance.proposal_executed"), + TypeId::new("governance.proposal_canceled"), + TypeId::new("governance.vote_cast"), + TypeId::new("governance.vote_cast_with_params"), + TypeId::new("governance.delegate_changed"), + TypeId::new("governance.delegate_votes_changed"), + ] + } + + async fn process( + &self, + envelopes: &[Envelope], + batch: &torii::etl::extractor::ExtractionBatch, + ) -> anyhow::Result<()> { + for envelope in envelopes { + if let Some(event) = envelope.body.as_any().downcast_ref::() { + self.ensure_governor_bootstrapped(event.governor).await; + let existing = self + .storage + .get_proposal(event.governor, event.proposal_id) + .await?; + let (proposer, snapshot, deadline) = self + .hydrate_proposal_record( + event.governor, + event.proposal_id, + existing.as_ref(), + Some(event.proposer), + ) + .await; + let record = ProposalRecord { + id: existing.as_ref().map_or(0, |value| value.id), + governor: event.governor, + proposal_id: event.proposal_id, + proposer, + description: existing + .as_ref() + .and_then(|value| value.description.clone()), + snapshot, + deadline, + status: "created".to_string(), + created_block: event.block_number, + created_tx_hash: event.transaction_hash, + created_timestamp: batch + .blocks + .get(&event.block_number) + .map(|block| block.timestamp as i64), + queued_block: existing.as_ref().and_then(|value| value.queued_block), + executed_block: existing.as_ref().and_then(|value| value.executed_block), + canceled_block: existing.as_ref().and_then(|value| value.canceled_block), + }; + self.storage.upsert_proposal(&record).await?; + self.publish_proposal(&Self::proposal_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() { + let existing = self + .storage + .get_proposal(event.governor, event.proposal_id) + .await?; + let (proposer, snapshot, deadline) = self + .hydrate_proposal_record( + event.governor, + event.proposal_id, + existing.as_ref(), + None, + ) + .await; + let record = ProposalRecord { + id: existing.as_ref().map_or(0, |value| value.id), + governor: event.governor, + proposal_id: event.proposal_id, + proposer, + description: existing + .as_ref() + .and_then(|value| value.description.clone()), + snapshot, + deadline, + status: "queued".to_string(), + created_block: existing + .as_ref() + .map_or(event.block_number, |value| value.created_block), + created_tx_hash: existing + .as_ref() + .map_or(event.transaction_hash, |value| value.created_tx_hash), + created_timestamp: existing.as_ref().and_then(|value| value.created_timestamp), + queued_block: Some(event.block_number), + executed_block: existing.as_ref().and_then(|value| value.executed_block), + canceled_block: existing.as_ref().and_then(|value| value.canceled_block), + }; + self.storage.upsert_proposal(&record).await?; + self.publish_proposal(&Self::proposal_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() { + let existing = self + .storage + .get_proposal(event.governor, event.proposal_id) + .await?; + let (proposer, snapshot, deadline) = self + .hydrate_proposal_record( + event.governor, + event.proposal_id, + existing.as_ref(), + None, + ) + .await; + let record = ProposalRecord { + id: existing.as_ref().map_or(0, |value| value.id), + governor: event.governor, + proposal_id: event.proposal_id, + proposer, + description: existing + .as_ref() + .and_then(|value| value.description.clone()), + snapshot, + deadline, + status: "executed".to_string(), + created_block: existing + .as_ref() + .map_or(event.block_number, |value| value.created_block), + created_tx_hash: existing + .as_ref() + .map_or(event.transaction_hash, |value| value.created_tx_hash), + created_timestamp: existing.as_ref().and_then(|value| value.created_timestamp), + queued_block: existing.as_ref().and_then(|value| value.queued_block), + executed_block: Some(event.block_number), + canceled_block: existing.as_ref().and_then(|value| value.canceled_block), + }; + self.storage.upsert_proposal(&record).await?; + self.publish_proposal(&Self::proposal_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() { + let existing = self + .storage + .get_proposal(event.governor, event.proposal_id) + .await?; + let (proposer, snapshot, deadline) = self + .hydrate_proposal_record( + event.governor, + event.proposal_id, + existing.as_ref(), + None, + ) + .await; + let record = ProposalRecord { + id: existing.as_ref().map_or(0, |value| value.id), + governor: event.governor, + proposal_id: event.proposal_id, + proposer, + description: existing + .as_ref() + .and_then(|value| value.description.clone()), + snapshot, + deadline, + status: "canceled".to_string(), + created_block: existing + .as_ref() + .map_or(event.block_number, |value| value.created_block), + created_tx_hash: existing + .as_ref() + .map_or(event.transaction_hash, |value| value.created_tx_hash), + created_timestamp: existing.as_ref().and_then(|value| value.created_timestamp), + queued_block: existing.as_ref().and_then(|value| value.queued_block), + executed_block: existing.as_ref().and_then(|value| value.executed_block), + canceled_block: Some(event.block_number), + }; + self.storage.upsert_proposal(&record).await?; + self.publish_proposal(&Self::proposal_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() { + let record = VoteRecord { + id: 0, + governor: event.governor, + proposal_id: event.proposal_id, + voter: event.voter, + support: event.support, + weight: event.weight, + reason: event.reason.clone(), + params: Vec::new(), + block_number: event.block_number, + tx_hash: event.transaction_hash, + timestamp: batch + .blocks + .get(&event.block_number) + .map(|block| block.timestamp as i64), + }; + self.storage.insert_vote(&record).await?; + self.publish_vote(&Self::vote_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() + { + let record = VoteRecord { + id: 0, + governor: event.governor, + proposal_id: event.proposal_id, + voter: event.voter, + support: event.support, + weight: event.weight, + reason: event.reason.clone(), + params: serde_json::to_vec( + &event + .params + .iter() + .map(ToString::to_string) + .collect::>(), + ) + .unwrap_or_default(), + block_number: event.block_number, + tx_hash: event.transaction_hash, + timestamp: batch + .blocks + .get(&event.block_number) + .map(|block| block.timestamp as i64), + }; + self.storage.insert_vote(&record).await?; + self.publish_vote(&Self::vote_proto(&record)); + } else if let Some(event) = envelope.body.as_any().downcast_ref::() { + let record = DelegationRecord { + id: 0, + votes_token: event.votes_token, + delegator: event.delegator, + from_delegate: event.from_delegate, + to_delegate: event.to_delegate, + block_number: event.block_number, + tx_hash: event.transaction_hash, + timestamp: batch + .blocks + .get(&event.block_number) + .map(|block| block.timestamp as i64), + }; + self.storage.insert_delegation(&record).await?; + self.refresh_voting_power( + event.votes_token, + event.delegator, + Some(event.to_delegate), + event.block_number, + event.transaction_hash, + ) + .await; + self.publish_delegation(&Self::delegation_proto(&record)); + } else if let Some(event) = envelope + .body + .as_any() + .downcast_ref::() + { + self.storage + .upsert_voting_power(&VotingPowerRecord { + votes_token: event.votes_token, + account: event.delegate, + delegate: Some(event.delegate), + voting_power: event.new_votes, + source: VotingPowerSource::DelegateVotesChanged, + last_block: event.block_number, + last_tx_hash: event.transaction_hash, + }) + .await?; + } + } + + Ok(()) + } + + fn topics(&self) -> Vec { + vec![ + TopicInfo::new( + "governance.proposals", + vec!["governor".to_string()], + "Governance proposal updates", + ), + TopicInfo::new( + "governance.votes", + vec!["governor".to_string()], + "Governance vote updates", + ), + TopicInfo::new( + "governance.delegations", + vec!["votes_token".to_string()], + "Governance delegation updates", + ), + ] + } + + fn build_routes(&self) -> Router { + Router::new() + } + + async fn initialize( + &mut self, + event_bus: Arc, + _context: &SinkContext, + ) -> anyhow::Result<()> { + self.event_bus = Some(event_bus); + Ok(()) + } +} diff --git a/crates/torii-governance/src/storage.rs b/crates/torii-governance/src/storage.rs new file mode 100644 index 00000000..9504afcb --- /dev/null +++ b/crates/torii-governance/src/storage.rs @@ -0,0 +1,784 @@ +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use sqlx::{any::AnyPoolOptions, Any, Pool, Row}; +use starknet::core::types::{Felt, U256}; +use torii_common::{blob_to_felt, blob_to_u256, felt_to_blob, u256_to_blob}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum VotingPowerSource { + DelegateVotesChanged, + RpcGetVotes, + Erc20Balance, +} + +impl VotingPowerSource { + pub fn as_str(self) -> &'static str { + match self { + Self::DelegateVotesChanged => "delegate_votes_changed", + Self::RpcGetVotes => "rpc_get_votes", + Self::Erc20Balance => "erc20_balance", + } + } + + pub fn priority(self) -> i32 { + match self { + Self::DelegateVotesChanged => 3, + Self::RpcGetVotes => 2, + Self::Erc20Balance => 1, + } + } + + pub fn parse(value: &str) -> Self { + match value { + "delegate_votes_changed" => Self::DelegateVotesChanged, + "rpc_get_votes" => Self::RpcGetVotes, + _ => Self::Erc20Balance, + } + } +} + +#[derive(Debug, Clone)] +pub struct GovernorRecord { + pub address: Felt, + pub votes_token: Option, + pub name: Option, + pub version: Option, + pub src5_supported: bool, + pub governor_interface_supported: bool, +} + +#[derive(Debug, Clone)] +pub struct ProposalRecord { + pub id: i64, + pub governor: Felt, + pub proposal_id: Felt, + pub proposer: Option, + pub description: Option, + pub snapshot: u64, + pub deadline: u64, + pub status: String, + pub created_block: u64, + pub created_tx_hash: Felt, + pub created_timestamp: Option, + pub queued_block: Option, + pub executed_block: Option, + pub canceled_block: Option, +} + +#[derive(Debug, Clone)] +pub struct VoteRecord { + pub id: i64, + pub governor: Felt, + pub proposal_id: Felt, + pub voter: Felt, + pub support: u8, + pub weight: U256, + pub reason: Option, + pub params: Vec, + pub block_number: u64, + pub tx_hash: Felt, + pub timestamp: Option, +} + +#[derive(Debug, Clone)] +pub struct DelegationRecord { + pub id: i64, + pub votes_token: Felt, + pub delegator: Felt, + pub from_delegate: Felt, + pub to_delegate: Felt, + pub block_number: u64, + pub tx_hash: Felt, + pub timestamp: Option, +} + +#[derive(Debug, Clone)] +pub struct VotingPowerRecord { + pub votes_token: Felt, + pub account: Felt, + pub delegate: Option, + pub voting_power: U256, + pub source: VotingPowerSource, + pub last_block: u64, + pub last_tx_hash: Felt, +} + +#[derive(Debug, Clone, Copy)] +pub struct ProposalCursor { + pub id: i64, +} + +#[derive(Debug, Clone, Copy)] +pub struct VoteCursor { + pub id: i64, +} + +#[derive(Debug, Clone, Copy)] +pub struct DelegationCursor { + pub id: i64, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Backend { + Sqlite, + Postgres, +} + +pub struct GovernanceStorage { + pool: Pool, + backend: Backend, +} + +impl GovernanceStorage { + pub async fn new(url: &str) -> Result { + sqlx::any::install_default_drivers(); + + let backend = if url.starts_with("postgres://") || url.starts_with("postgresql://") { + Backend::Postgres + } else { + Backend::Sqlite + }; + + let database_url = if backend == Backend::Sqlite && !url.starts_with("sqlite:") { + let path = std::path::Path::new(url); + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + tokio::fs::create_dir_all(parent).await?; + } + } + format!("sqlite:{}", path.display()) + } else { + url.to_string() + }; + + let pool = AnyPoolOptions::new() + .max_connections(if backend == Backend::Sqlite { 1 } else { 5 }) + .connect(&database_url) + .await?; + + let storage = Self { pool, backend }; + storage.init_schema().await?; + Ok(storage) + } + + fn table(&self, name: &str) -> String { + match self.backend { + Backend::Sqlite => name.to_string(), + Backend::Postgres => format!("governance.{name}"), + } + } + + async fn init_schema(&self) -> Result<()> { + if self.backend == Backend::Sqlite { + sqlx::query("PRAGMA journal_mode=WAL") + .execute(&self.pool) + .await?; + sqlx::query("PRAGMA synchronous=NORMAL") + .execute(&self.pool) + .await?; + } else { + sqlx::query("CREATE SCHEMA IF NOT EXISTS governance") + .execute(&self.pool) + .await?; + } + + let statements = match self.backend { + Backend::Sqlite => vec![ + "CREATE TABLE IF NOT EXISTS governors (address BLOB PRIMARY KEY, votes_token BLOB, name TEXT, version TEXT, src5_supported INTEGER NOT NULL, governor_interface_supported INTEGER NOT NULL)", + "CREATE TABLE IF NOT EXISTS proposals (id INTEGER PRIMARY KEY AUTOINCREMENT, governor BLOB NOT NULL, proposal_id BLOB NOT NULL, proposer BLOB, description TEXT, snapshot INTEGER NOT NULL DEFAULT 0, deadline INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL, created_block INTEGER NOT NULL, created_tx_hash BLOB NOT NULL, created_timestamp INTEGER, queued_block INTEGER, executed_block INTEGER, canceled_block INTEGER, UNIQUE(governor, proposal_id))", + "CREATE INDEX IF NOT EXISTS proposals_governor_idx ON proposals (governor, id DESC)", + "CREATE TABLE IF NOT EXISTS votes (id INTEGER PRIMARY KEY AUTOINCREMENT, governor BLOB NOT NULL, proposal_id BLOB NOT NULL, voter BLOB NOT NULL, support INTEGER NOT NULL, weight BLOB NOT NULL, reason TEXT, params BLOB NOT NULL, block_number INTEGER NOT NULL, tx_hash BLOB NOT NULL, timestamp INTEGER)", + "CREATE INDEX IF NOT EXISTS votes_query_idx ON votes (governor, proposal_id, voter, id DESC)", + "CREATE TABLE IF NOT EXISTS delegations (id INTEGER PRIMARY KEY AUTOINCREMENT, votes_token BLOB NOT NULL, delegator BLOB NOT NULL, from_delegate BLOB NOT NULL, to_delegate BLOB NOT NULL, block_number INTEGER NOT NULL, tx_hash BLOB NOT NULL, timestamp INTEGER)", + "CREATE INDEX IF NOT EXISTS delegations_query_idx ON delegations (votes_token, delegator, to_delegate, id DESC)", + "CREATE TABLE IF NOT EXISTS voting_power (votes_token BLOB NOT NULL, account BLOB NOT NULL, delegate BLOB, voting_power BLOB NOT NULL, source TEXT NOT NULL, source_priority INTEGER NOT NULL, last_block INTEGER NOT NULL, last_tx_hash BLOB NOT NULL, PRIMARY KEY(votes_token, account))", + ], + Backend::Postgres => vec![ + "CREATE TABLE IF NOT EXISTS governance.governors (address BYTEA PRIMARY KEY, votes_token BYTEA, name TEXT, version TEXT, src5_supported BOOLEAN NOT NULL, governor_interface_supported BOOLEAN NOT NULL)", + "CREATE TABLE IF NOT EXISTS governance.proposals (id BIGSERIAL PRIMARY KEY, governor BYTEA NOT NULL, proposal_id BYTEA NOT NULL, proposer BYTEA, description TEXT, snapshot BIGINT NOT NULL DEFAULT 0, deadline BIGINT NOT NULL DEFAULT 0, status TEXT NOT NULL, created_block BIGINT NOT NULL, created_tx_hash BYTEA NOT NULL, created_timestamp BIGINT, queued_block BIGINT, executed_block BIGINT, canceled_block BIGINT, UNIQUE(governor, proposal_id))", + "CREATE INDEX IF NOT EXISTS proposals_governor_idx ON governance.proposals (governor, id DESC)", + "CREATE TABLE IF NOT EXISTS governance.votes (id BIGSERIAL PRIMARY KEY, governor BYTEA NOT NULL, proposal_id BYTEA NOT NULL, voter BYTEA NOT NULL, support INTEGER NOT NULL, weight BYTEA NOT NULL, reason TEXT, params BYTEA NOT NULL, block_number BIGINT NOT NULL, tx_hash BYTEA NOT NULL, timestamp BIGINT)", + "CREATE INDEX IF NOT EXISTS votes_query_idx ON governance.votes (governor, proposal_id, voter, id DESC)", + "CREATE TABLE IF NOT EXISTS governance.delegations (id BIGSERIAL PRIMARY KEY, votes_token BYTEA NOT NULL, delegator BYTEA NOT NULL, from_delegate BYTEA NOT NULL, to_delegate BYTEA NOT NULL, block_number BIGINT NOT NULL, tx_hash BYTEA NOT NULL, timestamp BIGINT)", + "CREATE INDEX IF NOT EXISTS delegations_query_idx ON governance.delegations (votes_token, delegator, to_delegate, id DESC)", + "CREATE TABLE IF NOT EXISTS governance.voting_power (votes_token BYTEA NOT NULL, account BYTEA NOT NULL, delegate BYTEA, voting_power BYTEA NOT NULL, source TEXT NOT NULL, source_priority INTEGER NOT NULL, last_block BIGINT NOT NULL, last_tx_hash BYTEA NOT NULL, PRIMARY KEY(votes_token, account))", + ], + }; + + for statement in statements { + sqlx::query(statement).execute(&self.pool).await?; + } + + Ok(()) + } + + pub async fn upsert_governor(&self, record: &GovernorRecord) -> Result<()> { + let table = self.table("governors"); + let sql = match self.backend { + Backend::Sqlite => format!( + "INSERT INTO {table} (address, votes_token, name, version, src5_supported, governor_interface_supported) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(address) DO UPDATE SET votes_token = COALESCE(excluded.votes_token, {table}.votes_token), name = COALESCE(excluded.name, {table}.name), version = COALESCE(excluded.version, {table}.version), src5_supported = excluded.src5_supported, governor_interface_supported = excluded.governor_interface_supported" + ), + Backend::Postgres => format!( + "INSERT INTO {table} (address, votes_token, name, version, src5_supported, governor_interface_supported) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT(address) DO UPDATE SET votes_token = COALESCE(EXCLUDED.votes_token, {table}.votes_token), name = COALESCE(EXCLUDED.name, {table}.name), version = COALESCE(EXCLUDED.version, {table}.version), src5_supported = EXCLUDED.src5_supported, governor_interface_supported = EXCLUDED.governor_interface_supported" + ), + }; + + sqlx::query(&sql) + .bind(felt_to_blob(record.address)) + .bind(record.votes_token.map(felt_to_blob)) + .bind(record.name.clone()) + .bind(record.version.clone()) + .bind(record.src5_supported) + .bind(record.governor_interface_supported) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn get_governor(&self, governor: Felt) -> Result> { + let table = self.table("governors"); + let sql = if self.backend == Backend::Sqlite { + format!("SELECT address, votes_token, name, version, src5_supported, governor_interface_supported FROM {table} WHERE address = ?") + } else { + format!("SELECT address, votes_token, name, version, src5_supported, governor_interface_supported FROM {table} WHERE address = $1") + }; + let row = sqlx::query(&sql) + .bind(felt_to_blob(governor)) + .fetch_optional(&self.pool) + .await?; + + Ok(row.map(|row| GovernorRecord { + address: blob_to_felt(row.get::, _>(0).as_slice()), + votes_token: row + .get::>, _>(1) + .map(|value| blob_to_felt(&value)), + name: row.get(2), + version: row.get(3), + src5_supported: bool_from_row(&row, 4), + governor_interface_supported: bool_from_row(&row, 5), + })) + } + + pub async fn list_governors(&self, addresses: &[Felt]) -> Result> { + let table = self.table("governors"); + let mut sql = format!("SELECT address, votes_token, name, version, src5_supported, governor_interface_supported FROM {table}"); + let rows = if addresses.is_empty() { + sql.push_str(" ORDER BY address"); + sqlx::query(&sql).fetch_all(&self.pool).await? + } else { + let placeholders = (0..addresses.len()) + .map(|idx| match self.backend { + Backend::Sqlite => "?".to_string(), + Backend::Postgres => format!("${}", idx + 1), + }) + .collect::>() + .join(", "); + sql.push_str(&format!( + " WHERE address IN ({placeholders}) ORDER BY address" + )); + let mut query = sqlx::query(&sql); + for address in addresses { + query = query.bind(felt_to_blob(*address)); + } + query.fetch_all(&self.pool).await? + }; + + rows.into_iter() + .map(|row| { + Ok(GovernorRecord { + address: blob_to_felt(row.get::, _>(0).as_slice()), + votes_token: row + .get::>, _>(1) + .map(|value| blob_to_felt(&value)), + name: row.get(2), + version: row.get(3), + src5_supported: bool_from_row(&row, 4), + governor_interface_supported: bool_from_row(&row, 5), + }) + }) + .collect() + } + + pub async fn upsert_proposal(&self, record: &ProposalRecord) -> Result<()> { + let table = self.table("proposals"); + let sql = match self.backend { + Backend::Sqlite => format!( + "INSERT INTO {table} (governor, proposal_id, proposer, description, snapshot, deadline, status, created_block, created_tx_hash, created_timestamp, queued_block, executed_block, canceled_block) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(governor, proposal_id) DO UPDATE SET proposer = COALESCE(excluded.proposer, {table}.proposer), description = COALESCE(excluded.description, {table}.description), snapshot = CASE WHEN excluded.snapshot = 0 THEN {table}.snapshot ELSE excluded.snapshot END, deadline = CASE WHEN excluded.deadline = 0 THEN {table}.deadline ELSE excluded.deadline END, status = excluded.status, queued_block = COALESCE(excluded.queued_block, {table}.queued_block), executed_block = COALESCE(excluded.executed_block, {table}.executed_block), canceled_block = COALESCE(excluded.canceled_block, {table}.canceled_block)" + ), + Backend::Postgres => format!( + "INSERT INTO {table} (governor, proposal_id, proposer, description, snapshot, deadline, status, created_block, created_tx_hash, created_timestamp, queued_block, executed_block, canceled_block) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT(governor, proposal_id) DO UPDATE SET proposer = COALESCE(EXCLUDED.proposer, {table}.proposer), description = COALESCE(EXCLUDED.description, {table}.description), snapshot = CASE WHEN EXCLUDED.snapshot = 0 THEN {table}.snapshot ELSE EXCLUDED.snapshot END, deadline = CASE WHEN EXCLUDED.deadline = 0 THEN {table}.deadline ELSE EXCLUDED.deadline END, status = EXCLUDED.status, queued_block = COALESCE(EXCLUDED.queued_block, {table}.queued_block), executed_block = COALESCE(EXCLUDED.executed_block, {table}.executed_block), canceled_block = COALESCE(EXCLUDED.canceled_block, {table}.canceled_block)" + ), + }; + + sqlx::query(&sql) + .bind(felt_to_blob(record.governor)) + .bind(felt_to_blob(record.proposal_id)) + .bind(record.proposer.map(felt_to_blob)) + .bind(record.description.clone()) + .bind(record.snapshot as i64) + .bind(record.deadline as i64) + .bind(record.status.clone()) + .bind(record.created_block as i64) + .bind(felt_to_blob(record.created_tx_hash)) + .bind(record.created_timestamp) + .bind(record.queued_block.map(|value| value as i64)) + .bind(record.executed_block.map(|value| value as i64)) + .bind(record.canceled_block.map(|value| value as i64)) + .execute(&self.pool) + .await?; + + Ok(()) + } + + pub async fn get_proposal( + &self, + governor: Felt, + proposal_id: Felt, + ) -> Result> { + let table = self.table("proposals"); + let sql = if self.backend == Backend::Sqlite { + format!("SELECT id, governor, proposal_id, proposer, description, snapshot, deadline, status, created_block, created_tx_hash, created_timestamp, queued_block, executed_block, canceled_block FROM {table} WHERE governor = ? AND proposal_id = ?") + } else { + format!("SELECT id, governor, proposal_id, proposer, description, snapshot, deadline, status, created_block, created_tx_hash, created_timestamp, queued_block, executed_block, canceled_block FROM {table} WHERE governor = $1 AND proposal_id = $2") + }; + let row = sqlx::query(&sql) + .bind(felt_to_blob(governor)) + .bind(felt_to_blob(proposal_id)) + .fetch_optional(&self.pool) + .await?; + row.map(parse_proposal_row).transpose() + } + + pub async fn get_proposals_filtered( + &self, + governor: Option, + proposer: Option, + status: Option<&str>, + cursor: Option, + limit: u32, + ) -> Result<(Vec, Option)> { + let table = self.table("proposals"); + let mut sql = format!("SELECT id, governor, proposal_id, proposer, description, snapshot, deadline, status, created_block, created_tx_hash, created_timestamp, queued_block, executed_block, canceled_block FROM {table} WHERE 1=1"); + let mut binds: Vec = Vec::new(); + if let Some(governor) = governor { + sql.push_str(" AND governor = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(governor))); + } + if let Some(proposer) = proposer { + sql.push_str(" AND proposer = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(proposer))); + } + if let Some(status) = status { + sql.push_str(" AND status = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::String(status.to_string())); + } + if let Some(cursor) = cursor { + sql.push_str(" AND id < "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(cursor.id)); + } + sql.push_str(" ORDER BY id DESC LIMIT "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(limit as i64 + 1)); + + let rows = bind_query(sqlx::query(&sql), binds) + .fetch_all(&self.pool) + .await?; + let mut proposals = rows + .into_iter() + .map(parse_proposal_row) + .collect::>>()?; + let has_more = proposals.len() as u32 > limit; + let next_cursor = if proposals.len() as u32 > limit { + let last = proposals.pop().context("expected extra proposal row")?; + Some(ProposalCursor { id: last.id }) + } else { + None + }; + Ok((proposals, next_cursor.filter(|_| has_more))) + } + + pub async fn insert_vote(&self, record: &VoteRecord) -> Result<()> { + let table = self.table("votes"); + let sql = if self.backend == Backend::Sqlite { + format!("INSERT INTO {table} (governor, proposal_id, voter, support, weight, reason, params, block_number, tx_hash, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + } else { + format!("INSERT INTO {table} (governor, proposal_id, voter, support, weight, reason, params, block_number, tx_hash, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)") + }; + sqlx::query(&sql) + .bind(felt_to_blob(record.governor)) + .bind(felt_to_blob(record.proposal_id)) + .bind(felt_to_blob(record.voter)) + .bind(record.support as i32) + .bind(u256_to_blob(record.weight)) + .bind(record.reason.clone()) + .bind(record.params.clone()) + .bind(record.block_number as i64) + .bind(felt_to_blob(record.tx_hash)) + .bind(record.timestamp) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn get_votes_filtered( + &self, + governor: Option, + proposal_id: Option, + voter: Option, + cursor: Option, + limit: u32, + ) -> Result<(Vec, Option)> { + let table = self.table("votes"); + let mut sql = format!("SELECT id, governor, proposal_id, voter, support, weight, reason, params, block_number, tx_hash, timestamp FROM {table} WHERE 1=1"); + let mut binds = Vec::new(); + if let Some(governor) = governor { + sql.push_str(" AND governor = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(governor))); + } + if let Some(proposal_id) = proposal_id { + sql.push_str(" AND proposal_id = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(proposal_id))); + } + if let Some(voter) = voter { + sql.push_str(" AND voter = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(voter))); + } + if let Some(cursor) = cursor { + sql.push_str(" AND id < "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(cursor.id)); + } + sql.push_str(" ORDER BY id DESC LIMIT "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(limit as i64 + 1)); + let rows = bind_query(sqlx::query(&sql), binds) + .fetch_all(&self.pool) + .await?; + let mut votes = rows + .into_iter() + .map(parse_vote_row) + .collect::>>()?; + let next_cursor = if votes.len() as u32 > limit { + let last = votes.pop().context("expected extra vote row")?; + Some(VoteCursor { id: last.id }) + } else { + None + }; + Ok((votes, next_cursor)) + } + + pub async fn insert_delegation(&self, record: &DelegationRecord) -> Result<()> { + let table = self.table("delegations"); + let sql = if self.backend == Backend::Sqlite { + format!("INSERT INTO {table} (votes_token, delegator, from_delegate, to_delegate, block_number, tx_hash, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?)") + } else { + format!("INSERT INTO {table} (votes_token, delegator, from_delegate, to_delegate, block_number, tx_hash, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7)") + }; + sqlx::query(&sql) + .bind(felt_to_blob(record.votes_token)) + .bind(felt_to_blob(record.delegator)) + .bind(felt_to_blob(record.from_delegate)) + .bind(felt_to_blob(record.to_delegate)) + .bind(record.block_number as i64) + .bind(felt_to_blob(record.tx_hash)) + .bind(record.timestamp) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn get_delegations_filtered( + &self, + votes_token: Option, + delegator: Option, + delegatee: Option, + cursor: Option, + limit: u32, + ) -> Result<(Vec, Option)> { + let table = self.table("delegations"); + let mut sql = format!("SELECT id, votes_token, delegator, from_delegate, to_delegate, block_number, tx_hash, timestamp FROM {table} WHERE 1=1"); + let mut binds = Vec::new(); + if let Some(votes_token) = votes_token { + sql.push_str(" AND votes_token = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(votes_token))); + } + if let Some(delegator) = delegator { + sql.push_str(" AND delegator = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(delegator))); + } + if let Some(delegatee) = delegatee { + sql.push_str(" AND to_delegate = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(delegatee))); + } + if let Some(cursor) = cursor { + sql.push_str(" AND id < "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(cursor.id)); + } + sql.push_str(" ORDER BY id DESC LIMIT "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::I64(limit as i64 + 1)); + let rows = bind_query(sqlx::query(&sql), binds) + .fetch_all(&self.pool) + .await?; + let mut delegations = rows + .into_iter() + .map(parse_delegation_row) + .collect::>>()?; + let next_cursor = if delegations.len() as u32 > limit { + let last = delegations.pop().context("expected extra delegation row")?; + Some(DelegationCursor { id: last.id }) + } else { + None + }; + Ok((delegations, next_cursor)) + } + + pub async fn upsert_voting_power(&self, record: &VotingPowerRecord) -> Result<()> { + let existing = self + .get_voting_power_entry(record.votes_token, record.account) + .await?; + if existing + .as_ref() + .is_some_and(|current| current.source.priority() > record.source.priority()) + { + return Ok(()); + } + + let table = self.table("voting_power"); + let sql = match self.backend { + Backend::Sqlite => format!( + "INSERT INTO {table} (votes_token, account, delegate, voting_power, source, source_priority, last_block, last_tx_hash) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(votes_token, account) DO UPDATE SET delegate = excluded.delegate, voting_power = excluded.voting_power, source = excluded.source, source_priority = excluded.source_priority, last_block = excluded.last_block, last_tx_hash = excluded.last_tx_hash" + ), + Backend::Postgres => format!( + "INSERT INTO {table} (votes_token, account, delegate, voting_power, source, source_priority, last_block, last_tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT(votes_token, account) DO UPDATE SET delegate = EXCLUDED.delegate, voting_power = EXCLUDED.voting_power, source = EXCLUDED.source, source_priority = EXCLUDED.source_priority, last_block = EXCLUDED.last_block, last_tx_hash = EXCLUDED.last_tx_hash" + ), + }; + sqlx::query(&sql) + .bind(felt_to_blob(record.votes_token)) + .bind(felt_to_blob(record.account)) + .bind(record.delegate.map(felt_to_blob)) + .bind(u256_to_blob(record.voting_power)) + .bind(record.source.as_str()) + .bind(record.source.priority()) + .bind(record.last_block as i64) + .bind(felt_to_blob(record.last_tx_hash)) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn get_voting_power_entry( + &self, + votes_token: Felt, + account: Felt, + ) -> Result> { + let table = self.table("voting_power"); + let sql = if self.backend == Backend::Sqlite { + format!("SELECT votes_token, account, delegate, voting_power, source, last_block, last_tx_hash FROM {table} WHERE votes_token = ? AND account = ?") + } else { + format!("SELECT votes_token, account, delegate, voting_power, source, last_block, last_tx_hash FROM {table} WHERE votes_token = $1 AND account = $2") + }; + let row = sqlx::query(&sql) + .bind(felt_to_blob(votes_token)) + .bind(felt_to_blob(account)) + .fetch_optional(&self.pool) + .await?; + row.map(parse_voting_power_row).transpose() + } + + pub async fn get_voting_power( + &self, + votes_token: Option, + account: Option, + ) -> Result> { + let table = self.table("voting_power"); + let mut sql = format!("SELECT votes_token, account, delegate, voting_power, source, last_block, last_tx_hash FROM {table} WHERE 1=1"); + let mut binds = Vec::new(); + if let Some(votes_token) = votes_token { + sql.push_str(" AND votes_token = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(votes_token))); + } + if let Some(account) = account { + sql.push_str(" AND account = "); + push_placeholder(&mut sql, self.backend, binds.len() + 1); + binds.push(BindValue::Bytes(felt_to_blob(account))); + } + sql.push_str(" ORDER BY votes_token, account"); + bind_query(sqlx::query(&sql), binds) + .fetch_all(&self.pool) + .await? + .into_iter() + .map(parse_voting_power_row) + .collect() + } +} + +enum BindValue { + Bytes(Vec), + String(String), + I64(i64), +} + +fn push_placeholder(sql: &mut String, backend: Backend, index: usize) { + match backend { + Backend::Sqlite => sql.push('?'), + Backend::Postgres => sql.push_str(&format!("${index}")), + } +} + +fn bind_query<'q>( + mut query: sqlx::query::Query<'q, Any, sqlx::any::AnyArguments<'q>>, + binds: Vec, +) -> sqlx::query::Query<'q, Any, sqlx::any::AnyArguments<'q>> { + for bind in binds { + query = match bind { + BindValue::Bytes(value) => query.bind(value), + BindValue::String(value) => query.bind(value), + BindValue::I64(value) => query.bind(value), + }; + } + query +} + +fn parse_proposal_row(row: sqlx::any::AnyRow) -> Result { + Ok(ProposalRecord { + id: row.get(0), + governor: blob_to_felt(row.get::, _>(1).as_slice()), + proposal_id: blob_to_felt(row.get::, _>(2).as_slice()), + proposer: row + .get::>, _>(3) + .map(|value| blob_to_felt(&value)), + description: row.get(4), + snapshot: row.get::(5) as u64, + deadline: row.get::(6) as u64, + status: row.get(7), + created_block: row.get::(8) as u64, + created_tx_hash: blob_to_felt(row.get::, _>(9).as_slice()), + created_timestamp: row.get(10), + queued_block: row.get::, _>(11).map(|value| value as u64), + executed_block: row.get::, _>(12).map(|value| value as u64), + canceled_block: row.get::, _>(13).map(|value| value as u64), + }) +} + +fn parse_vote_row(row: sqlx::any::AnyRow) -> Result { + Ok(VoteRecord { + id: row.get(0), + governor: blob_to_felt(row.get::, _>(1).as_slice()), + proposal_id: blob_to_felt(row.get::, _>(2).as_slice()), + voter: blob_to_felt(row.get::, _>(3).as_slice()), + support: row.get::(4) as u8, + weight: blob_to_u256(row.get::, _>(5).as_slice()), + reason: row.get(6), + params: row.get(7), + block_number: row.get::(8) as u64, + tx_hash: blob_to_felt(row.get::, _>(9).as_slice()), + timestamp: row.get(10), + }) +} + +fn parse_delegation_row(row: sqlx::any::AnyRow) -> Result { + Ok(DelegationRecord { + id: row.get(0), + votes_token: blob_to_felt(row.get::, _>(1).as_slice()), + delegator: blob_to_felt(row.get::, _>(2).as_slice()), + from_delegate: blob_to_felt(row.get::, _>(3).as_slice()), + to_delegate: blob_to_felt(row.get::, _>(4).as_slice()), + block_number: row.get::(5) as u64, + tx_hash: blob_to_felt(row.get::, _>(6).as_slice()), + timestamp: row.get(7), + }) +} + +fn parse_voting_power_row(row: sqlx::any::AnyRow) -> Result { + Ok(VotingPowerRecord { + votes_token: blob_to_felt(row.get::, _>(0).as_slice()), + account: blob_to_felt(row.get::, _>(1).as_slice()), + delegate: row + .get::>, _>(2) + .map(|value| blob_to_felt(&value)), + voting_power: blob_to_u256(row.get::, _>(3).as_slice()), + source: VotingPowerSource::parse(&row.get::(4)), + last_block: row.get::(5) as u64, + last_tx_hash: blob_to_felt(row.get::, _>(6).as_slice()), + }) +} + +fn bool_from_row(row: &sqlx::any::AnyRow, index: usize) -> bool { + row.try_get::(index) + .unwrap_or_else(|_| row.get::(index) != 0) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn stores_and_reads_governance_records() { + let storage = GovernanceStorage::new("sqlite::memory:").await.unwrap(); + let governor = GovernorRecord { + address: Felt::from(1u64), + votes_token: Some(Felt::from(2u64)), + name: Some("DAO".to_string()), + version: Some("1".to_string()), + src5_supported: true, + governor_interface_supported: true, + }; + storage.upsert_governor(&governor).await.unwrap(); + let stored = storage + .get_governor(governor.address) + .await + .unwrap() + .unwrap(); + assert_eq!(stored.votes_token, governor.votes_token); + } + + #[tokio::test] + async fn voting_power_priority_prefers_delegate_votes_changed() { + let storage = GovernanceStorage::new("sqlite::memory:").await.unwrap(); + let token = Felt::from(10u64); + let account = Felt::from(20u64); + storage + .upsert_voting_power(&VotingPowerRecord { + votes_token: token, + account, + delegate: None, + voting_power: U256::from(10u64), + source: VotingPowerSource::DelegateVotesChanged, + last_block: 10, + last_tx_hash: Felt::from(30u64), + }) + .await + .unwrap(); + storage + .upsert_voting_power(&VotingPowerRecord { + votes_token: token, + account, + delegate: None, + voting_power: U256::from(5u64), + source: VotingPowerSource::Erc20Balance, + last_block: 11, + last_tx_hash: Felt::from(31u64), + }) + .await + .unwrap(); + let entries = storage + .get_voting_power(Some(token), Some(account)) + .await + .unwrap(); + assert_eq!(entries[0].voting_power, U256::from(10u64)); + assert_eq!(entries[0].source, VotingPowerSource::DelegateVotesChanged); + } +} diff --git a/crates/torii-runtime-common/src/database.rs b/crates/torii-runtime-common/src/database.rs index ef7aeec0..893cdb7c 100644 --- a/crates/torii-runtime-common/src/database.rs +++ b/crates/torii-runtime-common/src/database.rs @@ -76,10 +76,12 @@ pub fn resolve_single_db_setup(db_path: &str, database_url: Option<&str>) -> Sin pub struct TokenDbSetup { pub engine_url: String, pub erc20_url: String, + pub governance_url: String, pub erc721_url: String, pub erc1155_url: String, pub engine_backend: DatabaseBackend, pub erc20_backend: DatabaseBackend, + pub governance_backend: DatabaseBackend, pub erc721_backend: DatabaseBackend, pub erc1155_backend: DatabaseBackend, } @@ -99,6 +101,12 @@ pub fn resolve_token_db_setup( db_dir, "erc20.db", ); + let governance_url = resolve_storage_url( + storage_database_url, + engine_database_url, + db_dir, + "governance.db", + ); let erc721_url = resolve_storage_url( storage_database_url, engine_database_url, @@ -114,6 +122,7 @@ pub fn resolve_token_db_setup( let engine_backend = backend_from_url_or_path(&engine_url); let erc20_backend = backend_from_url_or_path(&erc20_url); + let governance_backend = backend_from_url_or_path(&governance_url); let erc721_backend = backend_from_url_or_path(&erc721_url); let erc1155_backend = backend_from_url_or_path(&erc1155_url); @@ -121,6 +130,7 @@ pub fn resolve_token_db_setup( .map(backend_from_url_or_path) .is_some_and(|backend| backend == DatabaseBackend::Postgres) && (erc20_backend != DatabaseBackend::Postgres + || governance_backend != DatabaseBackend::Postgres || erc721_backend != DatabaseBackend::Postgres || erc1155_backend != DatabaseBackend::Postgres) { @@ -132,10 +142,12 @@ pub fn resolve_token_db_setup( Ok(TokenDbSetup { engine_url, erc20_url, + governance_url, erc721_url, erc1155_url, engine_backend, erc20_backend, + governance_backend, erc721_backend, erc1155_backend, }) @@ -165,6 +177,7 @@ mod tests { assert_eq!(setup.erc20_backend, DatabaseBackend::Sqlite); assert!(setup.engine_url.ends_with("engine.db")); assert!(setup.erc20_url.ends_with("erc20.db")); + assert!(setup.governance_url.ends_with("governance.db")); } #[test] @@ -191,6 +204,7 @@ mod tests { ) .unwrap(); assert_eq!(setup.engine_backend, DatabaseBackend::Postgres); + assert_eq!(setup.governance_backend, DatabaseBackend::Postgres); assert_eq!(setup.erc721_backend, DatabaseBackend::Postgres); }