Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 242 additions & 6 deletions crates/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ pub trait Cache: ReadOnlyCache + Send + Sync + std::fmt::Debug {
/// Mark a token as registered.
async fn mark_token_registered(&self, token_id: TokenId);

/// Rebuild the token-registration registry from committed storage.
///
/// Used after a rollback: `mark_token_registered` mutates the registry before SQL
/// commits, so a rolled-back chunk can leave the cache claiming a token is
/// registered when the storage row is gone. Resetting from `storage.token_ids()`
/// restores the cache to the last committed state without losing tokens that
/// previous chunks did register successfully.
async fn reset_token_registry(&self) -> Result<(), CacheError>;

/// Clear the balances diff.
async fn clear_balances_diff(&self);

Expand Down Expand Up @@ -134,6 +143,10 @@ impl Cache for InMemoryCache {
self.erc_cache.mark_token_registered(token_id).await
}

/// Rebuild the token-registration registry by handing the request to the
/// ERC cache; its result (success or error) is returned unchanged.
async fn reset_token_registry(&self) -> Result<(), CacheError> {
    let erc_cache = &self.erc_cache;
    erc_cache.reset_token_registry().await
}

async fn clear_balances_diff(&self) {
self.erc_cache.balances_diff.clear();
self.erc_cache.balances_diff.shrink_to_fit();
Expand Down Expand Up @@ -252,23 +265,40 @@ pub struct ErcCache {
// the registry is a map of token_id to a mutex that is used to track if the token is registered
// we need a mutex for the token state to prevent race conditions in case of multiple token regs
pub token_id_registry: DashMap<TokenId, TokenState>,
storage: Arc<dyn ReadOnlyStorage>,
}

impl ErcCache {
/// Build an `ErcCache` whose token registry is seeded from `storage`.
///
/// The registry is produced by `load_token_registry`, which records every id
/// reported by `storage.token_ids()` as `TokenState::Registered`. Both diff
/// maps start empty, and the storage handle is kept in the `storage` field.
///
/// # Errors
/// Returns the error from `load_token_registry` if the token ids cannot be read.
pub async fn new(storage: Arc<dyn ReadOnlyStorage>) -> Result<Self, Error> {
    // Read the token ids that storage already knows about exactly once; the
    // helper turns them into the registry map.
    let token_id_registry = Self::load_token_registry(&*storage).await?;

    Ok(Self {
        balances_diff: DashMap::new(),
        total_supply_diff: DashMap::new(),
        token_id_registry,
        storage,
    })
}

async fn load_token_registry(
storage: &dyn ReadOnlyStorage,
) -> Result<DashMap<TokenId, TokenState>, Error> {
let token_ids: HashSet<TokenId> = storage.token_ids().await?;
Ok(token_ids
.into_iter()
.map(|token_id| (token_id, TokenState::Registered))
.collect())
}

/// Make the in-memory token registry match what storage currently reports.
///
/// After this returns successfully, the registry holds exactly the ids from
/// `storage.token_ids()`, each as `TokenState::Registered`; ids that storage
/// does not report are removed.
///
/// # Errors
/// Returns the error from `load_token_registry`. In that case the live
/// registry has not been modified.
pub async fn reset_token_registry(&self) -> Result<(), Error> {
    // Load first: a storage failure must leave the current registry intact.
    let committed = Self::load_token_registry(&*self.storage).await?;

    // Remove only the ids storage does not know about. Clearing the whole map
    // and refilling it would briefly make committed tokens look unregistered
    // to anything reading the shared map at the same time.
    self.token_id_registry
        .retain(|token_id, _| committed.contains_key(token_id));

    // (Re)insert every committed id. This overwrites whatever state was stored
    // for it, so the final contents equal those of a full rebuild.
    for (token_id, state) in committed {
        self.token_id_registry.insert(token_id, state);
    }
    Ok(())
}

pub async fn get_token_registration_lock(&self, token_id: TokenId) -> Option<Arc<Mutex<()>>> {
let entry = self.token_id_registry.entry(token_id);
match entry {
Expand Down Expand Up @@ -486,3 +516,209 @@ pub fn get_entrypoint_name_from_class(class: &ClassAbi, selector: Felt) -> Optio
}),
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use torii_proto::{
Achievement, AchievementQuery, Activity, ActivityQuery, AggregationEntry, AggregationQuery,
Contract, ContractQuery, Controller, ControllerQuery, Event, EventQuery, Page,
PlayerAchievementEntry, PlayerAchievementQuery, Query, SearchQuery, SearchResponse, Token,
TokenBalance, TokenBalanceQuery, TokenContract, TokenContractQuery, TokenQuery,
TokenTransfer, TokenTransferQuery, Transaction, TransactionQuery,
};
use torii_storage::StorageError;

/// Test stub that only services `token_ids` and `models`. The rest of `ReadOnlyStorage`
/// panics — these tests do not exercise any of those methods.
///
/// Both fields sit behind a `Mutex` so they can be changed through a shared
/// reference (see `set_committed_tokens`, which takes `&self`).
#[derive(Debug)]
struct StubStorage {
    // Set handed back (as a clone) by `token_ids()`; replaced wholesale by
    // `set_committed_tokens`.
    token_ids: Mutex<HashSet<TokenId>>,
    // List handed back (as a clone) by `models()`; starts empty.
    models: Mutex<Vec<Model>>,
}

impl StubStorage {
fn new() -> Arc<Self> {
Arc::new(Self {
token_ids: Mutex::new(HashSet::new()),
models: Mutex::new(Vec::new()),
})
}

fn set_committed_tokens(&self, ids: Vec<TokenId>) {
*self.token_ids.lock().unwrap() = ids.into_iter().collect();
}
}

// `ReadOnlyStorage` for the test stub. Only `models` and `token_ids` return
// data (a clone of the stub's in-memory state) and `as_read_only` returns
// `self`; every other method is `unimplemented!()` and panics if called.
#[async_trait]
impl ReadOnlyStorage for StubStorage {
    fn as_read_only(&self) -> &dyn ReadOnlyStorage {
        self
    }

    async fn model(&self, _world: Felt, _selector: Felt) -> Result<Model, StorageError> {
        unimplemented!()
    }

    async fn model_optional(
        &self,
        _world: Felt,
        _selector: Felt,
    ) -> Result<Option<Model>, StorageError> {
        unimplemented!()
    }

    // Ignores both filters and returns a clone of every model held by the stub.
    async fn models(
        &self,
        _world_addresses: &[Felt],
        _selectors: &[Felt],
    ) -> Result<Vec<Model>, StorageError> {
        Ok(self.models.lock().unwrap().clone())
    }

    // Returns a clone of the set last installed via `set_committed_tokens`
    // (empty for a fresh stub).
    async fn token_ids(&self) -> Result<HashSet<TokenId>, StorageError> {
        Ok(self.token_ids.lock().unwrap().clone())
    }

    // Everything below is an unexercised placeholder that panics when called.
    async fn controllers(
        &self,
        _query: &ControllerQuery,
    ) -> Result<Page<Controller>, StorageError> {
        unimplemented!()
    }
    async fn contracts(&self, _query: &ContractQuery) -> Result<Vec<Contract>, StorageError> {
        unimplemented!()
    }
    async fn tokens(&self, _query: &TokenQuery) -> Result<Page<Token>, StorageError> {
        unimplemented!()
    }
    async fn token_balances(
        &self,
        _query: &TokenBalanceQuery,
    ) -> Result<Page<TokenBalance>, StorageError> {
        unimplemented!()
    }
    async fn token_contracts(
        &self,
        _query: &TokenContractQuery,
    ) -> Result<Page<TokenContract>, StorageError> {
        unimplemented!()
    }
    async fn token_transfers(
        &self,
        _query: &TokenTransferQuery,
    ) -> Result<Page<TokenTransfer>, StorageError> {
        unimplemented!()
    }
    async fn transactions(
        &self,
        _query: &TransactionQuery,
    ) -> Result<Page<Transaction>, StorageError> {
        unimplemented!()
    }
    async fn events(&self, _query: EventQuery) -> Result<Page<Event>, StorageError> {
        unimplemented!()
    }
    async fn entities(&self, _query: &Query) -> Result<Page<Entity>, StorageError> {
        unimplemented!()
    }
    async fn event_messages(&self, _query: &Query) -> Result<Page<Entity>, StorageError> {
        unimplemented!()
    }
    async fn entity_model(
        &self,
        _world: Felt,
        _entity_id: Felt,
        _model_selector: Felt,
    ) -> Result<Option<dojo_types::schema::Ty>, StorageError> {
        unimplemented!()
    }
    async fn aggregations(
        &self,
        _query: &AggregationQuery,
    ) -> Result<Page<AggregationEntry>, StorageError> {
        unimplemented!()
    }
    async fn activities(&self, _query: &ActivityQuery) -> Result<Page<Activity>, StorageError> {
        unimplemented!()
    }
    async fn achievements(
        &self,
        _query: &AchievementQuery,
    ) -> Result<Page<Achievement>, StorageError> {
        unimplemented!()
    }
    async fn player_achievements(
        &self,
        _query: &PlayerAchievementQuery,
    ) -> Result<Page<PlayerAchievementEntry>, StorageError> {
        unimplemented!()
    }
    async fn search(&self, _query: &SearchQuery) -> Result<SearchResponse, StorageError> {
        unimplemented!()
    }
}

use torii_proto::schema::Entity;

/// Test helper: a `TokenId::Contract` whose address is `byte` converted to a `Felt`.
fn token_id(byte: u8) -> TokenId {
    let contract_address = Felt::from(byte);
    TokenId::Contract(contract_address)
}

/// Mirrors the production hazard: `mark_token_registered` runs before SQL commit;
/// rollback drops the SQL but leaves the cache claiming the token is registered.
/// `reset_token_registry` must restore the cache to "what storage actually has":
/// the committed token stays registered, the uncommitted mark disappears.
#[tokio::test]
async fn reset_token_registry_drops_uncommitted_marks() {
    let storage = StubStorage::new();
    // T1 was registered in a previous (committed) chunk.
    storage.set_committed_tokens(vec![token_id(1)]);

    let cache = InMemoryCache::new(storage.clone()).await.unwrap();

    // T2 gets marked in this chunk but the SQL row never lands (rollback):
    // the stub's committed set is deliberately left without it.
    cache.mark_token_registered(token_id(2)).await;
    assert!(cache.is_token_registered(&token_id(1)).await);
    assert!(cache.is_token_registered(&token_id(2)).await);

    cache.reset_token_registry().await.unwrap();

    // Committed state survives the reset; the uncommitted mark does not.
    assert!(cache.is_token_registered(&token_id(1)).await);
    assert!(!cache.is_token_registered(&token_id(2)).await);
}

/// `clear_models` is the model-cache half of the rollback recovery. A model
/// that was registered must be retrievable before the clear and must come back
/// as `CacheError::ModelNotFound` (carrying the requested selector) afterwards,
/// so callers fall through to storage, which reflects the committed state.
#[tokio::test]
async fn clear_models_empties_the_cache() {
    let cache = InMemoryCache::new(StubStorage::new()).await.unwrap();

    let (world, selector) = (Felt::from(0xa), Felt::from(0xb));
    let registered = Model {
        world_address: world,
        namespace: "ns".into(),
        name: "M".into(),
        selector,
        class_hash: Felt::ZERO,
        contract_address: Felt::ZERO,
        packed_size: 0,
        unpacked_size: 0,
        layout: dojo_world::contracts::abigen::model::Layout::Fixed(vec![]),
        schema: dojo_types::schema::Ty::Tuple(vec![]),
        use_legacy_store: true,
    };

    // Sanity check: the model is served from the cache once registered.
    cache.register_model(world, selector, registered).await;
    assert!(cache.model(world, selector).await.is_ok());

    cache.clear_models().await;

    // After the clear the same lookup must miss, naming the selector asked for.
    let after_clear = cache.model(world, selector).await;
    assert!(matches!(
        after_clear,
        Err(CacheError::ModelNotFound(missing)) if missing == selector
    ));
}
}
3 changes: 3 additions & 0 deletions crates/indexer/engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Engine<P> {
error!(target: LOG_TARGET, error = ?e, "Processing fetched data.");
processing_erroring_out = true;
self.storage.rollback().await?;
self.cache.clear_balances_diff().await;
self.cache.clear_models().await;
self.cache.reset_token_registry().await?;
self.task_manager.clear_tasks();
gauge!("torii_indexer_backoff_delay_seconds", "operation" => "process").set(processing_backoff_delay.as_secs_f64());
sleep(processing_backoff_delay).await;
Expand Down
Loading