diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index e22633ad..f702cc7f 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -30,6 +30,15 @@ logger = logging.getLogger(__name__) +# Passage ID schemes recorded in .meta.json["passage_id_scheme"]. +# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text). +# - "content-hash": planned in #329; IDs are sha256(text)[:16], stable across +# file moves and reorderings. +# Older indexes have no passage_id_scheme field — readers must default to +# "sequential" when the key is absent. See #329 for the rollout plan. +PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential" +PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash" + def get_registered_backends() -> list[str]: """Get list of registered backend names.""" @@ -376,6 +385,7 @@ def __init__( embedding_options: Optional[dict[str, Any]] = None, prebuild_bm25: bool = False, bm25_backend: str = "fts5", + passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL, **backend_kwargs, ): if bm25_backend != "fts5": @@ -383,7 +393,22 @@ def __init__( bm25_backend = "fts5" self.bm25_backend = bm25_backend self.prebuild_bm25 = prebuild_bm25 or bm25_backend == "fts5" + if passage_id_scheme not in ( + PASSAGE_ID_SCHEME_SEQUENTIAL, + PASSAGE_ID_SCHEME_CONTENT_HASH, + ): + raise ValueError( + f"Unknown passage_id_scheme: {passage_id_scheme!r}. " + f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, " + f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}." + ) + self.passage_id_scheme = passage_id_scheme self.backend_name = backend_name + if backend_name == "diskann" and passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + raise ValueError( + "passage_id_scheme='content-hash' is not supported by the DiskANN backend yet " + "because DiskANN search returns numeric labels without a persisted passage ID map." + ) # Normalize incompatible combinations early (for consistent metadata) if backend_name == "hnsw": is_recompute = backend_kwargs.get("is_recompute", True) @@ -477,10 +502,37 @@ def __init__( self.backend_kwargs = backend_kwargs self.chunks: list[dict[str, Any]] = [] + @staticmethod + def _make_unique_passage_id(base_id: str, reserved_ids: set[str]) -> str: + if base_id not in reserved_ids: + return base_id + suffix = 1 + while f"{base_id}-{suffix}" in reserved_ids: + suffix += 1 + return f"{base_id}-{suffix}" + + def _generate_passage_id(self, text: str, reserved_ids: Optional[set[str]] = None) -> str: + """Generate a passage ID per the configured scheme. + + sequential: str(insertion index) — fast, position-dependent, current default. + content-hash: sha256(text)[:16] — content-stable across file moves and + reorderings when text is unique. Duplicate text receives a numeric suffix + so the passage store, ID maps, and vector labels stay one-to-one. + """ + if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + import hashlib + + base_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + return self._make_unique_passage_id(base_id, reserved_ids or set()) + return str(len(self.chunks)) + def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None): if metadata is None: metadata = {} - passage_id = metadata.get("id", str(len(self.chunks))) + if "id" in metadata and metadata["id"] is not None: + passage_id = str(metadata["id"]) + else: + passage_id = self._generate_passage_id(text, {chunk["id"] for chunk in self.chunks}) chunk_data = {"id": passage_id, "text": text, "metadata": metadata} self.chunks.append(chunk_data) @@ -570,12 +622,13 @@ def build_index(self, index_path: str): builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs) leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", @@ -611,8 +664,7 @@ def _build_bm25_fts5(self, index_dir: Path, index_name: str) -> None: """Build a SQLite FTS5 BM25 index alongside the vector index. Queries via SQLite's bm25() function — memory-bounded at search time - (the term/posting data lives on disk, not in RAM). Replaces - BM25Scorer's full-corpus-in-memory model for paper-scale corpora. + because the term/posting data lives on disk, not in RAM. """ db_path = index_dir / f"{index_name}.bm25.sqlite" index = Fts5BM25Index(str(db_path)) @@ -714,12 +766,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda # Create metadata file leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", @@ -831,6 +884,7 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] with open(offset_file, "rb") as f: offset_map: dict[str, int] = pickle.load(f) existing_ids = set(offset_map.keys()) + index_passage_id_scheme = meta.get("passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL) # IVF: optional delete (for reindex / file-change: remove then re-insert) if remove_passage_ids and backend_name == "ivf": @@ -881,14 +935,30 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] ) valid_chunks: list[dict[str, Any]] = [] + reserved_ids = set(existing_ids) for chunk in self.chunks: text = chunk.get("text", "") if not isinstance(text, str) or not text.strip(): continue metadata = chunk.setdefault("metadata", {}) - passage_id = chunk.get("id") or metadata.get("id") - if passage_id and passage_id in existing_ids: + explicit_id = ( + str(metadata["id"]) if "id" in metadata and metadata["id"] is not None else None + ) + if explicit_id is not None: + passage_id = explicit_id + elif index_passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + import hashlib + + base_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + passage_id = self._make_unique_passage_id(base_id, reserved_ids) + else: + passage_id = None + + if passage_id is not None and passage_id in reserved_ids: raise ValueError(f"Passage ID '{passage_id}' already exists in the index.") + if passage_id is not None: + chunk["id"] = passage_id + reserved_ids.add(passage_id) valid_chunks.append(chunk) if not valid_chunks: @@ -925,9 +995,14 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] # IVF: add_vectors then append passages/offset (no ZMQ/server) if backend_name == "ivf": for i, chunk in enumerate(valid_chunks): - pid = chunk.get("id") or chunk.get("metadata", {}).get("id") - if not pid: - pid = str(len(offset_map) + i) + if index_passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + pid = str(chunk["id"]) if "id" in chunk else None + else: + explicit_metadata_id = chunk.get("metadata", {}).get("id") + pid = str(explicit_metadata_id) if explicit_metadata_id is not None else None + if pid is None: + pid = self._make_unique_passage_id(str(len(offset_map) + i), reserved_ids) + reserved_ids.add(pid) chunk.setdefault("metadata", {})["id"] = pid chunk["id"] = pid passage_ids = [c["id"] for c in valid_chunks] @@ -1012,11 +1087,12 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] passage_meta_mode = meta.get("embedding_mode", self.embedding_mode) passage_provider_options = meta.get("embedding_options", self.embedding_options) - base_id = index.ntotal - for offset, chunk in enumerate(valid_chunks): - new_id = str(base_id + offset) - chunk.setdefault("metadata", {})["id"] = new_id - chunk["id"] = new_id + if index_passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH: + base_id = index.ntotal + for offset, chunk in enumerate(valid_chunks): + new_id = str(base_id + offset) + chunk.setdefault("metadata", {})["id"] = new_id + chunk["id"] = new_id # Append passages/offsets before we attempt index.add so the ZMQ server # can resolve newly assigned IDs during recompute. Keep rollback hooks diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 0d50cf34..c07b9510 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -347,6 +347,16 @@ def create_parser(self) -> argparse.ArgumentParser: default=True, help="Fall back to traditional chunking if AST chunking fails (default: True)", ) + build_parser.add_argument( + "--id-scheme", + choices=["sequential", "content-hash"], + default="sequential", + help=( + "How passage IDs are assigned. 'sequential' (default) keys by insertion " + "order; 'content-hash' uses sha256(text)[:16], stable across file moves " + "and reorderings. See #329." + ), + ) # Watch command watch_parser = subparsers.add_parser( @@ -1891,6 +1901,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder": is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=getattr(args, "id_scheme", "sequential"), ) def _incremental_add_only( @@ -2384,6 +2395,7 @@ async def build_index(self, args): is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=getattr(args, "id_scheme", "sequential"), ) for chunk in all_texts: diff --git a/tests/test_passage_id_scheme.py b/tests/test_passage_id_scheme.py new file mode 100644 index 00000000..51257bef --- /dev/null +++ b/tests/test_passage_id_scheme.py @@ -0,0 +1,147 @@ +import hashlib +import json +import pickle +import sys +from types import ModuleType +from typing import Any, cast + +import numpy as np +import pytest +from leann.api import LeannBuilder +from leann.registry import BACKEND_REGISTRY + + +def _content_id(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +@pytest.fixture(autouse=True) +def _register_minimal_backends(monkeypatch): + registry = dict(BACKEND_REGISTRY) + registry["hnsw"] = cast(Any, object()) + registry["ivf"] = cast(Any, object()) + monkeypatch.setattr("leann.api.BACKEND_REGISTRY", registry) + + +def _write_minimal_ivf_index(tmp_path, index_name: str, passages: list[dict]) -> str: + index_path = tmp_path / index_name + passages_file = tmp_path / f"{index_name}.passages.jsonl" + offset_file = tmp_path / f"{index_name}.passages.idx" + meta_file = tmp_path / f"{index_name}.meta.json" + index_file = tmp_path / f"{index_path.stem}.index" + + offset_map = {} + with open(passages_file, "w", encoding="utf-8") as f: + for passage in passages: + offset_map[passage["id"]] = f.tell() + json.dump(passage, f) + f.write("\n") + with open(offset_file, "wb") as f: + pickle.dump(offset_map, f) + with open(meta_file, "w", encoding="utf-8") as f: + json.dump( + { + "backend_name": "ivf", + "backend_kwargs": {"distance_metric": "mips"}, + "dimensions": 2, + "passage_id_scheme": "content-hash", + "total_passages": len(passages), + }, + f, + ) + index_file.write_bytes(b"fake-index") + return str(index_path) + + +def _install_fake_ivf_backend(monkeypatch, captured_passage_ids: list[list[str]]) -> None: + class FakeIvfModule(ModuleType): + def add_vectors(self, _index_path, _embeddings, passage_ids): + captured_passage_ids.append(list(passage_ids)) + + fake_ivf = FakeIvfModule("leann_backend_ivf") + monkeypatch.setitem(sys.modules, "leann_backend_ivf", fake_ivf) + + +def test_builder_content_hash_passage_ids_are_unique_for_duplicate_text(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("same text", metadata={"source": "a.txt"}) + builder.add_text("same text", metadata={"source": "b.txt"}) + builder.add_text("same text", metadata={"source": "c.txt"}) + builder.add_text("different text", metadata={"source": "d.txt"}) + + same_id = _content_id("same text") + assert [chunk["id"] for chunk in builder.chunks] == [ + same_id, + f"{same_id}-1", + f"{same_id}-2", + _content_id("different text"), + ] + + +def test_builder_respects_explicit_metadata_id_with_content_hash_scheme(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("same text", metadata={"id": "explicit-id", "source": "a.txt"}) + + assert builder.chunks[0]["id"] == "explicit-id" + + +def test_builder_preserves_falsy_explicit_metadata_ids(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("zero id", metadata={"id": 0, "source": "a.txt"}) + builder.add_text("empty id", metadata={"id": "", "source": "b.txt"}) + + assert builder.chunks[0]["id"] == "0" + assert builder.chunks[1]["id"] == "" + + +def test_builder_rejects_content_hash_diskann_until_id_map_exists(): + with pytest.raises(ValueError, match="not supported by the DiskANN backend"): + LeannBuilder(backend_name="diskann", passage_id_scheme="content-hash") + + +def test_ivf_update_content_hash_suffixes_existing_duplicate(tmp_path, monkeypatch): + base_id = _content_id("same text") + index_path = _write_minimal_ivf_index( + tmp_path, + "docs.leann", + [{"id": base_id, "text": "same text", "metadata": {"source": "old.txt"}}], + ) + captured_passage_ids = [] + _install_fake_ivf_backend(monkeypatch, captured_passage_ids) + monkeypatch.setattr( + "leann.api.compute_embeddings", + lambda *_args, **_kwargs: np.ones((1, 2), dtype=np.float32), + ) + + builder = LeannBuilder(backend_name="ivf", passage_id_scheme="content-hash") + builder.add_text("same text", metadata={"source": "new.txt"}) + builder.update_index(index_path) + + suffixed_id = f"{base_id}-1" + assert captured_passage_ids == [[suffixed_id]] + with open(tmp_path / "docs.leann.passages.idx", "rb") as f: + offset_map = pickle.load(f) + assert set(offset_map) == {base_id, suffixed_id} + + +def test_ivf_update_preserves_falsy_explicit_metadata_ids(tmp_path, monkeypatch): + index_path = _write_minimal_ivf_index(tmp_path, "docs.leann", []) + captured_passage_ids = [] + _install_fake_ivf_backend(monkeypatch, captured_passage_ids) + monkeypatch.setattr( + "leann.api.compute_embeddings", + lambda *_args, **_kwargs: np.ones((2, 2), dtype=np.float32), + ) + + builder = LeannBuilder(backend_name="ivf", passage_id_scheme="content-hash") + builder.add_text("zero id", metadata={"id": 0}) + builder.add_text("empty id", metadata={"id": ""}) + builder.update_index(index_path) + + assert captured_passage_ids == [["0", ""]] + with open(tmp_path / "docs.leann.passages.idx", "rb") as f: + offset_map = pickle.load(f) + assert set(offset_map) == {"0", ""}