def stats(self) -> dict:
    """Return inline prefix-cache occupancy counters for /props.

    The read takes no lock, so a mutation running under ``daemon_lock`` can
    leave ``in_use`` and ``lifetime_hits`` slightly out of step with each
    other; that is accepted for an introspection report. When the cache is
    disabled every counter is reported as zero.
    """
    capacity = in_use = lifetime_hits = 0
    if not self.disabled:
        capacity = self.cap
        in_use = len(self.entries)
        # Cumulative counter: it is only ever incremented by lookup() hits,
        # so it does not shrink when entries are evicted.
        lifetime_hits = self._lifetime_hits
    return {
        "capacity": capacity,
        "in_use": in_use,
        "lifetime_hits": lifetime_hits,
    }
def _recompute_full_disk_bytes_snapshot(self) -> None:
    """Refresh the cached disk-usage figure that /props reports.

    Meant to be called from the full-cache mutation sites so that readers
    can use the stored number instead of stat-ing the filesystem. A disabled
    (or never-initialised) full cache stores 0.
    """
    total_bytes = 0
    if not getattr(self, "_full_disabled", True):
        for entry_key, entry in self.full_entries.items():
            total_bytes += self._full_entry_artifact_size(entry_key, entry)
    self._full_disk_bytes_snapshot = total_bytes


def full_stats(self) -> dict:
    """Return full-cache counters for /props.

    Only cached values are read here; the disk-usage number comes from the
    snapshot maintained by ``_recompute_full_disk_bytes_snapshot``. When the
    full cache is disabled (or was never initialised) every counter is zero
    and ``enabled`` is False.
    """
    enabled = not getattr(self, "_full_disabled", True)
    if enabled:
        capacity = self._full_cap
        in_use = len(self.full_entries)
        disk_bytes = self._full_disk_bytes_snapshot
        lifetime_hits = self._full_lifetime_hits
    else:
        capacity = in_use = disk_bytes = lifetime_hits = 0
    return {
        "enabled": enabled,
        "capacity": capacity,
        "in_use": in_use,
        "disk_bytes": disk_bytes,
        "lifetime_hits": lifetime_hits,
    }
prompt_ids: list[int]) -> tuple[int, str, int] | None: if not Path(cur_bin_path).exists(): self.full_entries.pop(key, None) self._drop_full_metadata(key) + self._recompute_full_disk_bytes_snapshot() return None entry.hits += 1 + self._full_lifetime_hits += 1 entry.last_used_ns = time.time_ns() self.full_entries.move_to_end(key) # mark fresh in LRU self._persist_full_metadata(key, entry) @@ -1108,6 +1162,10 @@ def confirm_full_snap(self, slot: int, prompt_ids: list[int], self.full_entries[key] = entry self._persist_full_metadata(key, entry) self._enforce_full_budget(prompt_ids) + # _enforce_full_budget may call _retire_full_entry which refreshes + # the snapshot, but it bails early when budget==0. Refresh here + # unconditionally so the new entry's bytes are always reflected. + self._recompute_full_disk_bytes_snapshot() print(f"{self.log_prefix} full-cache committed slot={slot} " f"cur_ids_len={cur_ids_len} key={key.hex()[:8]}", flush=True) diff --git a/dflash/scripts/server.py b/dflash/scripts/server.py index 76c506c1f..eec2ec7ed 100644 --- a/dflash/scripts/server.py +++ b/dflash/scripts/server.py @@ -23,6 +23,7 @@ import sys import tempfile import time +import tomllib import uuid from pathlib import Path from typing import Any, AsyncIterator @@ -32,6 +33,7 @@ from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware # FIX 1: add CORS from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.routing import APIRoute from pydantic import BaseModel from starlette.concurrency import iterate_in_threadpool from transformers import AutoTokenizer @@ -88,6 +90,177 @@ def _extra_daemon_has_target_sharding(extra: list[str] | None) -> bool: _ALLOWED_TEMPLATE_KWARGS = frozenset({"enable_thinking", "tools", "add_generation_prompt"}) +# ─── /props introspection ────────────────────────────────────────── +# Read-only GET /props endpoint returning a JSON snapshot of the live +# Python-server state (model arch, KV/FA config, pflash 
mode, cache +# occupancy, daemon liveness). Convention follows llama.cpp's /props. + +_PYPROJECT = Path(__file__).resolve().parent.parent / "pyproject.toml" + + +def _resolve_server_version() -> str: + # Source: dflash/pyproject.toml [project] version. importlib.metadata + # is intentionally not used — that project sets [tool.uv] package=false, + # so the package is never installed and the metadata lookup would + # always raise PackageNotFoundError and silently fall through. + try: + with _PYPROJECT.open("rb") as f: + return tomllib.load(f)["project"]["version"] + except (OSError, KeyError): + # File missing or shape unexpected — common case before the + # workspace pyproject is committed. Silent fallback. + return "0.0.0+unknown" + except tomllib.TOMLDecodeError as exc: + # File present but malformed — that's a real config bug worth + # surfacing, not a silent fallback case. + log.warning("could not parse %s for server version: %s", _PYPROJECT, exc) + return "0.0.0+unknown" + + +SERVER_VERSION = _resolve_server_version() + +PROPS_SCHEMA = 1 +# Bump only on breaking changes to /props output: +# - field renamed +# - field removed +# - existing field's semantics change (units, nullability, type) +# Do NOT bump for additive changes (new fields, new sections). + +_API_ENDPOINTS: list[str] = [ + "GET /health", + "GET /props", + "GET /v1/models", + "POST /v1/chat/completions", + "POST /v1/messages", + "POST /v1/responses", +] +# Keep in sync with the @app.{get,post} decorators in build_app(). +# Drift is caught by test_props_endpoints_match_app_routes. + + +def _capabilities(arch: str) -> dict: + """Arch-gated capability booleans for /props. + + Currently consumed only by /props. The Codex /v1/models variant has + its own hardcoded capability fields; wiring it through this helper + is a v2 follow-up (the Codex schema requires non-empty reasoning- + level fields, which would need validation against a real client + before gating them on arch="laguna"). 
+ """ + qwen = arch in _QWEN35_ARCHES + return { + "reasoning_supported": qwen, + "speculative_supported": qwen, + "tools_supported": qwen, + } + + +def _effective_kv_type(axis: str, arch: str) -> str: + """KV K or V type as the C++ daemon actually resolves it. + + Mirrors dflash::resolve_kv_types() in src/kv_quant.cpp (qwen35) and the + laguna-specific path in test/test_dflash.cpp (laguna). Distinct from + _resolve_kv_k_type() — that function uses q8_0 as a stable hash salt for + the prefix cache; this one reports what the daemon allocated. + """ + if arch in _LAGUNA_ARCHES: + # laguna reads only DFLASH27B_KV_K; legacy shorthand and KV_V ignored. + kv = "q8_0" + if os.environ.get("DFLASH27B_KV_K"): + kv = os.environ["DFLASH27B_KV_K"].lower() + return kv + # qwen35: default Q4_0; legacy KV_F16/_KV_Q4/_KV_TQ3 last-wins; per-axis + # KV_K/_KV_V override legacy. + kv = "q4_0" + if os.environ.get("DFLASH27B_KV_F16", "0") != "0": + kv = "f16" + if os.environ.get("DFLASH27B_KV_Q4", "0") != "0": + kv = "q4_0" + if os.environ.get("DFLASH27B_KV_TQ3", "0") != "0": + kv = "tq3_0" + axis_env = f"DFLASH27B_KV_{axis.upper()}" + if os.environ.get(axis_env): + kv = os.environ[axis_env].lower() + return kv + + +def _parse_optional_float(env_name: str) -> float | None: + raw = os.environ.get(env_name) + if raw is None or raw == "": + return None + try: + return float(raw) + except ValueError: + log.warning("ignoring non-numeric %s=%r", env_name, raw) + return None + + +def _runtime_backend(bin_path: Path) -> str: + """Best-effort compute backend tag for /props. + + The Python server does not yet have a daemon build-info handshake, so read + the same CMake cache knob that selected the test_dflash backend. Operators + can override this for unusual deployments. 
def _runtime_backend(bin_path: Path) -> str:
    """Best-effort compute backend tag for /props.

    The Python server does not yet have a daemon build-info handshake, so read
    the same CMake cache knob that selected the test_dflash backend. Operators
    can override this for unusual deployments.

    Resolution order:
      1. ``DFLASH_RUNTIME_BACKEND`` then ``DFLASH27B_GPU_BACKEND`` env vars
         (surrounding whitespace ignored; blank values are skipped).
      2. ``CMakeCache.txt`` next to the binary, then under ``ROOT/build``.
      3. ``"cuda"`` when nothing else matched.

    Args:
        bin_path: Path to the daemon binary; its directory is searched for
            a CMake cache.

    Returns:
        The lower-cased backend name.
    """
    for env_name in ("DFLASH_RUNTIME_BACKEND", "DFLASH27B_GPU_BACKEND"):
        # Strip so a padded or whitespace-only override is not reported
        # verbatim and does not mask the CMake-cache detection below.
        override = os.environ.get(env_name, "").strip()
        if override:
            return override.lower()

    try:
        bin_dir = Path(bin_path).resolve().parent
    except OSError:
        bin_dir = Path(bin_path).parent
    candidates = (
        bin_dir / "CMakeCache.txt",
        ROOT / "build" / "CMakeCache.txt",
    )

    seen: set[Path] = set()
    for cache_path in candidates:
        if cache_path in seen:
            continue
        seen.add(cache_path)
        try:
            text = cache_path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            continue
        # An explicit project knob wins over the generic GGML switches.
        match = re.search(r"^DFLASH27B_GPU_BACKEND:[^=]*=(\w+)$", text, re.MULTILINE)
        if match:
            return match.group(1).lower()
        if re.search(r"^GGML_HIP:[^=]*=ON$", text, re.MULTILINE):
            return "hip"
        if re.search(r"^GGML_CUDA:[^=]*=ON$", text, re.MULTILINE):
            return "cuda"

    return "cuda"


def _pflash_props(cfg: "PrefillConfig | None") -> dict:
    """Build the ``pflash`` section of the /props payload.

    Args:
        cfg: The prefill configuration, or None when pflash is not set up.

    Returns:
        A dict with a fixed key set. When ``cfg`` is None or not enabled,
        ``enabled`` is False, ``mode`` is ``"off"`` and every other field is
        None; otherwise the fields come from ``cfg`` and the
        ``DFLASH_FP_USE_BSA`` / ``DFLASH_FP_ALPHA`` / ``DFLASH27B_LM_HEAD_FIX``
        environment variables.
    """
    if cfg is not None and cfg.enabled:
        drafter = cfg.drafter_gguf
        return {
            "enabled": True,
            "mode": cfg.mode,
            "threshold": cfg.threshold,
            "keep_ratio": cfg.keep_ratio,
            "drafter_gguf": str(drafter) if drafter else None,
            "skip_park": bool(cfg.skip_park),
            "bsa_enabled": os.environ.get("DFLASH_FP_USE_BSA", "0") != "0",
            "bsa_alpha": _parse_optional_float("DFLASH_FP_ALPHA"),
            "lm_head_fix": os.environ.get("DFLASH27B_LM_HEAD_FIX", "0") != "0",
        }

    disabled: dict = {"enabled": False, "mode": "off"}
    for field_name in (
        "threshold",
        "keep_ratio",
        "drafter_gguf",
        "skip_park",
        "bsa_enabled",
        "bsa_alpha",
        "lm_head_fix",
    ):
        disabled[field_name] = None
    return disabled
@app.get("/props")
def props():
    # Read-only JSON snapshot of the Python server's live state. Kept as a
    # comment (not a docstring) so the generated OpenAPI description of the
    # route is unchanged.
    capability_flags = _capabilities(arch)
    # Cache reads are lockless: a mutation under daemon_lock can tear
    # in_use vs lifetime_hits across stats(). Acceptable for /props.
    prefix_snapshot = prefix_cache.stats()
    full_snapshot = prefix_cache.full_stats()
    tool_snapshot = tool_memory.stats()

    # Only report a tokenizer id when the tokenizer exposes a real string.
    raw_tokenizer_id = getattr(tokenizer, "name_or_path", None)
    tokenizer_id = raw_tokenizer_id if isinstance(raw_tokenizer_id, str) else None

    server = {
        "name": "luce-dflash",
        "version": SERVER_VERSION,
        "props_schema": PROPS_SCHEMA,
    }
    pflash_section = _pflash_props(prefill_cfg)
    speculative_on = capability_flags["speculative_supported"]
    reasoning_on = capability_flags["reasoning_supported"]

    # pflash takes precedence over plain speculative decoding in the label.
    if pflash_section["enabled"]:
        mode_label = "pflash"
    else:
        mode_label = "dflash" if speculative_on else "off"

    generation_defaults = {
        "n_ctx": max_ctx,
        "temperature": 0.0,
        "top_p": 1.0,
        "top_k": 0,
        "min_p": 0.0,
        "repeat_penalty": 1.0,
    }
    build_info = (
        f"{server['name']} v{server['version']} "
        f"props_schema={server['props_schema']}"
    )
    model_section = {
        "arch": arch,
        "draft_path": None if draft is None else str(draft),
        "tokenizer_id": tokenizer_id,
    }
    runtime_section = {
        "backend": _runtime_backend(bin_path),
        "fa_window": _fa_window,
        "kv_cache_k": _effective_kv_type("k", arch),
        "kv_cache_v": _effective_kv_type("v", arch),
        "lazy_draft": bool(lazy_draft),
        # extra_daemon_args is only forwarded to the qwen35 daemon
        # spawn (see cmd.extend above); the laguna path discards it,
        # so report sharding off there regardless of what was passed.
        "target_sharding": (
            arch not in _LAGUNA_ARCHES
            and _extra_daemon_has_target_sharding(extra_daemon_args)
        ),
    }
    reasoning_section = {
        "supported": reasoning_on,
        "default": None,
        "supported_efforts": ["medium"] if reasoning_on else [],
    }
    speculative_section = {
        "enabled": speculative_on,
        "ddtree_budget": budget if speculative_on else None,
    }
    sampling_section = {
        "capabilities": {
            "supports_temperature": True,
            "supports_top_p": True,
            "supports_top_k": True,
            "supports_frequency_penalty": True,
            "supports_seed": True,
        },
    }

    # Key order below is the order clients see in the JSON document.
    return {
        "default_generation_settings": generation_defaults,
        "model_alias": MODEL_NAME,
        "model_path": str(target),
        "build_info": build_info,
        "speculative_mode": mode_label,
        "server": server,
        "model": model_section,
        "runtime": runtime_section,
        "reasoning": reasoning_section,
        "speculative": speculative_section,
        "sampling": sampling_section,
        "pflash": pflash_section,
        "prefix_cache": prefix_snapshot,
        "full_cache": full_snapshot,
        "tool_replay": tool_snapshot,
        "daemon": {
            "alive": daemon_proc.poll() is None,
        },
        "api": {
            "endpoints": list(_API_ENDPOINTS),
        },
    }
in system_msgs[0]["content"] + + +# ─── GET /props ──────────────────────────────────────────────────── + +_PROPS_TOP_KEYS = { + "default_generation_settings", "model_alias", "model_path", "build_info", + "speculative_mode", "server", "model", "runtime", "reasoning", + "speculative", "sampling", "pflash", "prefix_cache", "full_cache", + "tool_replay", "daemon", "api", +} + + +def _build_props_app(mock_tokenizer, *, arch="qwen35", draft=Path("d.safetensors"), + prefill_cfg=None, extra_daemon_args=None, + prefix_cache_slots=4, prefill_cache_slots=4): + """Build an app with mocked daemon, tuned for /props tests.""" + mock_tokenizer.name_or_path = "Qwen/Qwen3.5-27B" + with patch("server.subprocess.Popen") as mock_popen: + mock_popen.return_value.poll.return_value = None + return build_app( + target=Path("target.gguf"), + draft=draft, + bin_path=Path("test_dflash"), + budget=22, + max_ctx=131072, + tokenizer=mock_tokenizer, + stop_ids={2}, + prefill_cfg=prefill_cfg, + drafter_tokenizer=mock_tokenizer if prefill_cfg else None, + prefix_cache_slots=prefix_cache_slots, + prefill_cache_slots=prefill_cache_slots, + arch=arch, + extra_daemon_args=extra_daemon_args, + ) + + +def test_props_endpoint_shape(client): + response = client.get("/props") + assert response.status_code == 200 + body = response.json() + assert set(body.keys()) == _PROPS_TOP_KEYS + assert body["server"]["props_schema"] == PROPS_SCHEMA + assert body["server"]["version"] == SERVER_VERSION + assert body["server"]["name"] == "luce-dflash" + assert body["model_alias"] == MODEL_NAME + assert body["model_path"] == "target.gguf" + assert body["build_info"] == f"luce-dflash v{SERVER_VERSION} props_schema={PROPS_SCHEMA}" + + +def test_props_llama_compat_fields(client): + body = client.get("/props").json() + assert body["default_generation_settings"] == { + "n_ctx": 131072, + "temperature": 0.0, + "top_p": 1.0, + "top_k": 0, + "min_p": 0.0, + "repeat_penalty": 1.0, + } + assert body["runtime"]["backend"] + assert 
body["speculative_mode"] == "dflash" + assert body["reasoning"] == { + "supported": True, + "default": None, + "supported_efforts": ["medium"], + } + assert body["sampling"] == { + "capabilities": { + "supports_temperature": True, + "supports_top_p": True, + "supports_top_k": True, + "supports_frequency_penalty": True, + "supports_seed": True, + }, + } + assert "max_ctx" not in body["runtime"] + assert "id" not in body["model"] + assert "target_path" not in body["model"] + assert all(not key.startswith("supports_") for key in body["sampling"]) + assert "default_enabled" not in body["reasoning"] + + +def test_props_version_reads_pyproject(): + # Best effort: when pyproject is reachable the value is the [project] + # version. We don't assert a literal because that drifts with releases. + version = _resolve_server_version() + assert isinstance(version, str) + assert version + if version != "0.0.0+unknown": + # pyproject was readable — version should look like semver-ish. + assert version[0].isdigit() + + +def test_props_version_falls_back_when_pyproject_missing(tmp_path, monkeypatch): + monkeypatch.setattr("server._PYPROJECT", tmp_path / "does_not_exist.toml") + assert _resolve_server_version() == "0.0.0+unknown" + + +def test_runtime_backend_prefers_env(monkeypatch): + monkeypatch.setenv("DFLASH_RUNTIME_BACKEND", "HIP") + assert _runtime_backend(Path("missing")) == "hip" + + +def test_runtime_backend_reads_cmake_cache(tmp_path, monkeypatch): + monkeypatch.delenv("DFLASH_RUNTIME_BACKEND", raising=False) + monkeypatch.delenv("DFLASH27B_GPU_BACKEND", raising=False) + build_dir = tmp_path / "build" + build_dir.mkdir() + (build_dir / "CMakeCache.txt").write_text( + "DFLASH27B_GPU_BACKEND:STRING=hip\n", + encoding="utf-8", + ) + assert _runtime_backend(build_dir / "test_dflash") == "hip" + + +def test_props_arch_qwen35(client): + body = client.get("/props").json() + assert body["model"]["arch"] == "qwen35" + assert body["model"]["draft_path"] is not None + assert 
def test_props_arch_laguna(mock_tokenizer):
    """A laguna app reports no draft model, no reasoning and no speculation."""
    app = _build_props_app(mock_tokenizer, arch="laguna", draft=None)
    body = TestClient(app).get("/props").json()
    assert body["model"]["arch"] == "laguna"
    assert body["model"]["draft_path"] is None
    assert body["reasoning"] == {
        "supported": False,
        "default": None,
        "supported_efforts": [],
    }
    assert body["speculative"]["enabled"] is False
    assert body["speculative"]["ddtree_budget"] is None
    assert body["speculative_mode"] == "off"


def test_props_pflash_disabled(client):
    """With pflash off, every detail field of the section must be null."""
    p = client.get("/props").json()["pflash"]
    assert p["enabled"] is False
    assert p["mode"] == "off"
    # Covers every non-enabled/mode key of the disabled payload, including
    # skip_park, so a field that stops being nulled is caught here.
    for k in ("threshold", "keep_ratio", "drafter_gguf", "skip_park",
              "bsa_enabled", "bsa_alpha", "lm_head_fix"):
        assert p[k] is None, f"expected null pflash.{k} when disabled"


def test_props_pflash_enabled(mock_tokenizer, monkeypatch):
    """With pflash on, the section mirrors the config and the env knobs."""
    from _prefill_hook import PrefillConfig
    cfg = PrefillConfig(
        mode="auto",
        threshold=32000,
        keep_ratio=0.05,
        drafter_gguf=Path("/tmp/drafter.gguf"),
        drafter_tokenizer_id="Qwen/Qwen3-0.6B",
        skip_park=False,
    )
    monkeypatch.setenv("DFLASH_FP_USE_BSA", "1")
    monkeypatch.setenv("DFLASH_FP_ALPHA", "0.85")
    # Pin the third env-derived field too, so the assertion below does not
    # depend on whatever the developer's shell happens to export.
    monkeypatch.delenv("DFLASH27B_LM_HEAD_FIX", raising=False)
    app = _build_props_app(mock_tokenizer, prefill_cfg=cfg)
    body = TestClient(app).get("/props").json()
    p = body["pflash"]
    assert p["enabled"] is True
    assert p["mode"] == "auto"
    assert p["threshold"] == 32000
    assert p["keep_ratio"] == pytest.approx(0.05)
    assert p["drafter_gguf"] == "/tmp/drafter.gguf"
    assert p["skip_park"] is False
    assert p["bsa_enabled"] is True
    assert p["bsa_alpha"] == pytest.approx(0.85)
    assert p["lm_head_fix"] is False
    assert body["speculative_mode"] == "pflash"
TestClient(app).get("/props").json() + assert body["runtime"]["target_sharding"] is True + assert body["prefix_cache"]["capacity"] == 0 + assert body["full_cache"]["enabled"] is False + + +def test_props_target_sharding_false_on_laguna_even_when_args_passed(mock_tokenizer): + """Laguna's daemon-spawn path discards extra_daemon_args (see server.py + ~line 788 — only the qwen35 branch calls cmd.extend). /props must not + claim sharding is active when the flag was silently dropped.""" + app = _build_props_app( + mock_tokenizer, + arch="laguna", + draft=None, + extra_daemon_args=["--target-gpus=0,1"], + ) + body = TestClient(app).get("/props").json() + assert body["runtime"]["target_sharding"] is False + + +def test_props_endpoints_match_app_routes(client, app): + declared = set(client.get("/props").json()["api"]["endpoints"]) + actual: set[str] = set() + for r in app.routes: + if not isinstance(r, APIRoute): + continue + for m in r.methods: + if m in {"GET", "POST"}: + actual.add(f"{m} {r.path}") + # FastAPI auto-routes we don't care to advertise. + actual -= { + "GET /openapi.json", "GET /docs", "GET /redoc", + "GET /docs/oauth2-redirect", + } + assert declared == actual + + +def test_capabilities_arch_gated(): + assert _capabilities("qwen35") == { + "reasoning_supported": True, + "speculative_supported": True, + "tools_supported": True, + } + assert _capabilities("laguna") == { + "reasoning_supported": False, + "speculative_supported": False, + "tools_supported": False, + } + + +def test_effective_kv_type_qwen35_defaults(monkeypatch): + for env in ("DFLASH27B_KV_F16", "DFLASH27B_KV_Q4", "DFLASH27B_KV_TQ3", + "DFLASH27B_KV_K", "DFLASH27B_KV_V"): + monkeypatch.delenv(env, raising=False) + # qwen35 default per dflash/src/kv_quant.cpp:160 is q4_0 for both axes. 
+ assert _effective_kv_type("k", "qwen35") == "q4_0" + assert _effective_kv_type("v", "qwen35") == "q4_0" + + +def test_effective_kv_type_qwen35_per_axis_override(monkeypatch): + monkeypatch.setenv("DFLASH27B_KV_TQ3", "1") + monkeypatch.setenv("DFLASH27B_KV_V", "q4_0") + monkeypatch.delenv("DFLASH27B_KV_K", raising=False) + assert _effective_kv_type("k", "qwen35") == "tq3_0" + assert _effective_kv_type("v", "qwen35") == "q4_0" + + +def test_effective_kv_type_laguna_ignores_legacy_and_v(monkeypatch): + # Laguna only reads DFLASH27B_KV_K and applies it to both axes. + monkeypatch.setenv("DFLASH27B_KV_TQ3", "1") + monkeypatch.setenv("DFLASH27B_KV_V", "q4_0") + monkeypatch.delenv("DFLASH27B_KV_K", raising=False) + assert _effective_kv_type("k", "laguna") == "q8_0" + assert _effective_kv_type("v", "laguna") == "q8_0" + monkeypatch.setenv("DFLASH27B_KV_K", "q4_0") + assert _effective_kv_type("k", "laguna") == "q4_0" + assert _effective_kv_type("v", "laguna") == "q4_0" + + +def test_prefix_cache_stats_disabled(): + from prefix_cache import PrefixCache + pc = PrefixCache.__new__(PrefixCache) + pc.disabled = True + pc.cap = 0 + assert pc.stats() == {"capacity": 0, "in_use": 0, "lifetime_hits": 0} + + +def test_prefix_cache_lifetime_hits_increments(mock_tokenizer): + """Driving lookup() across an eviction must still see the lifetime + counter — it is NOT a sum over surviving entries.""" + from prefix_cache import PrefixCache, hash_prefix + pc = PrefixCache( + daemon_stdin=MagicMock(), + await_reply=lambda *_a, **_k: None, + daemon_lock=MagicMock(), + tokenizer=mock_tokenizer, + kv_k_type="q8_0", + fa_window=2048, + cap=2, + ) + # Force-disable boundary detection so we can register synthetic entries. + pc.markers = {"family": "manual", "sys_role_prefix": (), + "end_msg_seqs": [], "next_role_starts": []} + # Inject two entries directly and hit each twice. 
+ ids_a = [1, 2, 3, 4] + ids_b = [5, 6, 7, 8] + key_a = hash_prefix(ids_a, pc.kv_k_type, pc.fa_window) + key_b = hash_prefix(ids_b, pc.kv_k_type, pc.fa_window) + pc.entries[key_a] = 0 + pc.entries[key_b] = 1 + # Stub out the boundary detector to return both candidate cuts. + pc._all_boundaries = lambda ids: [len(ids)] + pc.lookup(ids_a) + pc.lookup(ids_a) + pc.lookup(ids_b) + assert pc.stats()["lifetime_hits"] == 3 + # Evict everything; counter persists. + pc.entries.clear() + assert pc.stats()["in_use"] == 0 + assert pc.stats()["lifetime_hits"] == 3 + + +def test_full_cache_disk_bytes_snapshot_updates_on_mutation(mock_tokenizer, tmp_path): + """confirm_full_snap and _retire_full_entry must refresh the snapshot + so /props never has to walk the filesystem.""" + from prefix_cache import PrefixCache + pc = PrefixCache( + daemon_stdin=MagicMock(), + await_reply=lambda *_a, **_k: None, + daemon_lock=MagicMock(), + tokenizer=mock_tokenizer, + kv_k_type="q8_0", + fa_window=2048, + cap=1, + ) + pc.markers = {"family": "manual", "sys_role_prefix": (), + "end_msg_seqs": [], "next_role_starts": []} + pc.init_full_cache(full_cap=2, cache_dir=str(tmp_path)) + assert pc.full_stats()["disk_bytes"] == 0 + + # Source bin file the daemon would have written. + src = tmp_path / "cur.bin" + src.write_bytes(b"\x01\x00\x00\x00" * 16) + + ids = [10, 20, 30] + slot, _ = pc.prepare_full_snap(ids) + pc.confirm_full_snap(slot, ids, src, cur_ids_len=16) + snap = pc.full_stats() + assert snap["enabled"] is True + assert snap["in_use"] == 1 + assert snap["disk_bytes"] > 0 + after_add = snap["disk_bytes"] + + # Direct retire of the entry should refresh the snapshot to 0. 
def stats(self) -> dict:
    """Return the configured limits and current occupancy for /props."""
    # The count and byte total are read one after the other without a
    # lock. A mutation between the two reads can make the pair disagree;
    # for /props introspection the report is allowed to be off by one
    # entry, which is preferable to taking a lock the rest of this class
    # doesn't carry.
    limit_entries = self.max_entries
    limit_bytes = self.max_bytes
    live_entries = len(self.by_id)
    live_bytes = self.total_bytes
    return {
        "max_entries": limit_entries,
        "max_bytes": limit_bytes,
        "current_entries": live_entries,
        "current_bytes": live_bytes,
    }