From b15c4bec188948be6e647a64c8ec9976aa978d15 Mon Sep 17 00:00:00 2001 From: extollIT Enterprises Date: Tue, 12 May 2026 12:11:49 -0600 Subject: [PATCH] Initial commit --- Cargo.lock | 14 + Cargo.toml | 2 + crates/rig-valkey/Cargo.toml | 22 ++ crates/rig-valkey/LICENSE | 7 + crates/rig-valkey/README.md | 44 ++++ crates/rig-valkey/src/filter.rs | 139 ++++++++++ crates/rig-valkey/src/lib.rs | 438 ++++++++++++++++++++++++++++++++ sdd/constraints.md | 183 +++++++++++++ sdd/design.md | 165 ++++++++++++ sdd/evaluation.md | 215 ++++++++++++++++ src/lib.rs | 5 + tests/integrations.rs | 3 + tests/integrations/valkey.rs | 346 +++++++++++++++++++++++++ 13 files changed, 1583 insertions(+) create mode 100644 crates/rig-valkey/Cargo.toml create mode 100644 crates/rig-valkey/LICENSE create mode 100644 crates/rig-valkey/README.md create mode 100644 crates/rig-valkey/src/filter.rs create mode 100644 crates/rig-valkey/src/lib.rs create mode 100644 sdd/constraints.md create mode 100644 sdd/design.md create mode 100644 sdd/evaluation.md create mode 100644 tests/integrations/valkey.rs diff --git a/Cargo.lock b/Cargo.lock index 34e029794..416c55574 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9137,6 +9137,7 @@ dependencies = [ "rig-scylladb", "rig-sqlite", "rig-surrealdb", + "rig-valkey", "rig-vectorize", "rig-vertexai", "rmcp", @@ -9488,6 +9489,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "rig-valkey" +version = "0.1.0" +dependencies = [ + "anyhow", + "nanoid", + "redis", + "rig-core", + "serde", + "serde_json", + "tracing", +] + [[package]] name = "rig-vectorize" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index 2da36241f..739492a6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,6 +160,7 @@ rig-s3vectors = { path = "crates/rig-s3vectors", version = "0.2.6", optional = t rig-scylladb = { path = "crates/rig-scylladb", version = "0.2.6", optional = true, default-features = false } rig-sqlite = { path = "crates/rig-sqlite", version = "0.2.6", optional = true, 
default-features = false } rig-surrealdb = { path = "crates/rig-surrealdb", version = "0.2.6", optional = true, default-features = false } +rig-valkey = { path = "crates/rig-valkey", version = "0.1.0", optional = true, default-features = false } rig-vectorize = { path = "crates/rig-vectorize", version = "0.2.6", optional = true, default-features = false } rig-vertexai = { path = "crates/rig-vertexai", version = "0.3.6", optional = true, default-features = false } @@ -244,6 +245,7 @@ s3vectors = ["dep:rig-s3vectors"] scylladb = ["dep:rig-scylladb"] sqlite = ["dep:rig-sqlite"] surrealdb = ["dep:rig-surrealdb"] +valkey = ["dep:rig-valkey"] vectorize = ["dep:rig-vectorize"] vertexai = ["dep:rig-vertexai"] audio = ["rig-core/audio"] diff --git a/crates/rig-valkey/Cargo.toml b/crates/rig-valkey/Cargo.toml new file mode 100644 index 000000000..fec4f6e64 --- /dev/null +++ b/crates/rig-valkey/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "rig-valkey" +version = "0.1.0" +edition = { workspace = true } +license = "MIT" +readme = "README.md" +description = "Valkey vector store integration for Rig." +repository = "https://github.com/0xPlaygrounds/rig" + +[lints] +workspace = true + +[dependencies] +nanoid = { workspace = true } +redis = { workspace = true, features = ["tokio-comp", "aio"] } +rig-core = { path = "../rig-core", version = "0.37.0", default-features = false } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } diff --git a/crates/rig-valkey/LICENSE b/crates/rig-valkey/LICENSE new file mode 100644 index 000000000..878b5fbc3 --- /dev/null +++ b/crates/rig-valkey/LICENSE @@ -0,0 +1,7 @@ +Copyright (c) 2024, Playgrounds Analytics Inc. 
+ +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/crates/rig-valkey/README.md b/crates/rig-valkey/README.md new file mode 100644 index 000000000..d76b81c4f --- /dev/null +++ b/crates/rig-valkey/README.md @@ -0,0 +1,44 @@ +# rig-valkey + +Valkey vector store integration for [Rig](https://github.com/0xPlaygrounds/rig). + +This crate implements the `VectorStoreIndex` and `InsertDocuments` traits from +`rig-core`, backed by Valkey's vector search capabilities (`FT.SEARCH` with KNN). 
+ +## Requirements + +- Valkey 8.0+ with the [valkey-search](https://github.com/valkey-io/valkey-search) module loaded +- A pre-existing FT index on the target keyspace + +## Usage + +```rust,no_run +use rig_valkey::{ValkeyVectorStore, ValkeyVectorStoreConfig}; +use rig_core::{providers::openai, vector_store::VectorStoreIndex, client::EmbeddingsClient}; +use rig_core::vector_store::request::VectorSearchRequest; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let redis_client = redis::Client::open("redis://127.0.0.1:6379")?; + let connection = redis_client.get_multiplexed_async_connection().await?; + + let openai_client = openai::Client::from_env()?; + let model = openai_client.embedding_model(openai::TEXT_EMBEDDING_ADA_002); + + let store = ValkeyVectorStore::new( + connection, + model, + "my_index", + ValkeyVectorStoreConfig::default(), + ).await?; + + let req = VectorSearchRequest::builder() + .query("What is machine learning?") + .samples(5) + .build(); + + let results = store.top_n::(req).await?; + println!("{results:?}"); + Ok(()) +} +``` diff --git a/crates/rig-valkey/src/filter.rs b/crates/rig-valkey/src/filter.rs new file mode 100644 index 000000000..a84f8e9d5 --- /dev/null +++ b/crates/rig-valkey/src/filter.rs @@ -0,0 +1,139 @@ +//! Valkey FT.SEARCH filter expression builder. +//! +//! [`ValkeySearchFilter`] implements the [`SearchFilter`] trait, producing +//! filter strings compatible with Valkey's `FT.SEARCH` query syntax. + +use rig_core::vector_store::request::SearchFilter; +use serde::Serialize; + +/// A filter expression for Valkey FT.SEARCH queries. +/// +/// Wraps a string in Valkey's query filter syntax. Combine filters with +/// [`SearchFilter::and`] and [`SearchFilter::or`]. +#[derive(Clone, Debug, Serialize)] +pub struct ValkeySearchFilter(String); + +impl ValkeySearchFilter { + /// Returns the raw filter expression string for use in FT.SEARCH queries. 
+ pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl SearchFilter for ValkeySearchFilter { + type Value = serde_json::Value; + + fn eq(key: impl AsRef, value: Self::Value) -> Self { + let key = sanitize_field_name(key.as_ref()); + let expr = match &value { + serde_json::Value::Number(n) => { + format!("@{key}:[{n} {n}]") + } + serde_json::Value::String(s) => { + let escaped = escape_tag_value(s); + format!("@{key}:{{{escaped}}}") + } + other => { + let s = other.to_string(); + let escaped = escape_tag_value(&s); + format!("@{key}:{{{escaped}}}") + } + }; + Self(expr) + } + + fn gt(key: impl AsRef, value: Self::Value) -> Self { + let key = sanitize_field_name(key.as_ref()); + let n = value_to_numeric(&value); + Self(format!("@{key}:[({n} +inf]")) + } + + fn lt(key: impl AsRef, value: Self::Value) -> Self { + let key = sanitize_field_name(key.as_ref()); + let n = value_to_numeric(&value); + Self(format!("@{key}:[-inf ({n}]")) + } + + fn and(self, rhs: Self) -> Self { + Self(format!("({}) ({})", self.0, rhs.0)) + } + + fn or(self, rhs: Self) -> Self { + Self(format!("({}) | ({})", self.0, rhs.0)) + } +} + +/// Strips any character that isn't alphanumeric or underscore from field names, +/// preventing query injection via the `@field:` syntax. +fn sanitize_field_name(key: &str) -> String { + key.chars() + .filter(|c| c.is_ascii_alphanumeric() || *c == '_') + .collect() +} + +/// Escapes special characters in TAG field values for FT.SEARCH syntax. +fn escape_tag_value(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for c in s.chars() { + if matches!( + c, + '\\' | ',' + | '.' + | '<' + | '>' + | '{' + | '}' + | '[' + | ']' + | '"' + | '\'' + | ':' + | ';' + | '!' + | '@' + | '#' + | '$' + | '%' + | '^' + | '&' + | '*' + | '(' + | ')' + | '-' + | '+' + | '=' + | '~' + | '|' + | '/' + | '?' + | ' ' + ) { + out.push('\\'); + } + out.push(c); + } + out +} + +/// Extracts a validated numeric string from a JSON value for range queries. 
+/// Non-numeric strings are treated as 0 to prevent injection. +fn value_to_numeric(value: &serde_json::Value) -> String { + match value { + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::String(s) => s.parse::().map_or("0".to_string(), |n| n.to_string()), + _ => "0".to_string(), + } +} + +impl From> for ValkeySearchFilter { + fn from(value: rig_core::vector_store::request::Filter) -> Self { + use rig_core::vector_store::request::Filter; + match value { + Filter::Eq(k, v) => ValkeySearchFilter::eq(k, v), + Filter::Gt(k, v) => ValkeySearchFilter::gt(k, v), + Filter::Lt(k, v) => ValkeySearchFilter::lt(k, v), + Filter::And(l, r) => Self::from(*l).and(Self::from(*r)), + Filter::Or(l, r) => Self::from(*l).or(Self::from(*r)), + } + } +} diff --git a/crates/rig-valkey/src/lib.rs b/crates/rig-valkey/src/lib.rs new file mode 100644 index 000000000..cfc19fd0a --- /dev/null +++ b/crates/rig-valkey/src/lib.rs @@ -0,0 +1,438 @@ +//! Valkey vector store integration for Rig. +//! +//! This crate provides [`ValkeyVectorStore`], a Rig vector store index backed +//! by Valkey's vector search capabilities (`FT.SEARCH` with KNN). +//! +//! The root `rig` facade re-exports this crate as `rig::valkey` when the +//! `valkey` feature is enabled. + +mod filter; + +pub use filter::ValkeySearchFilter; + +use redis::aio::MultiplexedConnection; +use rig_core::{ + Embed, OneOrMany, + embeddings::embedding::{Embedding, EmbeddingModel}, + vector_store::{ + InsertDocuments, VectorStoreError, VectorStoreIndex, request::VectorSearchRequest, + }, +}; +use serde::{Deserialize, Serialize}; + +/// Field alias used by FT.SEARCH to return the KNN distance score. +const SCORE_ALIAS: &str = "__score"; + +/// Configuration for [`ValkeyVectorStore`]. +/// +/// Use [`Default::default()`] for standard field names, or customize +/// to match your existing FT index schema. +#[derive(Clone, Debug)] +pub struct ValkeyVectorStoreConfig { + /// Hash key prefix. 
Keys are stored as `{prefix}:{id}`. + pub id_prefix: String, + /// Hash field name containing the vector embedding bytes. + pub embedding_field: String, + /// Hash field name containing the JSON-serialized document. + pub document_field: String, + /// Hash field name containing the embedded text fragment. + pub embedded_text_field: String, +} + +impl Default for ValkeyVectorStoreConfig { + fn default() -> Self { + Self { + id_prefix: "rig".to_string(), + embedding_field: "embedding".to_string(), + document_field: "document".to_string(), + embedded_text_field: "embedded_text".to_string(), + } + } +} + +/// A vector store backed by Valkey's FT.SEARCH with KNN vector queries. +/// +/// Requires a pre-existing FT index created via `FT.CREATE`. The constructor +/// validates the index exists using `FT.INFO`. +/// +/// # Example +/// ```no_run +/// use rig_valkey::{ValkeyVectorStore, ValkeyVectorStoreConfig}; +/// use rig_core::{providers::openai, vector_store::VectorStoreIndex, client::{EmbeddingsClient, ProviderClient}}; +/// use rig_core::vector_store::request::VectorSearchRequest; +/// +/// # async fn example() -> anyhow::Result<()> { +/// let redis_client = redis::Client::open("redis://127.0.0.1:6379")?; +/// let connection = redis_client.get_multiplexed_async_connection().await?; +/// +/// let openai_client = openai::Client::from_env()?; +/// let model = openai_client.embedding_model(openai::TEXT_EMBEDDING_ADA_002); +/// +/// let store = ValkeyVectorStore::new( +/// connection, +/// model, +/// "my_index", +/// ValkeyVectorStoreConfig::default(), +/// ).await?; +/// +/// let req = VectorSearchRequest::builder() +/// .query("search text") +/// .samples(5) +/// .build(); +/// +/// let results = store.top_n::(req).await?; +/// # Ok(()) +/// # } +/// ``` +pub struct ValkeyVectorStore +where + M: EmbeddingModel, +{ + connection: MultiplexedConnection, + model: M, + index_name: String, + config: ValkeyVectorStoreConfig, +} + +fn valkey_to_rig_error(e: redis::RedisError) 
-> VectorStoreError { + VectorStoreError::DatastoreError(Box::new(e)) +} + +impl ValkeyVectorStore +where + M: EmbeddingModel, +{ + /// Creates a new [`ValkeyVectorStore`]. + /// + /// Validates that the FT index exists by issuing `FT.INFO`. Returns an error + /// if the index is not found. + pub async fn new( + mut connection: MultiplexedConnection, + model: M, + index_name: &str, + config: ValkeyVectorStoreConfig, + ) -> Result { + // Validate index exists via FT.INFO + let result: Result = redis::cmd("FT.INFO") + .arg(index_name) + .query_async(&mut connection) + .await; + + match result { + Ok(_) => {} + Err(e) => { + let msg = e.to_string(); + if msg.contains("Unknown index name") || msg.contains("unknown command") { + return Err(VectorStoreError::DatastoreError( + format!("Index '{}' not found", index_name).into(), + )); + } + return Err(valkey_to_rig_error(e)); + } + } + + Ok(Self { + connection, + model, + index_name: index_name.to_string(), + config, + }) + } +} + +/// Serializes an embedding vector to little-endian f32 bytes for Valkey VECTOR fields. +/// +/// Valkey's vector search only supports FLOAT32 vectors, so f64 embeddings +/// are narrowed to f32 before byte conversion. +fn embedding_to_bytes(vec: &[f64]) -> Vec { + vec.iter().flat_map(|&f| (f as f32).to_le_bytes()).collect() +} + +impl ValkeyVectorStore +where + M: EmbeddingModel + Sync + Send, +{ + /// Executes a KNN vector search and returns the raw FT.SEARCH response. + async fn execute_knn_search( + &self, + req: &VectorSearchRequest, + return_fields: &[&str], + ) -> Result { + let prompt_embedding = self.model.embed_text(req.query()).await?; + let vec_bytes = embedding_to_bytes(&prompt_embedding.vec); + let samples = req.samples() as usize; + + let base_query = match req.filter() { + Some(f) => f.as_str(), + None => "*", + }; + + // Reject `=>` in filter expressions to prevent KNN injection. + // Also reject `=` followed by whitespace then `>` to prevent bypass. 
+ if base_query.contains("=>") + || base_query + .find('=') + .and_then(|i| base_query[i + 1..].trim_start().chars().next()) + .is_some_and(|c| c == '>') + { + return Err(VectorStoreError::DatastoreError( + "Filter expression must not contain '=>'".into(), + )); + } + + let query_str = format!( + "({base_query})=>[KNN {samples} @{} $vec AS {SCORE_ALIAS}]", + self.config.embedding_field + ); + + let mut cmd = redis::cmd("FT.SEARCH"); + cmd.arg(&self.index_name) + .arg(&query_str) + .arg("PARAMS") + .arg(2) // 2 params: "vec" key + blob value + .arg("vec") + .arg(vec_bytes.as_slice()) + .arg("LIMIT") + .arg(0) + .arg(samples) + .arg("RETURN") + .arg(return_fields.len()); + + for field in return_fields { + cmd.arg(*field); + } + + // DIALECT 2 required for KNN =>[] syntax + cmd.arg("DIALECT").arg(2); + + let mut conn = self.connection.clone(); + cmd.query_async(&mut conn) + .await + .map_err(valkey_to_rig_error) + } +} + +impl VectorStoreIndex for ValkeyVectorStore +where + M: EmbeddingModel + Sync + Send, +{ + type Filter = ValkeySearchFilter; + + async fn top_n Deserialize<'a> + Send>( + &self, + req: VectorSearchRequest, + ) -> Result, VectorStoreError> { + let result = self + .execute_knn_search( + &req, + &[ + SCORE_ALIAS, + &self.config.document_field, + &self.config.embedded_text_field, + ], + ) + .await?; + parse_search_results(&result, &self.config) + } + + async fn top_n_ids( + &self, + req: VectorSearchRequest, + ) -> Result, VectorStoreError> { + let result = self.execute_knn_search(&req, &[SCORE_ALIAS]).await?; + parse_search_ids(&result, &self.config) + } +} + +impl InsertDocuments for ValkeyVectorStore +where + M: EmbeddingModel + Send + Sync, +{ + async fn insert_documents( + &self, + documents: Vec<(Doc, OneOrMany)>, + ) -> Result<(), VectorStoreError> { + let mut conn = self.connection.clone(); + let mut pipe = redis::pipe(); + + for (document, embeddings) in documents { + let json_doc = serde_json::to_string(&document)?; + + for embedding in 
embeddings.into_iter() { + let id = nanoid::nanoid!(); + let key = format!("{}:{}", self.config.id_prefix, id); + let vec_bytes = embedding_to_bytes(&embedding.vec); + + pipe.cmd("HSET") + .arg(&key) + .arg(&self.config.embedding_field) + .arg(vec_bytes.as_slice()) + .arg(&self.config.document_field) + .arg(&json_doc) + .arg(&self.config.embedded_text_field) + .arg(&embedding.document) + .ignore(); + } + } + + pipe.query_async::<()>(&mut conn) + .await + .map_err(valkey_to_rig_error)?; + + Ok(()) + } +} + +/// A parsed row from FT.SEARCH results. +struct SearchRow { + score: f64, + id: String, + fields: std::collections::HashMap, +} + +/// Parses FT.SEARCH response into rows of (score, id, field_map). +/// +/// FT.SEARCH returns: [total_results, key1, [field, value, ...], key2, [...], ...] +fn parse_search_rows( + value: &redis::Value, + config: &ValkeyVectorStoreConfig, +) -> Result, VectorStoreError> { + let items = match value { + redis::Value::Array(items) => items, + _ => { + return Err(VectorStoreError::DatastoreError( + "Unexpected FT.SEARCH response format".into(), + )); + } + }; + + if items.len() < 2 { + return Ok(Vec::new()); + } + + let mut rows = Vec::new(); + let mut i = 1; + + while i + 1 < items.len() { + let key_val = items.get(i).ok_or_else(|| { + VectorStoreError::DatastoreError("Missing key in FT.SEARCH response".into()) + })?; + let key = extract_string(key_val)?; + let id = strip_prefix(&key, &config.id_prefix); + + let fields_val = items.get(i + 1).ok_or_else(|| { + VectorStoreError::DatastoreError("Missing fields in FT.SEARCH response".into()) + })?; + let fields = match fields_val { + redis::Value::Array(f) => f, + _ => { + i += 2; + continue; + } + }; + + let field_map = fields_to_map(fields); + + // Score normalization: `1.0 - distance` assumes COSINE distance metric. + // COSINE distance ∈ [0, 2], so similarity ∈ [-1, 1]. 
+ let score = field_map + .get(SCORE_ALIAS) + .and_then(|s| s.parse::().ok()) + .map(|d| 1.0 - d) + .ok_or_else(|| { + VectorStoreError::DatastoreError( + "Missing or invalid __score in FT.SEARCH response".into(), + ) + })?; + + rows.push(SearchRow { + score, + id, + fields: field_map, + }); + i += 2; + } + + Ok(rows) +} + +fn log_selected(results: &[(f64, String)]) { + tracing::info!(target: "rig", + "Selected documents: {}", + results.iter() + .map(|(score, id)| format!("{id} ({score})")) + .collect::>() + .join(", ") + ); +} + +fn parse_search_results Deserialize<'a>>( + value: &redis::Value, + config: &ValkeyVectorStoreConfig, +) -> Result, VectorStoreError> { + let rows = parse_search_rows(value, config)?; + let mut results = Vec::new(); + + for row in rows { + if let Some(json_str) = row.fields.get(config.document_field.as_str()) { + let doc: T = serde_json::from_str(json_str).map_err(VectorStoreError::JsonError)?; + results.push((row.score, row.id, doc)); + } + } + + log_selected( + &results + .iter() + .map(|(s, id, _)| (*s, id.clone())) + .collect::>(), + ); + + Ok(results) +} + +fn parse_search_ids( + value: &redis::Value, + config: &ValkeyVectorStoreConfig, +) -> Result, VectorStoreError> { + let rows = parse_search_rows(value, config)?; + let results: Vec<_> = rows.into_iter().map(|row| (row.score, row.id)).collect(); + + log_selected(&results); + + Ok(results) +} + +/// Extracts a String from a redis::Value. +fn extract_string(value: &redis::Value) -> Result { + match value { + redis::Value::BulkString(bytes) => String::from_utf8(bytes.clone()) + .map_err(|e| VectorStoreError::DatastoreError(Box::new(e))), + redis::Value::SimpleString(s) => Ok(s.clone()), + _ => Err(VectorStoreError::DatastoreError( + "Expected string value in FT.SEARCH response".into(), + )), + } +} + +/// Strips the key prefix to extract the document ID. 
+fn strip_prefix(key: &str, prefix: &str) -> String { + key.strip_prefix(prefix) + .and_then(|s| s.strip_prefix(':')) + .unwrap_or(key) + .to_string() +} + +/// Converts a flat [field, value, field, value, ...] array into a HashMap. +fn fields_to_map(fields: &[redis::Value]) -> std::collections::HashMap { + let mut map = std::collections::HashMap::with_capacity(fields.len() / 2); + let mut j = 0; + while j + 1 < fields.len() { + let key = fields.get(j).and_then(|v| extract_string(v).ok()); + let val = fields.get(j + 1).and_then(|v| extract_string(v).ok()); + if let (Some(k), Some(v)) = (key, val) { + map.insert(k, v); + } + j += 2; + } + map +} diff --git a/sdd/constraints.md b/sdd/constraints.md new file mode 100644 index 000000000..350378ff5 --- /dev/null +++ b/sdd/constraints.md @@ -0,0 +1,183 @@ +# rig-valkey Constraints + +## Hard Constraints + +### Workspace Lint Compliance + +The workspace enforces strict clippy lints in library code: + +```toml +[workspace.lints.clippy] +dbg_macro = "forbid" +await_holding_lock = "deny" +await_holding_refcell_ref = "deny" +expect_used = "deny" +expect_fun_call = "deny" +indexing_slicing = "deny" +panic = "deny" +panic_in_result_fn = "deny" +todo = "forbid" +unimplemented = "forbid" +unreachable = "deny" +unwrap_used = "deny" +``` + +All error paths must use `?` or explicit `Result` handling. No `.unwrap()`, `.expect()`, indexing with `[]`, or `panic!()` in library code. Tests are exempt. + +### WASM Compatibility Bounds + +Per AGENTS.md: use `WasmCompatSend` and `WasmCompatSync` in trait bounds instead of raw `Send`/`Sync`. The trait signatures in rig-core already use these. Even though this crate won't compile to WASM (the `redis` crate is tokio-based), we MUST use `WasmCompatSend`/`WasmCompatSync` in our bounds to satisfy the trait contracts. + +This is consistent with other backend crates (rig-mongodb, rig-qdrant) which also can't target WASM but still use the compat bounds. 
+ +### Trait Signatures (must match exactly) + +```rust +// VectorStoreIndex +type Filter = ValkeySearchFilter; +async fn top_n Deserialize<'a> + Send>(&self, req: VectorSearchRequest) -> Result, VectorStoreError>; +async fn top_n_ids(&self, req: VectorSearchRequest) -> Result, VectorStoreError>; + +// InsertDocuments +async fn insert_documents(&self, documents: Vec<(Doc, OneOrMany)>) -> Result<(), VectorStoreError>; +``` + +### VectorStoreIndexDyn Blanket Impl Requirements + +For the blanket `VectorStoreIndexDyn` impl to apply, `ValkeySearchFilter` must satisfy: + +```rust +F: Debug + Clone + SearchFilter + + Send + Sync + Serialize + Deserialize + 'static +``` + +This means `ValkeySearchFilter::Value` MUST be `serde_json::Value`. + +### No Index Auto-Creation + +The crate must NOT create FT indexes automatically. The constructor validates the index exists (via `FT.INFO`) and returns an error if it doesn't. This follows the established pattern (MongoDB validates, Qdrant expects collection to exist). + +Rationale: Index creation is a schema decision that belongs to the operator, not the application code. 
+ +### AGENTS.md Mandates + +From the repo's `AGENTS.md` (operational rules for this codebase): + +- **Vector stores must live in companion crates** (not in rig-core) ✓ +- **Implement both `top_n` and `top_n_ids`** ✓ +- **Use backend-specific filter type** → `ValkeySearchFilter` +- **Return `VectorStoreError` variants** — no ad hoc string errors +- **Use `WasmCompatSend` and `WasmCompatSync` bounds** (see above) +- **Builder style for configurable types** → `ValkeyVectorStoreConfig` should use builder pattern +- **No TODOs, stubs, placeholder implementations, or speculative APIs** +- **Prefer existing Rig traits/builders/error types** over new abstractions +- **Comments explain why, not what** +- **Full `where` clauses** for complex trait bounds + +## Dependencies + +### Required (runtime) + +| Crate | Version | Features | Notes | +|-------|---------|----------|-------| +| `rig-core` | `0.37.0` | `default-features = false` | Path dep to `../rig-core` | +| `redis` | `1.2.1` (workspace) | `tokio-comp`, `aio` | Already in workspace deps | +| `serde` | workspace | `derive` | Serialization | +| `serde_json` | workspace | — | Document serialization | +| `nanoid` | workspace | — | Key generation | +| `tracing` | workspace | — | Logging | + +### Required (dev) + +| Crate | Version | Features | Notes | +|-------|---------|----------|-------| +| `tokio` | workspace | `macros`, `rt-multi-thread` | Test runtime | +| `testcontainers` | `0.27` (workspace) | — | Valkey container | +| `httpmock` | workspace | — | Mock embedding API | +| `anyhow` | workspace | — | Test error handling | + +### NOT using + +- `valkey-glide` — No Rust client exists. The `redis` crate is protocol-compatible with Valkey. +- `redis` feature `vector-sets` — This is for the VSET commands (experimental), not FT.SEARCH. We use raw `redis::cmd()` for FT.* commands.
+ +## Valkey Server Requirements + +- **Minimum version**: Valkey 8.0+ (includes valkey-search module with FT.* commands) +- **Required module**: `valkey-search` (provides FT.CREATE, FT.SEARCH, FT.INFO) +- **Docker image for tests**: `valkey/valkey-extensions:8.0` (bundles search, json, bloom modules; the base `valkey/valkey` image does NOT include search) + +## Compatibility Notes + +### redis crate and Valkey + +The `redis` crate (v1.2.1) communicates via RESP protocol which is fully compatible with Valkey. No special configuration needed — just point at a Valkey endpoint instead of Redis. + +### FT.SEARCH Dialect + +We use `DIALECT 2` for KNN vector queries. This is the minimum dialect that supports the `=>[KNN ...]` syntax. + +### Embedding Byte Format + +Valkey VECTOR fields expect raw bytes. Valkey's vector search only supports FLOAT32 vectors, so each f64 embedding value is narrowed to f32 before serialization: +- Each f32 is 4 bytes, little-endian +- Total blob size = `ndims * 4` bytes +- Conversion: `embedding.vec.iter().flat_map(|&f| (f as f32).to_le_bytes()).collect::<Vec<u8>>()` + +### Score Semantics + +FT.SEARCH KNN returns a distance score (lower = more similar for COSINE). The rig trait expects similarity scores (higher = better).
We must normalize: +- COSINE distance → similarity: `score = 1.0 - distance` + +## Pre-Commit Hooks + +The repo uses `.pre-commit-config.yaml` with the following hooks that run on every commit: + +| Hook | Effect | +|------|--------| +| `trailing-whitespace` | Strips trailing whitespace from all files | +| `end-of-file-fixer` | Ensures files end with a single newline | +| `check-yaml` | Validates YAML syntax | +| `check-added-large-files` | Rejects large binary blobs | +| `check-json` | Validates JSON syntax | +| `check-case-conflict` | Rejects filenames that differ only by case | +| `check-merge-conflict` | Rejects files with merge conflict markers | +| `fmt` | Runs `cargo fmt` — code must be formatted | +| `cargo-check` | Runs `cargo check` — code must compile | +| `clippy` | Runs `clippy` — zero warnings required | +| `commitizen` (commit-msg stage) | Enforces Conventional Commits format | + +**Implications for our work:** + +- All commit messages MUST follow Conventional Commits: `feat(valkey): ...`, `test(valkey): ...`, `docs(valkey): ...` +- Code must pass `cargo fmt`, `cargo check`, and `clippy` before committing +- No trailing whitespace or missing final newlines in any file (including .md, .toml) +- Any JSON fixtures in tests must be valid JSON + +## Contribution Guidelines (from CONTRIBUTING.md) + +- Docstrings on all public items +- Full trait bound syntax (no `impl Trait` in trait bounds where explicit generics are clearer) +- Must include tests or examples +- Conventional Commits format for commit messages +- PR must link to an issue (AEA-422 in our case, but also an upstream GitHub issue) + +## Error Handling Strategy + +All Valkey errors map to `VectorStoreError::DatastoreError(Box)`: + +```rust +fn valkey_to_rig_error(e: redis::RedisError) -> VectorStoreError { + VectorStoreError::DatastoreError(Box::new(e)) +} +``` + +FT.INFO returning "Unknown index name" → `VectorStoreError::DatastoreError("Index not found")` + +## Open Questions + +1. 
**ID extraction**: When returning results from `top_n`, the document ID is the Hash key suffix (after the prefix). Should we strip the prefix or return the full key? → Strip prefix for consistency with other backends. + +2. **Multiple embeddings per document**: The `InsertDocuments` trait provides `OneOrMany`. Each embedding gets its own Hash key (like MongoDB creates multiple documents). This means one logical document may have N keys. + +3. **Upstream PR strategy**: Should we fork 0xPlaygrounds/rig and PR from there, or create the crate externally first? → Fork and PR, following their contribution guidelines. diff --git a/sdd/design.md b/sdd/design.md new file mode 100644 index 000000000..98e87dee8 --- /dev/null +++ b/sdd/design.md @@ -0,0 +1,165 @@ +# rig-valkey Design Document + +## Overview + +A companion crate `rig-valkey` that implements rig's `VectorStoreIndex` and `InsertDocuments` traits using Valkey's vector search capabilities (FT.CREATE / FT.SEARCH with KNN). + +## Architecture + +``` +┌─────────────────────────────────────────────────┐ +│ User Code │ +│ VectorSearchRequest::builder().query().samples()│ +└────────────────────┬────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────┐ +│ ValkeyVectorStore │ +│ ├── client: redis::aio::MultiplexedConnection │ +│ ├── model: M │ +│ ├── index_name: String │ +│ ├── embedding_field: String │ +│ ├── document_field: String │ +│ └── id_prefix: String │ +└────────────────────┬────────────────────────────┘ + │ +┌────────────────────▼────────────────────────────┐ +│ Valkey Server │ +│ Hash keys: {prefix}:{id} │ +│ Fields: embedding (VECTOR), document (TEXT/JSON)│ +│ FT index over the hash keyspace │ +└─────────────────────────────────────────────────┘ +``` + +## Storage Model + +Each document is stored as a Valkey Hash with the following fields: + +| Hash Field | Type | Content | +|-------------|---------------|--------------------------------------------| +| `embedding` | VECTOR 
(f64→f32, FLOAT32) | The embedding vector bytes (little-endian) | +| `document` | TEXT | JSON-serialized document payload | +| `embedded_text` | TEXT | The text fragment that was embedded | + +Key naming: `{id_prefix}:{document_id}` where `document_id` is a nanoid or user-supplied ID. + +### FT.CREATE Index Schema (expected to pre-exist) + +``` +FT.CREATE {index_name} + ON HASH PREFIX 1 {id_prefix}: + SCHEMA + embedding VECTOR FLAT 6 + TYPE FLOAT32 + DIM {ndims} + DISTANCE_METRIC COSINE + document TEXT + embedded_text TEXT +``` + +The crate does NOT auto-create the index. It validates the index exists at construction time via `FT.INFO` (following the MongoDB pattern). + +## Public API Surface + +### Struct: `ValkeyVectorStore` + +```rust +pub struct ValkeyVectorStore { + // private fields +} + +impl ValkeyVectorStore { + /// Create a new ValkeyVectorStore. + /// Validates the index exists via FT.INFO. + pub async fn new( + client: redis::aio::MultiplexedConnection, + model: M, + index_name: &str, + config: ValkeyVectorStoreConfig, + ) -> Result; +} +``` + +### Struct: `ValkeyVectorStoreConfig` + +```rust +pub struct ValkeyVectorStoreConfig { + /// Hash key prefix (default: "rig") + pub id_prefix: String, + /// Hash field containing the vector (default: "embedding") + pub embedding_field: String, + /// Hash field containing the serialized document (default: "document") + pub document_field: String, + /// Hash field containing the embedded text fragment (default: "embedded_text") + pub embedded_text_field: String, +} +``` + +### Struct: `ValkeySearchFilter` + +Implements `SearchFilter` for Valkey FT.SEARCH filter syntax. 
+ +```rust +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ValkeySearchFilter(String); + +impl SearchFilter for ValkeySearchFilter { + type Value = serde_json::Value; + fn eq(key, value) -> Self; // @field:{value} for TAG, @field:[val val] for NUMERIC + fn gt(key, value) -> Self; // @field:[(val +inf] + fn lt(key, value) -> Self; // @field:[(-inf (val] + fn and(self, rhs) -> Self; // (lhs) (rhs) + fn or(self, rhs) -> Self; // (lhs) | (rhs) +} +``` + +### Trait Implementations + +- `VectorStoreIndex for ValkeyVectorStore` — uses `FT.SEARCH` with `KNN` vector query +- `InsertDocuments for ValkeyVectorStore` — uses `HSET` to store document hashes +- `VectorStoreIndexDyn` — provided by blanket impl (Filter type uses `serde_json::Value`) + +## Query Flow (top_n) + +1. Embed query text via `model.embed_text(query)` +2. Serialize embedding to little-endian f32 bytes (f64 values narrowed to f32; Valkey only supports FLOAT32 vectors) +3. Build FT.SEARCH command: + ``` + FT.SEARCH {index_name} + "({filter_expr})=>[KNN {samples} @{embedding_field} $vec AS score]" + PARAMS 2 vec {blob} + LIMIT 0 {samples} + RETURN 3 score document embedded_text + DIALECT 2 + ``` + > **Note:** `SORTBY` is omitted because DIALECT 2 KNN queries return results sorted by distance by default. + + If no filter: use `*` as the base query. +4. Parse results into `Vec<(f64, String, T)>` + +## Insert Flow (insert_documents) + +1. For each `(doc, embeddings)` pair: + - Serialize doc to JSON + - For each embedding in `OneOrMany`: + - Generate a unique key: `{prefix}:{nanoid}` + - `HSET {key} embedding {vec_bytes} document {json} embedded_text {embedding.document}` +2. 
Use pipelining for batch efficiency + +## Crate Structure + +``` +crates/rig-valkey/ +├── Cargo.toml +├── LICENSE +├── README.md +├── src/ +│ ├── lib.rs # ValkeyVectorStore, config, constructor, trait impls +│ └── filter.rs # ValkeySearchFilter + +tests/integrations/ +└── valkey.rs # testcontainers-based integration tests +``` + +## Feature Flags + +None initially. The crate is opt-in via the workspace `valkey` feature on the root `rig` crate. diff --git a/sdd/evaluation.md b/sdd/evaluation.md new file mode 100644 index 000000000..0262a3999 --- /dev/null +++ b/sdd/evaluation.md @@ -0,0 +1,215 @@ +# rig-valkey Evaluation Harness + +## Test Strategy + +Three layers of testing, matching the patterns established by rig-qdrant and rig-mongodb: + +1. **Unit tests** — Filter serialization, byte conversion, config validation +2. **Integration tests** — Full round-trip against a Valkey container +3. **Compile-time checks** — Trait bound satisfaction, lint compliance + +## Test Infrastructure + +### Valkey Container (testcontainers) + +```rust +use testcontainers::{GenericImage, core::{IntoContainerPort, WaitFor}, runners::AsyncRunner}; + +const VALKEY_PORT: u16 = 6379; + +async fn start_valkey() -> testcontainers::ContainerAsync<GenericImage> { + GenericImage::new("valkey/valkey-bundle", "8.0") + .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) + .with_exposed_port(VALKEY_PORT.tcp()) + .start() + .await + .expect("Failed to start Valkey container") +} +``` + +**Image**: `valkey/valkey-bundle:8.0` — bundles valkey-search, valkey-json, and valkey-bloom modules. The base `valkey/valkey` image does NOT include the search module. 
+ +### Embedding API Mock (httpmock) + +Mock the OpenAI embeddings endpoint to avoid real API calls in CI: + +```rust +fn mock_embeddings(server: &httpmock::MockServer, input_texts: &[&str], vectors: &[Vec<f64>]) { + server.mock(|when, then| { + when.method(httpmock::Method::POST) + .path("/embeddings") + .header("Authorization", "Bearer TEST"); + then.status(200) + .json_body(/* OpenAI-format response with provided vectors */); + }); +} +``` + +This is the exact pattern used by rig-qdrant and rig-mongodb tests. + +### Docker Availability Guard + +```rust +fn skip_if_docker_unavailable(test_name: &str) -> bool { + let docker_socket = std::path::Path::new("/var/run/docker.sock"); + if std::env::var_os("DOCKER_HOST").is_some() || docker_socket.exists() { + return false; + } + eprintln!("skipping {test_name}: Docker is unavailable"); + true +} +``` + +Tests gracefully skip when Docker is not available (e.g., local dev without Docker). + +## Integration Test Plan + +### Test 1: Constructor validates index exists + +``` +Given: A running Valkey instance with NO FT index +When: ValkeyVectorStore::new() is called +Then: Returns Err(VectorStoreError::DatastoreError("Index not found")) +``` + +### Test 2: Constructor succeeds with valid index + +``` +Given: A running Valkey instance with a valid FT index +When: ValkeyVectorStore::new() is called +Then: Returns Ok(ValkeyVectorStore) +``` + +### Test 3: insert_documents stores hashes correctly + +``` +Given: A valid ValkeyVectorStore +When: insert_documents is called with 3 documents +Then: 3 Hash keys exist in Valkey with correct embedding, document, embedded_text fields +``` + +### Test 4: top_n returns correct results (end-to-end) + +``` +Given: 3 documents inserted with distinct embeddings (mocked) +When: top_n is called with a query embedding close to document 3 +Then: Returns document 3 as the top result with score > 0 + Result deserializes correctly into the expected type +``` + +### Test 5: top_n_ids returns IDs only + 
+``` +Given: Same setup as Test 4 +When: top_n_ids is called +Then: Returns (score, id) tuples without document payloads +``` + +### Test 6: Filter expressions work + +``` +Given: Documents with varying metadata stored as TAG/NUMERIC fields +When: top_n is called with a SearchFilter::eq("category", "science") +Then: Only documents matching the filter are returned +``` + +### Test 7: Threshold filtering + +``` +Given: Documents with varying similarity to query +When: top_n is called with threshold = 0.8 +Then: Only results with similarity >= 0.8 are returned +``` + +### Test 8: ValkeySearchFilter serialization round-trip + +``` +Given: A ValkeySearchFilter constructed via SearchFilter trait methods +When: Serialized to JSON and deserialized back +Then: Produces equivalent filter +``` + +### Test 9: ValkeySearchFilter is usable from crate root + +```rust +// Unit test (no container needed) +#[test] +fn valkey_filter_is_available_from_crate_root() { + let request = VectorSearchRequest::<ValkeySearchFilter>::builder() + .query("test") + .samples(3) + .filter(ValkeySearchFilter::eq("field", json!("value"))) + .build(); + assert!(request.filter().is_some()); +} +``` + +## CI Integration + +### How tests run in upstream CI + +The rig CI workflow runs: +```yaml +cargo nextest run --all-features --retries 2 +``` + +On `ubuntu-latest` with Docker available. Our integration tests will: +- Be gated behind `#[cfg(feature = "valkey")]` in `tests/integrations.rs` +- Use `skip_if_docker_unavailable()` as a safety net +- Pull `valkey/valkey-bundle:8.0` container on first run + +### Required additions to workspace + +1. **Root `Cargo.toml`** — Add `rig-valkey` as optional dependency + `valkey` feature +2. **`tests/integrations.rs`** — Add `#[cfg(feature = "valkey")]` module entry +3. 
**`tests/integrations/valkey.rs`** — The integration test file + +### Clippy + fmt gates + +All code must pass: +```bash +cargo clippy --all-features --all-targets # Zero warnings +cargo fmt -- --check # Formatted +cargo doc --no-deps --all-features # Docs build without warnings +``` + +### Test execution locally + +```bash +# Run just the valkey integration tests +cargo nextest run -p rig-valkey --all-features + +# Or from workspace root with feature +cargo nextest run --features valkey -E 'test(valkey)' +``` + +## Acceptance Criteria Verification Matrix + +| Acceptance Criterion | Verified By | +|---------------------|-------------| +| Crate compiles and passes CI | `cargo clippy --all-features`, `cargo nextest run` | +| VectorStoreIndex with working top_n/top_n_ids | Tests 4, 5 | +| InsertDocuments trait implemented | Test 3 | +| Integration tests against running Valkey | Tests 1-7 (testcontainers) | +| Cookbook examples (valkey-samples repo) | Separate deliverable, not in this PR | + +## Test Data + +Use the same "word definitions" pattern as rig-qdrant tests: + +```rust +#[derive(Embed, Clone, Deserialize, Serialize, Debug)] +struct Word { + id: String, + #[embed] + definition: String, +} +``` + +Three documents with controlled embeddings (via mock) so that similarity ordering is deterministic. 
+ +## Performance Considerations (not blocking, but noted) + +- Pipeline HSET commands for batch inserts (don't await each individually) +- FT.SEARCH with LIMIT to avoid over-fetching +- Connection pooling is the caller's responsibility (they provide the connection) diff --git a/src/lib.rs b/src/lib.rs index dbb1ae78b..da7f36a4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,6 +138,11 @@ pub mod surrealdb { pub use rig_surrealdb::*; } +#[cfg(feature = "valkey")] +pub mod valkey { + pub use rig_valkey::*; +} + #[cfg(feature = "vectorize")] #[cfg_attr(docsrs, doc(cfg(feature = "vectorize")))] pub mod vectorize { diff --git a/tests/integrations.rs b/tests/integrations.rs index ccc219163..043039001 100644 --- a/tests/integrations.rs +++ b/tests/integrations.rs @@ -30,6 +30,9 @@ mod scylladb; #[cfg(feature = "sqlite")] #[path = "integrations/sqlite.rs"] mod sqlite; +#[cfg(feature = "valkey")] +#[path = "integrations/valkey.rs"] +mod valkey; #[cfg(feature = "vectorize")] #[path = "integrations/vectorize.rs"] mod vectorize; diff --git a/tests/integrations/valkey.rs b/tests/integrations/valkey.rs new file mode 100644 index 000000000..363619860 --- /dev/null +++ b/tests/integrations/valkey.rs @@ -0,0 +1,346 @@ +#![allow( + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic, + clippy::unwrap_used, + clippy::unreachable +)] + +use rig::valkey::{ValkeySearchFilter, ValkeyVectorStore, ValkeyVectorStoreConfig}; +use rig::{ + Embed, + client::EmbeddingsClient, + embeddings::EmbeddingsBuilder, + providers::openai, + vector_store::{InsertDocuments, VectorStoreIndex, request::VectorSearchRequest}, +}; +use serde_json::json; +use testcontainers::{ + GenericImage, + core::{IntoContainerPort, WaitFor}, + runners::AsyncRunner, +}; + +const VALKEY_PORT: u16 = 6379; +const INDEX_NAME: &str = "test_index"; +const PREFIX: &str = "rig"; + +fn skip_if_docker_unavailable(test_name: &str) -> bool { + let docker_socket = std::path::Path::new("/var/run/docker.sock"); + if 
std::env::var_os("DOCKER_HOST").is_some() || docker_socket.exists() { + return false; + } + eprintln!("skipping {test_name}: Docker is unavailable"); + true +} + +#[derive(Embed, Clone, serde::Deserialize, serde::Serialize, Debug, PartialEq)] +struct Word { + id: String, + #[embed] + definition: String, +} + +async fn start_valkey() -> testcontainers::ContainerAsync { + GenericImage::new("valkey/valkey-bundle", "8.0") + .with_wait_for(WaitFor::Duration { + length: std::time::Duration::from_secs(10), + }) + .with_exposed_port(VALKEY_PORT.tcp()) + .start() + .await + .expect("Failed to start Valkey container") +} + +async fn get_connection(host: &str, port: u16) -> redis::aio::MultiplexedConnection { + let client = redis::Client::open(format!("redis://{host}:{port}")) + .expect("Failed to create redis client"); + // Retry connection to handle slow container starts under parallel load + let mut attempts = 0; + loop { + match client.get_multiplexed_async_connection().await { + Ok(conn) => return conn, + Err(e) => { + attempts += 1; + if attempts >= 5 { + panic!("Failed to connect to Valkey after {attempts} attempts: {e}"); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } +} + +async fn create_index(conn: &mut redis::aio::MultiplexedConnection) { + let result: Result = redis::cmd("FT.CREATE") + .arg(INDEX_NAME) + .arg("ON") + .arg("HASH") + .arg("PREFIX") + .arg(1) + .arg(format!("{PREFIX}:")) + .arg("SCHEMA") + .arg("embedding") + .arg("VECTOR") + .arg("FLAT") + .arg(6) + .arg("TYPE") + .arg("FLOAT32") + .arg("DIM") + .arg(1536) + .arg("DISTANCE_METRIC") + .arg("COSINE") + .arg("document") + .arg("TEXT") + .arg("embedded_text") + .arg("TEXT") + .query_async(conn) + .await; + + result.expect("Failed to create FT index"); +} + +fn mock_embedding_server() -> httpmock::MockServer { + let server = httpmock::MockServer::start(); + + // Mock for single-input query embedding (matched first due to specificity) + server.mock(|when, then| { + 
when.method(httpmock::Method::POST) + .path("/embeddings") + .body_includes("What is a linglingdong?"); + then.status(200) + .header("content-type", "application/json") + .json_body(json!({ + "object": "list", + "data": [ + {"object": "embedding", "embedding": vec![0.002_f64; 1536], "index": 0} + ], + "model": "text-embedding-ada-002", + "usage": {"prompt_tokens": 8, "total_tokens": 8} + })); + }); + + // Mock for 3-document bulk embedding + server.mock(|when, then| { + when.method(httpmock::Method::POST) + .path("/embeddings") + .body_includes("flurbo"); + then.status(200) + .header("content-type", "application/json") + .json_body(json!({ + "object": "list", + "data": [ + {"object": "embedding", "embedding": vec![0.1_f64; 1536], "index": 0}, + {"object": "embedding", "embedding": vec![0.2_f64; 1536], "index": 1}, + {"object": "embedding", "embedding": vec![0.002_f64; 1536], "index": 2} + ], + "model": "text-embedding-ada-002", + "usage": {"prompt_tokens": 8, "total_tokens": 8} + })); + }); + + server +} + +fn test_words() -> Vec { + vec![ + Word { + id: "doc0".to_string(), + definition: "Definition of a *flurbo*: A flurbo is a green alien that lives on cold planets".to_string(), + }, + Word { + id: "doc1".to_string(), + definition: "Definition of a *glarb-glarb*: A glarb-glarb is an ancient tool used by the ancestors of the inhabitants of planet Jiro to farm the land.".to_string(), + }, + Word { + id: "doc2".to_string(), + definition: "Definition of a *linglingdong*: A term used by inhabitants of the far side of the moon to describe humans.".to_string(), + }, + ] +} + +struct TestHarness { + _container: testcontainers::ContainerAsync, + store: ValkeyVectorStore, + model: rig::providers::openai::EmbeddingModel, +} + +impl TestHarness { + async fn new() -> Self { + let container = start_valkey().await; + let port = container.get_host_port_ipv4(VALKEY_PORT).await.unwrap(); + let host = container.get_host().await.unwrap().to_string(); + let mut connection = 
get_connection(&host, port).await; + + create_index(&mut connection).await; + + let server = mock_embedding_server(); + let openai_client = openai::Client::builder() + .api_key("TEST") + .base_url(server.base_url()) + .build() + .unwrap(); + let model = openai_client.embedding_model(openai::TEXT_EMBEDDING_ADA_002); + + let store = ValkeyVectorStore::new( + connection, + model.clone(), + INDEX_NAME, + ValkeyVectorStoreConfig::default(), + ) + .await + .unwrap(); + + Self { + _container: container, + store, + model, + } + } +} + +#[tokio::test] +async fn constructor_fails_without_index() { + if skip_if_docker_unavailable("constructor_fails_without_index") { + return; + } + + let container = start_valkey().await; + let port = container.get_host_port_ipv4(VALKEY_PORT).await.unwrap(); + let host = container.get_host().await.unwrap().to_string(); + let connection = get_connection(&host, port).await; + + let server = mock_embedding_server(); + let openai_client = openai::Client::builder() + .api_key("TEST") + .base_url(server.base_url()) + .build() + .unwrap(); + let model = openai_client.embedding_model(openai::TEXT_EMBEDDING_ADA_002); + + let result = ValkeyVectorStore::new( + connection, + model, + "nonexistent_index", + ValkeyVectorStoreConfig::default(), + ) + .await; + + assert!(result.is_err()); +} + +#[tokio::test] +async fn constructor_succeeds_with_valid_index() { + if skip_if_docker_unavailable("constructor_succeeds_with_valid_index") { + return; + } + + let container = start_valkey().await; + let port = container.get_host_port_ipv4(VALKEY_PORT).await.unwrap(); + let host = container.get_host().await.unwrap().to_string(); + let mut connection = get_connection(&host, port).await; + + create_index(&mut connection).await; + + let server = mock_embedding_server(); + let openai_client = openai::Client::builder() + .api_key("TEST") + .base_url(server.base_url()) + .build() + .unwrap(); + let model = openai_client.embedding_model(openai::TEXT_EMBEDDING_ADA_002); + + 
let result = ValkeyVectorStore::new( + connection, + model, + INDEX_NAME, + ValkeyVectorStoreConfig::default(), + ) + .await; + + assert!(result.is_ok()); +} + +#[tokio::test] +async fn insert_and_search_test() { + if skip_if_docker_unavailable("insert_and_search_test") { + return; + } + + let harness = TestHarness::new().await; + + let documents = EmbeddingsBuilder::new(harness.model.clone()) + .documents(test_words()) + .unwrap() + .build() + .await + .unwrap(); + + harness.store.insert_documents(documents).await.unwrap(); + + // Small delay for indexing + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // Search for linglingdong — its embedding (0.002) is closest to the query (0.002) + let req = VectorSearchRequest::builder() + .query("What is a linglingdong?") + .samples(1) + .build(); + + let results = harness.store.top_n::(req).await.unwrap(); + + assert!(!results.is_empty(), "Expected at least one result"); + + let (score, _id, doc) = &results[0]; + assert!(*score > 0.0, "Score should be positive"); + + // The closest document should be the linglingdong one + let doc_obj: Word = serde_json::from_value(doc.clone()).unwrap(); + assert_eq!(doc_obj.id, "doc2"); +} + +#[tokio::test] +async fn top_n_ids_test() { + if skip_if_docker_unavailable("top_n_ids_test") { + return; + } + + let harness = TestHarness::new().await; + + let documents = EmbeddingsBuilder::new(harness.model.clone()) + .documents(test_words()) + .unwrap() + .build() + .await + .unwrap(); + + harness.store.insert_documents(documents).await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let req = VectorSearchRequest::builder() + .query("What is a linglingdong?") + .samples(3) + .build(); + + let results = harness.store.top_n_ids(req).await.unwrap(); + + assert_eq!(results.len(), 3, "Expected 3 results"); + // All scores should be positive (similarity) + for (score, _id) in &results { + assert!(*score > 0.0); + } +} + +#[test] +fn 
filter_is_available_from_crate_root() { + use rig::vector_store::request::SearchFilter; + + let request = VectorSearchRequest::::builder() + .query("test") + .samples(3) + .filter(ValkeySearchFilter::eq("category", json!("science"))) + .build(); + + assert!(request.filter().is_some()); +}