diff --git a/README.md b/README.md index 3c93876..166dc94 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ let downloads = collection::("downloads", DB {}) .with_index::() .with_index::() .with_index::() - build(); + .build(); downloads.save(dl)?; let my_dl = downloads.get(&dl.info_hash)?; @@ -64,10 +64,10 @@ pub struct Download { size: u64, } -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Clone, Copy, Eq, PartialEq, Debug)] pub struct InfoHash([u8; 20]); -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Clone, Copy, Eq, PartialEq, Debug)] pub enum Status { Queued, Submitted, diff --git a/src/collection.rs b/src/collection.rs index cfcf5ae..83892be 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -4,7 +4,10 @@ use crate::index::{Index, IndexKind}; use crate::index_registry::{Cons, ContainsIndex, IndexRegistry, Nil}; use crate::key::Key; use crate::scan::IndexScan; -use crate::store::{MultiStore, MultiStoreWriteHandle, ReadKVStore, WriteKVStore}; +use crate::store::{ + MultiStore, MultiStoreReadHandle, MultiStoreWriteHandle, ReadKVStore, WriteKVStore, +}; +use std::borrow::Borrow; use std::marker::PhantomData; pub struct Collection @@ -36,41 +39,149 @@ where } } - pub fn insert(&self, value: Record) -> Result<(), Error> { + /// Inserts a new record into the collection. + /// + /// Returns an error if a record with the same primary key already exists. + /// + /// All indexes are updated atomically within the same transaction. + pub fn insert(&self, value: impl Borrow) -> Result<(), Error> { + let value = value.borrow(); let pk = value.key(); let enc_pk = pk.encode(); + let mut tx = self.db.write(self.name)?; + let mut store = tx.open_store(Self::MAIN_STORE)?; - { - let mut store = tx.open_store(Self::MAIN_STORE)?; + if store.get(&enc_pk)?.is_some() { + Err(Error::AlreadyExists(format!("{:?}", pk)))? + } - if store.get(&enc_pk)?.is_some() { - Err(Error::AlreadyExists(self.name.to_string()))? - } + store.set(&enc_pk, &value.to_bytes()?)?; - store.set(&enc_pk, &value.to_bytes()?)?; - } + Indexes::update(&mut tx, &pk, None, value)?; tx.commit().map_err(Error::Backend) } - pub fn get(&self, _key: Record::Key<'_>) -> Result, Error> { - Ok(None) + /// Updates an existing record in the collection. + /// + /// Returns an error if the record does not already exist. + /// + /// Indexes are automatically updated when indexed fields change. + pub fn update(&self, value: impl Borrow) -> Result<(), Error> { + let value = value.borrow(); + let pk = value.key(); + let enc_pk = pk.encode(); + + let mut tx = self.db.write(self.name)?; + let mut store = tx.open_store(Self::MAIN_STORE)?; + + let old = store + .get(&enc_pk)? + .map(|bytes| Record::from_bytes(&bytes).map_err(Error::Codec)) + .transpose()?; + + if old.is_none() { + Err(Error::NotFound(format!("{:?}", pk)))? + } + + store.set(&enc_pk, &value.to_bytes()?)?; + + Indexes::update(&mut tx, &pk, old.as_ref(), value)?; + + tx.commit().map_err(Error::Backend) } - pub fn update(&self, _value: Record) -> Result<(), Error> { - Ok(()) + /// Saves a record into the collection. + /// + /// If the record already exists, it is updated. + /// Otherwise, a new record is inserted. + /// + /// Indexes are updated atomically within the same transaction. + pub fn save(&self, value: impl Borrow) -> Result<(), Error> { + let value = value.borrow(); + let pk = value.key(); + let enc_pk = pk.encode(); + + let mut tx = self.db.write(self.name)?; + let mut store = tx.open_store(Self::MAIN_STORE)?; + + let old = store + .get(&enc_pk)? + .map(|bytes| Record::from_bytes(&bytes).map_err(Error::Codec)) + .transpose()?; + + store.set(&enc_pk, &value.to_bytes()?)?; + + Indexes::update(&mut tx, &pk, old.as_ref(), value)?; + + tx.commit().map_err(Error::Backend) } - pub fn save(&self, _value: Record) -> Result<(), Error> { - Ok(()) + /// Removes a record from the collection by its primary key. + /// + /// If the record exists, all associated index entries are also removed. + /// + /// Returns `Ok(())` if the record does not exist. + pub fn remove<'a>( + &self, + key: impl Borrow< as Key>::OwnedKey>, + ) -> Result<(), Error> + where + Record: 'a, + { + let pk = key.borrow(); + let enc_pk = pk.encode(); + + let mut tx = self.db.write(self.name)?; + let mut store = tx.open_store(Self::MAIN_STORE)?; + + let record = store + .get(enc_pk)? + .map(|bytes| Record::from_bytes(&bytes).map_err(Error::Codec)) + .transpose()?; + + let record = match record { + Some(record) => record, + None => return Ok(()), + }; + + store.remove(key.borrow().encode())?; + + Indexes::remove(&mut tx, &record.key(), &record)?; + + tx.commit().map_err(Error::Backend) } - pub fn remove(&self, _key: Record::Key<'_>) -> Result<(), Error> { - Ok(()) + /// Retrieves a record from the collection by its primary key. + /// + /// Returns `Ok(None)` if the record does not exist. + pub fn get<'a>( + &self, + key: impl Borrow< as Key>::OwnedKey>, + ) -> Result, Error> + where + Record: 'a, + { + self.db + .read(self.name)? + .open_store(Self::MAIN_STORE)? + .get(key.borrow().encode())? + .map(|bytes| Record::from_bytes(&bytes).map_err(Error::Codec)) + .transpose() } - pub fn index<'a, Idx, P>( + /// Creates a typed scan over a collection index. + /// + /// Scans can be configured with: + /// - prefixes + /// - cursors + /// - ordering direction + /// - limits + /// + /// The returned scan is lazy and does not perform any database access + /// until iterated. + pub fn scan<'a, Idx, P>( &self, _idx: Idx, ) -> Result, Error> @@ -104,6 +215,7 @@ where pub fn with_index(self) -> CollectionBuilder> where Idx: Index, + for<'ik, 'pk> Idx::Kind<'ik>: IndexKind, Record::Key<'pk>>, { assert!( !Indexes::has_index(Idx::NAME), @@ -134,3 +246,613 @@ where _marker: PhantomData, } } + +#[cfg(test)] +mod tests { + use crate::collection::Collection; + use crate::entity::Entity; + use crate::error::{CodecError, Error}; + use crate::key::Key; + use crate::testing::{backend_error, MockDb, SpyRegistry}; + + // ── Minimal entity ──────────────────────────────────────────────────────── + + struct TestRecord { + id: u32, + } + + impl Entity for TestRecord { + type Key<'a> = u32; + + fn key(&self) -> u32 { + self.id + } + + fn to_bytes(&self) -> Result, CodecError> { + Ok(self.id.to_be_bytes().to_vec()) + } + + fn from_bytes(bytes: &[u8]) -> Result { + let id = u32::from_be_bytes( + bytes + .try_into() + .map_err(|_| CodecError::new(std::io::Error::other("bad length")))?, + ); + Ok(TestRecord { id }) + } + } + + // ── insert ──────────────────────────────────────────────────────────────── + + #[test] + fn insert() { + let enc_pk = 1u32.encode().to_vec(); + let enc_val = TestRecord { id: 1 }.to_bytes().unwrap(); + + macro_rules! run { + ($db:expr) => {{ + let db: MockDb = $db; + let log = db.log(); + let col = Collection::<_, TestRecord, SpyRegistry>::new("col", db); + let result = col.insert(TestRecord { id: 1 }); + (result, log) + }}; + } + + struct Case { + name: &'static str, + db: MockDb, + registry_fails: bool, + expect_result: fn(&Result<(), Error>), + expect_opens: &'static [&'static str], + expect_sets: usize, + expect_committed: bool, + expect_registry_called: bool, + } + + let cases = vec![ + Case { + name: "inserts new record", + db: MockDb::new(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + name: "fails when record already exists", + db: MockDb::new().with_data("__main", enc_pk.clone(), enc_val.clone()), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::AlreadyExists(_)))), + expect_opens: &["__main"], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from write()", + db: MockDb::new().with_write_err(|| backend_error("write failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &[], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from commit()", + db: MockDb::new().with_commit_err(|| backend_error("commit failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + // set is called before the registry; commit is skipped on registry error + name: "propagates registry error", + db: MockDb::new(), + registry_fails: true, + expect_result: |r| assert!(matches!(r, Err(Error::Unexpected(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: false, + expect_registry_called: true, + }, + ]; + + for c in cases { + SpyRegistry::reset(); + SpyRegistry::set_fail(c.registry_fails); + let (result, log) = run!(c.db); + let log = log.borrow(); + + (c.expect_result)(&result); + assert_eq!(log.opens.as_slice(), c.expect_opens, "[{}] opens", c.name); + assert_eq!(log.sets.len(), c.expect_sets, "[{}] sets count", c.name); + assert_eq!(log.committed, c.expect_committed, "[{}] committed", c.name); + assert_eq!( + SpyRegistry::was_update_called(), + c.expect_registry_called, + "[{}] registry called", + c.name + ); + } + + // Verify the exact bytes written to the main store + SpyRegistry::reset(); + let (result, log) = run!(MockDb::new()); + assert!(result.is_ok()); + let log = log.borrow(); + assert_eq!( + log.sets[0].0, enc_pk, + "set key must be the encoded primary key" + ); + assert_eq!( + log.sets[0].1, enc_val, + "set value must be to_bytes() output" + ); + } + + // ── update ──────────────────────────────────────────────────────────────── + + #[test] + fn update() { + let enc_pk = 1u32.encode().to_vec(); + let enc_val = TestRecord { id: 1 }.to_bytes().unwrap(); + + let existing_db = || MockDb::new().with_data("__main", enc_pk.clone(), enc_val.clone()); + + macro_rules! run { + ($db:expr) => {{ + let db: MockDb = $db; + let log = db.log(); + let col = Collection::<_, TestRecord, SpyRegistry>::new("col", db); + let result = col.update(TestRecord { id: 1 }); + (result, log) + }}; + } + + struct Case { + name: &'static str, + db: MockDb, + registry_fails: bool, + expect_result: fn(&Result<(), Error>), + expect_opens: &'static [&'static str], + expect_sets: usize, + expect_committed: bool, + expect_registry_called: bool, + } + + let cases = vec![ + Case { + name: "updates existing record", + db: existing_db(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + name: "fails when record not found", + db: MockDb::new(), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::NotFound(_)))), + expect_opens: &["__main"], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from write()", + db: MockDb::new().with_write_err(|| backend_error("write failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &[], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from commit()", + db: existing_db().with_commit_err(|| backend_error("commit failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + // from_bytes is called on the stored value before set — codec errors must surface + name: "propagates codec error from corrupted stored bytes", + db: MockDb::new().with_data("__main", enc_pk.clone(), vec![0x01]), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Codec(_)))), + expect_opens: &["__main"], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + // set is called before the registry; commit is skipped on registry error + name: "propagates registry error", + db: existing_db(), + registry_fails: true, + expect_result: |r| assert!(matches!(r, Err(Error::Unexpected(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: false, + expect_registry_called: true, + }, + ]; + + for c in cases { + SpyRegistry::reset(); + SpyRegistry::set_fail(c.registry_fails); + let (result, log) = run!(c.db); + let log = log.borrow(); + + (c.expect_result)(&result); + assert_eq!(log.opens.as_slice(), c.expect_opens, "[{}] opens", c.name); + assert_eq!(log.sets.len(), c.expect_sets, "[{}] sets count", c.name); + assert_eq!(log.committed, c.expect_committed, "[{}] committed", c.name); + assert_eq!( + SpyRegistry::was_update_called(), + c.expect_registry_called, + "[{}] registry called", + c.name + ); + } + + // Verify the exact bytes written to the main store + SpyRegistry::reset(); + let (result, log) = run!(existing_db()); + assert!(result.is_ok()); + let log = log.borrow(); + assert_eq!( + log.sets[0].0, enc_pk, + "set key must be the encoded primary key" + ); + assert_eq!( + log.sets[0].1, enc_val, + "set value must be to_bytes() output" + ); + } + + // ── save ────────────────────────────────────────────────────────────────── + + #[test] + fn save() { + let enc_pk = 1u32.encode().to_vec(); + let enc_val = TestRecord { id: 1 }.to_bytes().unwrap(); + + let existing_db = || MockDb::new().with_data("__main", enc_pk.clone(), enc_val.clone()); + + macro_rules! run { + ($db:expr) => {{ + let db: MockDb = $db; + let log = db.log(); + let col = Collection::<_, TestRecord, SpyRegistry>::new("col", db); + let result = col.save(TestRecord { id: 1 }); + (result, log) + }}; + } + + struct Case { + name: &'static str, + db: MockDb, + registry_fails: bool, + expect_result: fn(&Result<(), Error>), + expect_opens: &'static [&'static str], + expect_sets: usize, + expect_committed: bool, + expect_registry_called: bool, + } + + let cases = vec![ + Case { + name: "save when record does not exist", + db: MockDb::new(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + name: "overwrites when record already exists", + db: existing_db(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + name: "propagates backend error from write()", + db: MockDb::new().with_write_err(|| backend_error("write failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &[], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from commit()", + db: existing_db().with_commit_err(|| backend_error("commit failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + // from_bytes is called on any stored value before set — codec errors must surface + name: "propagates codec error from corrupted stored bytes", + db: MockDb::new().with_data("__main", enc_pk.clone(), vec![0x01]), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Codec(_)))), + expect_opens: &["__main"], + expect_sets: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + // set is called before the registry; commit is skipped on registry error + name: "propagates registry error", + db: existing_db(), + registry_fails: true, + expect_result: |r| assert!(matches!(r, Err(Error::Unexpected(_)))), + expect_opens: &["__main"], + expect_sets: 1, + expect_committed: false, + expect_registry_called: true, + }, + ]; + + for c in cases { + SpyRegistry::reset(); + SpyRegistry::set_fail(c.registry_fails); + let (result, log) = run!(c.db); + let log = log.borrow(); + + (c.expect_result)(&result); + assert_eq!(log.opens.as_slice(), c.expect_opens, "[{}] opens", c.name); + assert_eq!(log.sets.len(), c.expect_sets, "[{}] sets count", c.name); + assert_eq!(log.committed, c.expect_committed, "[{}] committed", c.name); + assert_eq!( + SpyRegistry::was_update_called(), + c.expect_registry_called, + "[{}] registry called", + c.name + ); + } + + // Verify the exact bytes written to the main store + SpyRegistry::reset(); + let (result, log) = run!(MockDb::new()); + assert!(result.is_ok()); + let log = log.borrow(); + assert_eq!( + log.sets[0].0, enc_pk, + "set key must be the encoded primary key" + ); + assert_eq!( + log.sets[0].1, enc_val, + "set value must be to_bytes() output" + ); + } + + // ── remove ──────────────────────────────────────────────────────────────── + + #[test] + fn remove() { + let enc_pk = 1u32.encode().to_vec(); + let enc_val = TestRecord { id: 1 }.to_bytes().unwrap(); + + let existing_db = || MockDb::new().with_data("__main", enc_pk.clone(), enc_val.clone()); + + macro_rules! run { + ($db:expr) => {{ + let db: MockDb = $db; + let log = db.log(); + let col = Collection::<_, TestRecord, SpyRegistry>::new("col", db); + let result = col.remove(1u32); + (result, log) + }}; + } + + struct Case { + name: &'static str, + db: MockDb, + registry_fails: bool, + expect_result: fn(&Result<(), Error>), + expect_opens: &'static [&'static str], + expect_removes: usize, + expect_committed: bool, + expect_registry_called: bool, + } + + let cases = vec![ + Case { + name: "removes existing record", + db: existing_db(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_removes: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + // record absent → early Ok(()), no write to store, no registry + name: "returns ok when record does not exist", + db: MockDb::new(), + registry_fails: false, + expect_result: |r| assert!(r.is_ok()), + expect_opens: &["__main"], + expect_removes: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from write()", + db: MockDb::new().with_write_err(|| backend_error("write failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &[], + expect_removes: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + name: "propagates backend error from commit()", + db: existing_db().with_commit_err(|| backend_error("commit failed")), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &["__main"], + expect_removes: 1, + expect_committed: true, + expect_registry_called: true, + }, + Case { + // from_bytes is called on any stored value before remove — codec errors must surface + name: "propagates codec error from corrupted stored bytes", + db: MockDb::new().with_data("__main", enc_pk.clone(), vec![0x01]), + registry_fails: false, + expect_result: |r| assert!(matches!(r, Err(Error::Codec(_)))), + expect_opens: &["__main"], + expect_removes: 0, + expect_committed: false, + expect_registry_called: false, + }, + Case { + // remove is called before the registry; commit is skipped on registry error + name: "propagates registry error", + db: existing_db(), + registry_fails: true, + expect_result: |r| assert!(matches!(r, Err(Error::Unexpected(_)))), + expect_opens: &["__main"], + expect_removes: 1, + expect_committed: false, + expect_registry_called: true, + }, + ]; + + for c in cases { + SpyRegistry::reset(); + SpyRegistry::set_fail(c.registry_fails); + let (result, log) = run!(c.db); + let log = log.borrow(); + + (c.expect_result)(&result); + assert_eq!(log.opens.as_slice(), c.expect_opens, "[{}] opens", c.name); + assert_eq!( + log.removes.len(), + c.expect_removes, + "[{}] removes count", + c.name + ); + assert_eq!(log.committed, c.expect_committed, "[{}] committed", c.name); + assert_eq!( + SpyRegistry::was_remove_called(), + c.expect_registry_called, + "[{}] registry called", + c.name + ); + } + + // Verify the exact key passed to store.remove() + SpyRegistry::reset(); + let (result, log) = run!(existing_db()); + assert!(result.is_ok()); + let log = log.borrow(); + assert_eq!( + log.removes[0], enc_pk, + "remove key must be the encoded primary key" + ); + } + + // ── get ─────────────────────────────────────────────────────────────────── + + #[test] + fn get() { + let enc_pk = 1u32.encode().to_vec(); + let enc_val = TestRecord { id: 1 }.to_bytes().unwrap(); + + let existing_db = || MockDb::new().with_data("__main", enc_pk.clone(), enc_val.clone()); + + macro_rules! run { + ($db:expr) => {{ + let db: MockDb = $db; + let log = db.log(); + let col = Collection::<_, TestRecord, SpyRegistry>::new("col", db); + let result = col.get(1u32); + (result, log) + }}; + } + + struct Case { + name: &'static str, + db: MockDb, + expect_result: fn(&Result, Error>), + expect_opens: &'static [&'static str], + } + + let cases = vec![ + Case { + name: "returns the record when it exists", + db: existing_db(), + expect_result: |r| { + let record = r.as_ref().unwrap().as_ref().unwrap(); + assert_eq!(record.id, 1); + }, + expect_opens: &["__main"], + }, + Case { + name: "returns None when record does not exist", + db: MockDb::new(), + expect_result: |r| assert!(matches!(r, Ok(None))), + expect_opens: &["__main"], + }, + Case { + name: "propagates backend error from read()", + db: MockDb::new().with_read_err(|| backend_error("read failed")), + expect_result: |r| assert!(matches!(r, Err(Error::Backend(_)))), + expect_opens: &[], + }, + Case { + name: "propagates codec error from corrupted stored bytes", + db: MockDb::new().with_data("__main", enc_pk.clone(), vec![0x01]), + expect_result: |r| assert!(matches!(r, Err(Error::Codec(_)))), + expect_opens: &["__main"], + }, + ]; + + for c in cases { + let (result, log) = run!(c.db); + let log = log.borrow(); + + (c.expect_result)(&result); + assert_eq!(log.opens.as_slice(), c.expect_opens, "[{}] opens", c.name); + } + + // Verify the exact key passed to store.get() + let (_, log) = run!(existing_db()); + let log = log.borrow(); + assert_eq!( + log.gets[0], enc_pk, + "get key must be the encoded primary key" + ); + } +} diff --git a/src/error.rs b/src/error.rs index eb1e9ea..c496d4e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,6 +21,13 @@ pub enum Error { #[derive(Debug, thiserror::Error)] pub struct BackendError(Box); +impl BackendError { + #[cfg(test)] + pub(crate) fn new(e: impl std::error::Error + Send + Sync + 'static) -> Self { + BackendError(Box::new(e)) + } +} + impl Display for BackendError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.0.fmt(f) @@ -30,6 +37,13 @@ impl Display for BackendError { #[derive(Debug, thiserror::Error)] pub struct CodecError(Box); +impl CodecError { + #[cfg(test)] + pub(crate) fn new(e: impl std::error::Error + Send + Sync + 'static) -> Self { + CodecError(Box::new(e)) + } +} + impl Display for CodecError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.0.fmt(f) diff --git a/src/index.rs b/src/index.rs index 517ee64..d0d84a6 100644 --- a/src/index.rs +++ b/src/index.rs @@ -19,15 +19,19 @@ pub trait Index { fn key(entity: &Record) -> Self::Key<'_>; - fn set<'a, DB: MultiStoreWriteHandle>( + fn update<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - old: Option<(&Record::Key<'a>, &'a Record)>, - new: (&Record::Key<'a>, &'a Record), - ) -> Result<(), Error> { - let new_skey = Self::Kind::store_key(Self::key(new.1), new.0); + pk: &Record::Key<'a>, + old: Option<&Record>, + new: &'a Record, + ) -> Result<(), Error> + where + for<'ik, 'pk> Self::Kind<'ik>: IndexKind, Record::Key<'pk>>, + { + let new_skey = Self::Kind::store_key(Self::key(new), pk); let mut store = None; - if let Some((pk, entity)) = old { + if let Some(entity) = old { let old_skey = Self::Kind::store_key(Self::key(entity), pk); if old_skey == new_skey { @@ -43,28 +47,28 @@ pub trait Index { let skey = new_skey.encode(); if store.get(&skey)?.is_some() { - Err(Error::AlreadyExists(Self::NAME.to_string()))? + Err(Error::AlreadyExists(format!("{:?}", new_skey)))? } - store.set(skey, new.0.encode())?; + store.set(skey, pk.encode())?; Ok(()) } fn remove<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - target: (&Record::Key<'a>, &'a Record), + pk: &Record::Key<'a>, + item: &'a Record, ) -> Result<(), Error> { let mut store = db.open_store(Self::NAME)?; - let ikey = Self::key(target.1); - let skey = Self::Kind::store_key(ikey, target.0); + let skey = Self::Kind::store_key(Self::key(item), pk); store.remove(skey.encode()).map_err(Error::Backend) } } -pub type StoreKey<'a, I, PK, T> = - <>::Kind<'a> as IndexKind<>::Key<'a>, PK>>::StoreKey<'a>; +pub type StoreKey<'a, 'b, I, PK, T> = + <>::Kind<'a> as IndexKind<>::Key<'b>, PK>>::StoreKey<'a, 'b>; /// IndexKind helps to specify an index behavior by expressing the actual stored key in the index /// based on the index key and the underlying entity primary key. @@ -76,12 +80,12 @@ where IndexKey: Key, PrimaryKey: Key, { - type StoreKey<'a>: Key + type StoreKey<'a, 'b>: Key where IndexKey: 'a, - PrimaryKey: 'a; + PrimaryKey: 'b; - fn store_key<'a>(k: IndexKey, pk: &'a PrimaryKey) -> Self::StoreKey<'a> + fn store_key<'a, 'b>(k: IndexKey, pk: &'b PrimaryKey) -> Self::StoreKey<'a, 'b> where IndexKey: 'a; } @@ -93,13 +97,13 @@ where IndexKey: Key, PrimaryKey: Key, { - type StoreKey<'a> + type StoreKey<'a, 'b> = IndexKey where IndexKey: 'a, - PrimaryKey: 'a; + PrimaryKey: 'b; - fn store_key<'a>(k: IndexKey, _pk: &'a PrimaryKey) -> Self::StoreKey<'a> + fn store_key<'a, 'b>(k: IndexKey, _pk: &'b PrimaryKey) -> Self::StoreKey<'a, 'b> where IndexKey: 'a, { @@ -114,13 +118,13 @@ where IndexKey: Key + AppendKey, PrimaryKey: Key, { - type StoreKey<'a> - = >::Key<'a> + type StoreKey<'a, 'b> + = >::Key<'b> where IndexKey: 'a, - PrimaryKey: 'a; + PrimaryKey: 'b; - fn store_key<'a>(k: IndexKey, pk: &'a PrimaryKey) -> Self::StoreKey<'a> + fn store_key<'a, 'b>(k: IndexKey, pk: &'b PrimaryKey) -> Self::StoreKey<'a, 'b> where IndexKey: 'a, { diff --git a/src/index_registry.rs b/src/index_registry.rs index 4042f82..46a8b1d 100644 --- a/src/index_registry.rs +++ b/src/index_registry.rs @@ -12,15 +12,17 @@ pub struct Cons(PhantomData<(Head, Tail)>); /// IndexRegistry is a recursive HList trait to allow defining multiple indexes as generic types. pub trait IndexRegistry { - fn set<'a, DB: MultiStoreWriteHandle>( + fn update<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - old: Option<(&T::Key<'a>, &'a T)>, - new: (&T::Key<'a>, &'a T), + pk: &T::Key<'a>, + old: Option<&T>, + new: &'a T, ) -> Result<(), Error>; fn remove<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - target: (&T::Key<'a>, &'a T), + pk: &T::Key<'a>, + item: &'a T, ) -> Result<(), Error>; fn has_index(name: &str) -> bool; @@ -30,17 +32,19 @@ impl IndexRegistry for Nil where T: Entity, { - fn set<'a, DB: MultiStoreWriteHandle>( + fn update<'a, DB: MultiStoreWriteHandle>( _db: &mut DB, - _old: Option<(&T::Key<'a>, &'a T)>, - _new: (&T::Key<'a>, &'a T), + _pk: &T::Key<'a>, + _old: Option<&T>, + _new: &'a T, ) -> Result<(), Error> { Ok(()) } fn remove<'a, DB: MultiStoreWriteHandle>( _db: &mut DB, - _target: (&T::Key<'a>, &'a T), + _pk: &T::Key<'a>, + _item: &'a T, ) -> Result<(), Error> { Ok(()) } @@ -55,23 +59,25 @@ where T: Entity, Head: Index, Tail: IndexRegistry, - for<'a> Head::Kind<'a>: IndexKind, T::Key<'a>>, + for<'ik, 'pk> Head::Kind<'ik>: IndexKind, T::Key<'pk>>, { - fn set<'a, DB: MultiStoreWriteHandle>( + fn update<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - old: Option<(&T::Key<'a>, &'a T)>, - new: (&T::Key<'a>, &'a T), + pk: &T::Key<'a>, + old: Option<&T>, + new: &'a T, ) -> Result<(), Error> { - Head::set(db, old, new)?; - Tail::set(db, old, new) + Head::update(db, pk, old, new)?; + Tail::update(db, pk, old, new) } fn remove<'a, DB: MultiStoreWriteHandle>( db: &mut DB, - target: (&T::Key<'a>, &'a T), + pk: &T::Key<'a>, + item: &'a T, ) -> Result<(), Error> { - Head::remove(db, target)?; - Tail::remove(db, target) + Head::remove(db, pk, item)?; + Tail::remove(db, pk, item) } fn has_index(name: &str) -> bool { @@ -135,17 +141,19 @@ mod tests { fn key(r: &Record) -> u32 { r.0 } - fn set( + fn update( db: &mut DB, - _old: Option<(&u32, &Record)>, - _new: (&u32, &Record), + _pk: &u32, + _old: Option<&Record>, + _new: &Record, ) -> Result<(), Error> { db.open_store(Self::NAME)?; Ok(()) } fn remove( db: &mut DB, - _target: (&u32, &Record), + _pk: &u32, + _item: &Record, ) -> Result<(), Error> { db.open_store(Self::NAME)?; Ok(()) @@ -164,16 +172,18 @@ mod tests { fn key(r: &Record) -> u32 { r.0 } - fn set( + fn update( _db: &mut DB, - _old: Option<(&u32, &Record)>, - _new: (&u32, &Record), + _pk: &u32, + _old: Option<&Record>, + _new: &Record, ) -> Result<(), Error> { Err(Error::Unexpected("injected".into())) } fn remove( _db: &mut DB, - _target: (&u32, &Record), + _pk: &u32, + _item: &Record, ) -> Result<(), Error> { Err(Error::Unexpected("injected".into())) } @@ -271,22 +281,20 @@ mod tests { // ── set ─────────────────────────────────────────────────────────────────── #[test] - fn set() { + fn update() { let record = Record(1); let pk = 1u32; let cases: &[(&dyn Fn(&mut Spy) -> Result<(), Error>, &[&str], bool)] = &[ ( - &|s| >::set(s, None, (&pk, &record)), + &|s| >::update(s, &pk, None, &record), &[], false, ), ( &|s| { - > as IndexRegistry>::set( - s, - None, - (&pk, &record), + > as IndexRegistry>::update( + s, &pk, None, &record, ) }, &["index_a", "index_b"], @@ -294,10 +302,8 @@ mod tests { ), ( &|s| { - > as IndexRegistry>::set( - s, - None, - (&pk, &record), + > as IndexRegistry>::update( + s, &pk, None, &record, ) }, &[], @@ -322,15 +328,14 @@ mod tests { let cases: &[(&dyn Fn(&mut Spy) -> Result<(), Error>, &[&str], bool)] = &[ ( - &|s| >::remove(s, (&pk, &record)), + &|s| >::remove(s, &pk, &record), &[], false, ), ( &|s| { > as IndexRegistry>::remove( - s, - (&pk, &record), + s, &pk, &record, ) }, &["index_a", "index_b"], @@ -339,8 +344,7 @@ mod tests { ( &|s| { > as IndexRegistry>::remove( - s, - (&pk, &record), + s, &pk, &record, ) }, &[], diff --git a/src/key.rs b/src/key.rs index 1c34925..6fd92f1 100644 --- a/src/key.rs +++ b/src/key.rs @@ -1,5 +1,6 @@ use crate::inline_vec::IVec; use crate::{impl_signed_integer_key, impl_unsigned_integer_key}; +use std::fmt::Debug; /// A value that can be encoded as an ordered key for Colette stores and indexes. /// @@ -37,7 +38,7 @@ use crate::{impl_signed_integer_key, impl_unsigned_integer_key}; /// /// Changing a `Key` implementation changes the physical storage layout and /// should be treated as a migration. -pub trait Key: Eq { +pub trait Key: Debug + Eq { /// The encoded size of the key. /// /// Fixed-size keys allow Colette to preallocate buffers efficiently. diff --git a/src/lib.rs b/src/lib.rs index c7f50c9..f8b3822 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,3 +10,6 @@ pub mod macros; pub mod prefix; pub mod scan; pub mod store; + +#[cfg(test)] +pub mod testing; diff --git a/src/scan.rs b/src/scan.rs index 1542dfe..1dd0334 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -42,17 +42,17 @@ pub enum ScanRange { pub struct IndexScan<'a, ReadHandle, Record, Idx> where + Self: 'a, ReadHandle: MultiStoreReadHandle, Record: Entity, Idx: Index, - Idx::Kind<'a>: IndexKind, Record::Key<'a>>, - Self: 'a, + for<'b> Idx::Kind<'b>: IndexKind, Record::Key<'b>>, { collection_name: &'static str, read_handle: ReadHandle, range: ScanRange, direction: Direction, - after: Option, Record>>, + after: Option, Record>>, _marker: PhantomData<(Record, Idx)>, } @@ -62,7 +62,7 @@ where ReadHandle: MultiStoreReadHandle, Record: Entity, Idx: Index, - Idx::Kind<'a>: IndexKind, Record::Key<'a>>, + for<'b> Idx::Kind<'b>: IndexKind, Record::Key<'b>>, { pub fn new(collection_name: &'static str, read_handle: ReadHandle) -> Self { Self { @@ -78,7 +78,7 @@ where pub fn range( mut self, - range: Range, Record>>>, + range: Range, Record>>>, ) -> Self { self.range = ScanRange::Range { left: range.start.map(|p| p.encode().as_ref().to_vec()), @@ -92,7 +92,7 @@ where self } - pub fn after(mut self, cursor: StoreKey<'a, Idx, Record::Key<'a>, Record>) -> Self { + pub fn after(mut self, cursor: StoreKey<'a, 'a, Idx, Record::Key<'a>, Record>) -> Self { self.after = Some(cursor); self } @@ -116,15 +116,15 @@ pub trait PrefixScan, KeyPrefix: Prefix> } impl<'a, ReadHandle, Record, Idx, KeyPrefix> - PrefixScan, Record>, KeyPrefix> + PrefixScan, Record>, KeyPrefix> for IndexScan<'a, ReadHandle, Record, Idx> where ReadHandle: MultiStoreReadHandle, Record: Entity, Idx: Index, - Idx::Kind<'a>: IndexKind, Record::Key<'a>>, KeyPrefix: Prefix, - StoreKey<'a, Idx, Record::Key<'a>, Record>: Key + Prefixable, + StoreKey<'a, 'a, Idx, Record::Key<'a>, Record>: Key + Prefixable, + for<'b> Idx::Kind<'b>: IndexKind, Record::Key<'b>>, { fn prefix(mut self, prefix: KeyPrefix) -> Self { self.range = ScanRange::Prefix(prefix.encode_prefix()); @@ -141,7 +141,7 @@ where fn range( mut self, - range: Range, Record>, KeyPrefix>>>, + range: Range, Record>, KeyPrefix>>>, ) -> Self { self.range = ScanRange::Range { left: range.start.map(|p| p.encode()), diff --git a/src/testing.rs b/src/testing.rs new file mode 100644 index 0000000..1621fcd --- /dev/null +++ b/src/testing.rs @@ -0,0 +1,324 @@ +//! Reusable mock implementations of the [`crate::store`] traits for use across +//! test modules. +//! +//! Every operation performed through a [`MockDb`] is recorded in a shared +//! [`TxLog`] that callers can inspect after the fact. + +use std::cell::{Cell, RefCell}; +use std::collections::HashMap; +use std::rc::Rc; + +use crate::entity::Entity; +use crate::error::{BackendError, Error}; +use crate::index_registry::IndexRegistry; +use crate::scan::{Direction, ScanRange}; +use crate::store::{ + MultiStore, MultiStoreReadHandle, MultiStoreWriteHandle, ReadKVStore, ReadWriteKVStore, + WriteKVStore, +}; + +// ── Error helpers ───────────────────────────────────────────────────────────── + +/// Constructs a [`BackendError`] from a static message, for use in error +/// factory functions passed to [`MockDb::with_write_err`] / +/// [`MockDb::with_commit_err`]. +pub fn backend_error(msg: &'static str) -> BackendError { + BackendError::new(std::io::Error::new(std::io::ErrorKind::Other, msg)) +} + +// ── TxLog ───────────────────────────────────────────────────────────────────── + +/// Shared log of all store operations performed via a [`MockDb`] instance. +/// +/// The log is shared between the db and all handles/stores it produces so +/// that callers can observe the full sequence of operations after a test call. +#[derive(Default, Debug)] +pub struct TxLog { + /// Names of stores opened via `open_store` (in call order). + pub opens: Vec, + /// Raw keys passed to `get` (in call order). + pub gets: Vec>, + /// `(key, value)` pairs passed to `set` (in call order). + pub sets: Vec<(Vec, Vec)>, + /// Raw keys passed to `remove` (in call order). + pub removes: Vec>, + /// Whether `commit` was called on the write handle. + pub committed: bool, +} + +// ── MockStore ───────────────────────────────────────────────────────────────── + +/// A mock [`ReadWriteKVStore`] that records every operation in a shared +/// [`TxLog`] and serves pre-configured byte values for `get` calls. +pub struct MockStore { + log: Rc>, + data: HashMap, Vec>, +} + +impl ReadKVStore for MockStore { + type Iter = std::iter::Empty, Vec), BackendError>>; + + fn get(&self, key: impl AsRef<[u8]>) -> Result>, BackendError> { + let key = key.as_ref().to_vec(); + self.log.borrow_mut().gets.push(key.clone()); + Ok(self.data.get(&key).cloned()) + } + + fn scan(&self, _: ScanRange, _: Direction) -> Result { + Ok(std::iter::empty()) + } +} + +impl WriteKVStore for MockStore { + fn set(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Result<(), BackendError> { + self.log + .borrow_mut() + .sets + .push((key.as_ref().to_vec(), value.as_ref().to_vec())); + Ok(()) + } + + fn remove(&mut self, key: impl AsRef<[u8]>) -> Result<(), BackendError> { + self.log.borrow_mut().removes.push(key.as_ref().to_vec()); + Ok(()) + } +} + +impl ReadWriteKVStore for MockStore {} + +// ── MockWriteHandle ─────────────────────────────────────────────────────────── + +/// A mock [`MultiStoreWriteHandle`]. +/// +/// Each call to `open_store` is logged and returns a [`MockStore`] seeded with +/// the data registered under that store name on the owning [`MockDb`]. +pub struct MockWriteHandle { + log: Rc>, + store_data: HashMap, Vec>>, + commit_err: Option BackendError>, +} + +impl MultiStoreWriteHandle for MockWriteHandle { + type Store = MockStore; + + fn open_store(&mut self, name: &str) -> Result { + self.log.borrow_mut().opens.push(name.to_string()); + let data = self.store_data.get(name).cloned().unwrap_or_default(); + Ok(MockStore { + log: self.log.clone(), + data, + }) + } + + fn commit(self) -> Result<(), BackendError> { + self.log.borrow_mut().committed = true; + match self.commit_err { + Some(make_err) => Err(make_err()), + None => Ok(()), + } + } +} + +// ── MockReadHandle ──────────────────────────────────────────────────────────── + +/// A mock [`MultiStoreReadHandle`]. +/// +/// Read operations are also recorded in the shared log. +pub struct MockReadHandle { + log: Rc>, + store_data: HashMap, Vec>>, +} + +impl MultiStoreReadHandle for MockReadHandle { + type Store = MockStore; + + fn open_store(&self, name: &str) -> Result { + self.log.borrow_mut().opens.push(name.to_string()); + let data = self.store_data.get(name).cloned().unwrap_or_default(); + Ok(MockStore { + log: self.log.clone(), + data, + }) + } +} + +// ── MockDb ──────────────────────────────────────────────────────────────────── + +/// A configurable mock [`MultiStore`]. +/// +/// # Usage +/// +/// ```rust,ignore +/// let db = MockDb::new() +/// .with_data("__main", enc_pk, enc_val) // simulate existing record +/// .with_commit_err(|| backend_error("disk full")); +/// +/// let log = db.log(); // clone the Rc before the db is moved +/// collection.insert(record)?; +/// +/// let log = log.borrow(); +/// assert_eq!(log.sets.len(), 1); +/// assert!(log.committed); +/// ``` +pub struct MockDb { + log: Rc>, + store_data: HashMap, Vec>>, + read_err: Option BackendError>, + write_err: Option BackendError>, + commit_err: Option BackendError>, +} + +impl Default for MockDb { + fn default() -> Self { + Self { + log: Rc::new(RefCell::new(TxLog::default())), + store_data: HashMap::new(), + read_err: None, + write_err: None, + commit_err: None, + } + } +} + +impl MockDb { + pub fn new() -> Self { + Self::default() + } + + /// Returns a handle to the shared operation log for post-call assertions. + /// + /// Clone this before moving `MockDb` into a [`Collection`]. + pub fn log(&self) -> Rc> { + self.log.clone() + } + + /// Pre-seeds a named store with a key/value entry (returned by `get`). + pub fn with_data( + mut self, + store: &str, + key: impl Into>, + value: impl Into>, + ) -> Self { + self.store_data + .entry(store.to_string()) + .or_default() + .insert(key.into(), value.into()); + self + } + + /// Makes `read()` return an error produced by `make_err`. + pub fn with_read_err(mut self, make_err: fn() -> BackendError) -> Self { + self.read_err = Some(make_err); + self + } + + /// Makes `write()` return an error produced by `make_err`. + pub fn with_write_err(mut self, make_err: fn() -> BackendError) -> Self { + self.write_err = Some(make_err); + self + } + + /// Makes `commit()` return an error produced by `make_err`. + pub fn with_commit_err(mut self, make_err: fn() -> BackendError) -> Self { + self.commit_err = Some(make_err); + self + } +} + +impl MultiStore for MockDb { + type ReadHandle = MockReadHandle; + type WriteHandle = MockWriteHandle; + + fn read(&self, _: &str) -> Result { + if let Some(make_err) = self.read_err { + return Err(make_err()); + } + Ok(MockReadHandle { + log: self.log.clone(), + store_data: self.store_data.clone(), + }) + } + + fn write(&self, _: &str) -> Result { + if let Some(make_err) = self.write_err { + return Err(make_err()); + } + Ok(MockWriteHandle { + log: self.log.clone(), + store_data: self.store_data.clone(), + commit_err: self.commit_err, + }) + } +} + +// ── SpyRegistry ─────────────────────────────────────────────────────────────── + +thread_local! { + static REGISTRY_UPDATE_CALLED: Cell = Cell::new(false); + static REGISTRY_REMOVE_CALLED: Cell = Cell::new(false); + static REGISTRY_SHOULD_FAIL: Cell = Cell::new(false); +} + +/// A mock [`IndexRegistry`] backed by thread-local flags. +/// +/// Each test thread starts with a clean slate. Call [`SpyRegistry::reset`] +/// between successive uses within the same test function. +pub struct SpyRegistry; + +impl SpyRegistry { + /// Resets all flags to their initial state. + pub fn reset() { + REGISTRY_UPDATE_CALLED.with(|c| c.set(false)); + REGISTRY_REMOVE_CALLED.with(|c| c.set(false)); + REGISTRY_SHOULD_FAIL.with(|c| c.set(false)); + } + + /// When set to `true`, the next `update` or `remove` call returns + /// `Err(Error::Unexpected(...))`. + pub fn set_fail(fail: bool) { + REGISTRY_SHOULD_FAIL.with(|c| c.set(fail)); + } + + /// Returns `true` if `update` was called since the last [`reset`]. + pub fn was_update_called() -> bool { + REGISTRY_UPDATE_CALLED.with(|c| c.get()) + } + + /// Returns `true` if `remove` was called since the last [`reset`]. + pub fn was_remove_called() -> bool { + REGISTRY_REMOVE_CALLED.with(|c| c.get()) + } + + fn fail_if_needed() -> Result<(), Error> { + if REGISTRY_SHOULD_FAIL.with(|c| c.get()) { + Err(Error::Unexpected("injected registry error".into())) + } else { + Ok(()) + } + } +} + +impl IndexRegistry for SpyRegistry { + fn update<'a, DB: MultiStoreWriteHandle>( + _db: &mut DB, + _pk: &T::Key<'a>, + _old: Option<&T>, + _new: &'a T, + ) -> Result<(), Error> { + REGISTRY_UPDATE_CALLED.with(|c| c.set(true)); + Self::fail_if_needed() + } + + fn remove<'a, DB: MultiStoreWriteHandle>( + _db: &mut DB, + _pk: &T::Key<'a>, + _item: &'a T, + ) -> Result<(), Error> { + REGISTRY_REMOVE_CALLED.with(|c| c.set(true)); + Self::fail_if_needed() + } + + fn has_index(_name: &str) -> bool { + false + } +}