Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 91 additions & 15 deletions packages/leann-core/src/leann/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@

logger = logging.getLogger(__name__)

# Passage ID schemes recorded in <index>.meta.json["passage_id_scheme"].
# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text).
# - "content-hash": planned in #329; IDs are sha256(text)[:16], stable across
# file moves and reorderings.
# Older indexes have no passage_id_scheme field — readers must default to
# "sequential" when the key is absent. See #329 for the rollout plan.
PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential"
PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash"


def get_registered_backends() -> list[str]:
"""Get list of registered backend names."""
Expand Down Expand Up @@ -376,14 +385,30 @@ def __init__(
embedding_options: Optional[dict[str, Any]] = None,
prebuild_bm25: bool = False,
bm25_backend: str = "fts5",
passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL,
**backend_kwargs,
):
if bm25_backend != "fts5":
logger.warning(f"bm25_backend={bm25_backend!r} is deprecated; using 'fts5'.")
bm25_backend = "fts5"
self.bm25_backend = bm25_backend
self.prebuild_bm25 = prebuild_bm25 or bm25_backend == "fts5"
if passage_id_scheme not in (
PASSAGE_ID_SCHEME_SEQUENTIAL,
PASSAGE_ID_SCHEME_CONTENT_HASH,
):
raise ValueError(
f"Unknown passage_id_scheme: {passage_id_scheme!r}. "
f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, "
f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}."
)
self.passage_id_scheme = passage_id_scheme
self.backend_name = backend_name
if backend_name == "diskann" and passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
raise ValueError(
"passage_id_scheme='content-hash' is not supported by the DiskANN backend yet "
"because DiskANN search returns numeric labels without a persisted passage ID map."
)
# Normalize incompatible combinations early (for consistent metadata)
if backend_name == "hnsw":
is_recompute = backend_kwargs.get("is_recompute", True)
Expand Down Expand Up @@ -477,10 +502,37 @@ def __init__(
self.backend_kwargs = backend_kwargs
self.chunks: list[dict[str, Any]] = []

@staticmethod
def _make_unique_passage_id(base_id: str, reserved_ids: set[str]) -> str:
if base_id not in reserved_ids:
return base_id
suffix = 1
while f"{base_id}-{suffix}" in reserved_ids:
suffix += 1
return f"{base_id}-{suffix}"

def _generate_passage_id(self, text: str, reserved_ids: Optional[set[str]] = None) -> str:
    """Produce the ID for a new passage under the builder's configured scheme.

    * sequential (the default): the current number of queued chunks, as a
      string. Fast, but tied to insertion position.
    * content-hash: the first 16 hex characters of ``sha256(text)``, which
      does not change when files move or passages are reordered, as long as
      the text is unique. If that ID is already in ``reserved_ids`` a numeric
      suffix is appended so that the passage store, ID maps and vector labels
      stay one-to-one.

    Args:
        text: Passage text; only consulted for the content-hash scheme.
        reserved_ids: IDs already taken. ``None`` or an empty set means
            nothing is reserved. Ignored by the sequential scheme.
    """
    if self.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
        return str(len(self.chunks))

    import hashlib

    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    taken = reserved_ids if reserved_ids else set()
    return self._make_unique_passage_id(digest[:16], taken)

def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None):
    """Queue one passage (text plus metadata) on ``self.chunks``.

    The passage ID is ``str(metadata["id"])`` when the caller supplied a
    non-``None`` ``"id"`` (falsy values such as ``0`` or ``""`` are kept);
    otherwise it is generated by ``_generate_passage_id`` according to the
    builder's ``passage_id_scheme``.

    Args:
        text: Passage text.
        metadata: Optional metadata dict; stored on the chunk as-is. A fresh
            empty dict is used when omitted.
    """
    if metadata is None:
        metadata = {}

    explicit_id = metadata.get("id")
    if explicit_id is not None:
        passage_id = str(explicit_id)
    else:
        # Only the content-hash scheme consults the reserved IDs. Skipping the
        # set construction for the sequential scheme avoids rebuilding a set of
        # every queued ID on each call (quadratic over a whole build).
        reserved_ids = (
            {chunk["id"] for chunk in self.chunks}
            if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH
            else None
        )
        passage_id = self._generate_passage_id(text, reserved_ids)

    self.chunks.append({"id": passage_id, "text": text, "metadata": metadata})

Expand Down Expand Up @@ -570,12 +622,13 @@ def build_index(self, index_path: str):
builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs)
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -611,8 +664,7 @@ def _build_bm25_fts5(self, index_dir: Path, index_name: str) -> None:
"""Build a SQLite FTS5 BM25 index alongside the vector index.

Queries via SQLite's bm25() function — memory-bounded at search time
(the term/posting data lives on disk, not in RAM). Replaces
BM25Scorer's full-corpus-in-memory model for paper-scale corpora.
because the term/posting data lives on disk, not in RAM.
"""
db_path = index_dir / f"{index_name}.bm25.sqlite"
index = Fts5BM25Index(str(db_path))
Expand Down Expand Up @@ -714,12 +766,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda
# Create metadata file
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -831,6 +884,7 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
with open(offset_file, "rb") as f:
offset_map: dict[str, int] = pickle.load(f)
existing_ids = set(offset_map.keys())
index_passage_id_scheme = meta.get("passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL)

# IVF: optional delete (for reindex / file-change: remove then re-insert)
if remove_passage_ids and backend_name == "ivf":
Expand Down Expand Up @@ -881,14 +935,30 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
)

valid_chunks: list[dict[str, Any]] = []
reserved_ids = set(existing_ids)
for chunk in self.chunks:
text = chunk.get("text", "")
if not isinstance(text, str) or not text.strip():
continue
metadata = chunk.setdefault("metadata", {})
passage_id = chunk.get("id") or metadata.get("id")
if passage_id and passage_id in existing_ids:
explicit_id = (
str(metadata["id"]) if "id" in metadata and metadata["id"] is not None else None
)
if explicit_id is not None:
passage_id = explicit_id
elif index_passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
import hashlib

base_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
passage_id = self._make_unique_passage_id(base_id, reserved_ids)
else:
passage_id = None

if passage_id is not None and passage_id in reserved_ids:
raise ValueError(f"Passage ID '{passage_id}' already exists in the index.")
if passage_id is not None:
chunk["id"] = passage_id
reserved_ids.add(passage_id)
valid_chunks.append(chunk)

if not valid_chunks:
Expand Down Expand Up @@ -925,9 +995,14 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
# IVF: add_vectors then append passages/offset (no ZMQ/server)
if backend_name == "ivf":
for i, chunk in enumerate(valid_chunks):
pid = chunk.get("id") or chunk.get("metadata", {}).get("id")
if not pid:
pid = str(len(offset_map) + i)
if index_passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
pid = str(chunk["id"]) if "id" in chunk else None
else:
explicit_metadata_id = chunk.get("metadata", {}).get("id")
pid = str(explicit_metadata_id) if explicit_metadata_id is not None else None
if pid is None:
pid = self._make_unique_passage_id(str(len(offset_map) + i), reserved_ids)
reserved_ids.add(pid)
chunk.setdefault("metadata", {})["id"] = pid
chunk["id"] = pid
passage_ids = [c["id"] for c in valid_chunks]
Expand Down Expand Up @@ -1012,11 +1087,12 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]]
passage_meta_mode = meta.get("embedding_mode", self.embedding_mode)
passage_provider_options = meta.get("embedding_options", self.embedding_options)

base_id = index.ntotal
for offset, chunk in enumerate(valid_chunks):
new_id = str(base_id + offset)
chunk.setdefault("metadata", {})["id"] = new_id
chunk["id"] = new_id
if index_passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH:
base_id = index.ntotal
for offset, chunk in enumerate(valid_chunks):
new_id = str(base_id + offset)
chunk.setdefault("metadata", {})["id"] = new_id
chunk["id"] = new_id

# Append passages/offsets before we attempt index.add so the ZMQ server
# can resolve newly assigned IDs during recompute. Keep rollback hooks
Expand Down
12 changes: 12 additions & 0 deletions packages/leann-core/src/leann/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,16 @@ def create_parser(self) -> argparse.ArgumentParser:
default=True,
help="Fall back to traditional chunking if AST chunking fails (default: True)",
)
build_parser.add_argument(
"--id-scheme",
choices=["sequential", "content-hash"],
default="sequential",
help=(
"How passage IDs are assigned. 'sequential' (default) keys by insertion "
"order; 'content-hash' uses sha256(text)[:16], stable across file moves "
"and reorderings. See #329."
),
)

# Watch command
watch_parser = subparsers.add_parser(
Expand Down Expand Up @@ -1891,6 +1901,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder":
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=getattr(args, "id_scheme", "sequential"),
)

def _incremental_add_only(
Expand Down Expand Up @@ -2384,6 +2395,7 @@ async def build_index(self, args):
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=getattr(args, "id_scheme", "sequential"),
)

for chunk in all_texts:
Expand Down
147 changes: 147 additions & 0 deletions tests/test_passage_id_scheme.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import hashlib
import json
import pickle
import sys
from types import ModuleType
from typing import Any, cast

import numpy as np
import pytest
from leann.api import LeannBuilder
from leann.registry import BACKEND_REGISTRY


def _content_id(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


@pytest.fixture(autouse=True)
def _register_minimal_backends(monkeypatch):
    """Patch ``leann.api.BACKEND_REGISTRY`` with placeholder hnsw/ivf entries.

    The patched registry is a copy of the real one, so the original mapping is
    not mutated. Presumably this lets ``LeannBuilder`` accept these backend
    names without the real backend packages installed.
    """
    patched_registry = dict(BACKEND_REGISTRY)
    for backend in ("hnsw", "ivf"):
        patched_registry[backend] = cast(Any, object())
    monkeypatch.setattr("leann.api.BACKEND_REGISTRY", patched_registry)


def _write_minimal_ivf_index(tmp_path, index_name: str, passages: list[dict]) -> str:
index_path = tmp_path / index_name
passages_file = tmp_path / f"{index_name}.passages.jsonl"
offset_file = tmp_path / f"{index_name}.passages.idx"
meta_file = tmp_path / f"{index_name}.meta.json"
index_file = tmp_path / f"{index_path.stem}.index"

offset_map = {}
with open(passages_file, "w", encoding="utf-8") as f:
for passage in passages:
offset_map[passage["id"]] = f.tell()
json.dump(passage, f)
f.write("\n")
with open(offset_file, "wb") as f:
pickle.dump(offset_map, f)
with open(meta_file, "w", encoding="utf-8") as f:
json.dump(
{
"backend_name": "ivf",
"backend_kwargs": {"distance_metric": "mips"},
"dimensions": 2,
"passage_id_scheme": "content-hash",
"total_passages": len(passages),
},
f,
)
index_file.write_bytes(b"fake-index")
return str(index_path)


def _install_fake_ivf_backend(monkeypatch, captured_passage_ids: list[list[str]]) -> None:
    """Register a stub ``leann_backend_ivf`` module in ``sys.modules``.

    The stub exposes ``add_vectors(index_path, embeddings, passage_ids)``,
    which appends a list copy of ``passage_ids`` to ``captured_passage_ids``
    and does nothing else.
    """

    def record_passage_ids(_index_path, _embeddings, passage_ids):
        captured_passage_ids.append(list(passage_ids))

    stub_module = ModuleType("leann_backend_ivf")
    setattr(stub_module, "add_vectors", record_passage_ids)
    monkeypatch.setitem(sys.modules, "leann_backend_ivf", stub_module)


def test_builder_content_hash_passage_ids_are_unique_for_duplicate_text():
    """Duplicate text gets ``-1``, ``-2`` suffixes; distinct text keeps its bare hash."""
    builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash")

    inputs = [
        ("same text", "a.txt"),
        ("same text", "b.txt"),
        ("same text", "c.txt"),
        ("different text", "d.txt"),
    ]
    for text, source in inputs:
        builder.add_text(text, metadata={"source": source})

    duplicate_hash = _content_id("same text")
    expected_ids = [
        duplicate_hash,
        f"{duplicate_hash}-1",
        f"{duplicate_hash}-2",
        _content_id("different text"),
    ]
    observed_ids = [chunk["id"] for chunk in builder.chunks]
    assert observed_ids == expected_ids


def test_builder_respects_explicit_metadata_id_with_content_hash_scheme():
    """A caller-supplied ``metadata["id"]`` wins over the content hash."""
    builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash")
    supplied_metadata = {"id": "explicit-id", "source": "a.txt"}

    builder.add_text("same text", metadata=supplied_metadata)

    first_chunk = builder.chunks[0]
    assert first_chunk["id"] == "explicit-id"


def test_builder_preserves_falsy_explicit_metadata_ids():
    """Explicit IDs ``0`` and ``""`` are kept (stringified), not replaced by a hash."""
    builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash")

    builder.add_text("zero id", metadata={"id": 0, "source": "a.txt"})
    builder.add_text("empty id", metadata={"id": "", "source": "b.txt"})

    zero_chunk, empty_chunk = builder.chunks[0], builder.chunks[1]
    assert zero_chunk["id"] == "0"
    assert empty_chunk["id"] == ""


def test_builder_rejects_content_hash_diskann_until_id_map_exists():
    """Constructing a DiskANN builder with content-hash IDs raises ``ValueError``."""
    expected_message = "not supported by the DiskANN backend"
    with pytest.raises(ValueError, match=expected_message):
        LeannBuilder(backend_name="diskann", passage_id_scheme="content-hash")


def test_ivf_update_content_hash_suffixes_existing_duplicate(tmp_path, monkeypatch):
    """Updating with text already in the index yields the ``<hash>-1`` ID."""

    def fake_embeddings(*_args, **_kwargs):
        # One 2-dimensional vector, matching the single passage being added.
        return np.ones((1, 2), dtype=np.float32)

    existing_id = _content_id("same text")
    existing_passage = {
        "id": existing_id,
        "text": "same text",
        "metadata": {"source": "old.txt"},
    }
    index_path = _write_minimal_ivf_index(tmp_path, "docs.leann", [existing_passage])

    recorded_calls = []
    _install_fake_ivf_backend(monkeypatch, recorded_calls)
    monkeypatch.setattr("leann.api.compute_embeddings", fake_embeddings)

    builder = LeannBuilder(backend_name="ivf", passage_id_scheme="content-hash")
    builder.add_text("same text", metadata={"source": "new.txt"})
    builder.update_index(index_path)

    expected_new_id = f"{existing_id}-1"
    assert recorded_calls == [[expected_new_id]]

    stored_offsets = pickle.loads((tmp_path / "docs.leann.passages.idx").read_bytes())
    assert set(stored_offsets) == {existing_id, expected_new_id}


def test_ivf_update_preserves_falsy_explicit_metadata_ids(tmp_path, monkeypatch):
    """Explicit IDs ``0`` and ``""`` survive an IVF ``update_index`` unchanged."""

    def fake_embeddings(*_args, **_kwargs):
        # Two 2-dimensional vectors, one per passage being added.
        return np.ones((2, 2), dtype=np.float32)

    index_path = _write_minimal_ivf_index(tmp_path, "docs.leann", [])

    recorded_calls = []
    _install_fake_ivf_backend(monkeypatch, recorded_calls)
    monkeypatch.setattr("leann.api.compute_embeddings", fake_embeddings)

    builder = LeannBuilder(backend_name="ivf", passage_id_scheme="content-hash")
    for text, explicit_id in (("zero id", 0), ("empty id", "")):
        builder.add_text(text, metadata={"id": explicit_id})
    builder.update_index(index_path)

    assert recorded_calls == [["0", ""]]

    stored_offsets = pickle.loads((tmp_path / "docs.leann.passages.idx").read_bytes())
    assert set(stored_offsets) == {"0", ""}
Loading