diff --git a/AGENTS.md b/AGENTS.md index fcb0998..5007c9d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -28,7 +28,7 @@ cargo clippy --all-targets -- -D warnings Carryover is active. Before responding: -1. Read `.carryover/handoff.md` for the prior session summary. -2. Summarize it back to the user in 1-2 sentences. -3. Ask what they want to do next — do not assume continuation. +1. Read `.carryover/handoff.md` for the full session context and progress log. +2. Find the `## What to do next` section and read it aloud to the user in 1-2 sentences. +3. Ask the user if they want to continue from there or do something different — do not assume continuation. diff --git a/CHANGELOG.md b/CHANGELOG.md index 5da3a04..2c666c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) once v0.1.0 ships. +## 0.1.4 — 2026-04-30 + +### Fixed + +- **Cursor adapter rewritten for new schema.** Cursor migrated conversation data from `globalStorage/state.vscdb` to per-workspace `state.vscdb` files. The adapter now reads `composer.composerHeaders` from the global DB and `aiService.prompts` from each workspace DB. Old-schema fallback preserved for pre-migration installs. +- **Codex adapter rewritten for new schema.** New format stores user/assistant text in `event_msg.payload.{type, message}` with the session_id only in the leading `session_meta` line. Adapter now captures both, plus reads `~/.codex/history.jsonl` for real-time user prompts before the rollout file is flushed. +- **Project-dir routing.** Cursor and Codex cursors persist `project_dir` so fs-watcher events route the handoff to the correct project directory. Watched paths now include `~/.config/Cursor/User/workspaceStorage` and `~/.codex/history.jsonl`. 
+- **Progress log accumulates across all sessions.** No longer wipes content on session change — never resets, only appends. Entries deduped by full-line content. +- **Skip-on-error in Claude/Codex adapters.** A single corrupt JSON line no longer blocks the entire transcript ingest. + +### Added + +- **Task / Next action accumulation.** Both sections now keep a timestamped history (newest first) instead of overwriting on each ingest. Dedupe checks the full list, not just the latest entry. +- **`## Session activity` section.** Captures concrete file changes via `git diff --stat HEAD` (git projects) or recent file mtimes (non-git). Fills the gap for Cursor where AI responses aren't stored locally. Preserved across ingests when the recent-window scan is empty. +- **Strict response rules in handoff preamble.** AI agents are instructed to read silently, not narrate the file structure, not recap files the user already knows, not report empty fields, and reply in two short paragraphs in the user's tone. Preamble auto-refreshes on every daemon start so rule updates propagate immediately. +- **Fallback for empty `Next action`.** When no assistant text is captured (Cursor case), falls back to `"Continue: "` so the section is never empty. + ## 0.1.3 — 2026-04-28 ### Fixed diff --git a/CLAUDE.md b/CLAUDE.md index 7acd4c2..c728a1b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,6 +1,6 @@ Carryover is active. Before responding: -1. Read `.carryover/handoff.md` for the prior session summary. -2. Summarize it back to the user in 1-2 sentences. -3. Ask what they want to do next — do not assume continuation. +1. Read `.carryover/handoff.md` for the full session context and progress log. +2. Find the `## What to do next` section and read it aloud to the user in 1-2 sentences. +3. Ask the user if they want to continue from there or do something different — do not assume continuation. 
diff --git a/Cargo.lock b/Cargo.lock index d0d167b..8ceddb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,7 +192,7 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "carryover" -version = "0.1.3" +version = "0.1.4" dependencies = [ "anyhow", "assert_cmd", diff --git a/Cargo.toml b/Cargo.toml index 433aa90..5924c81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "carryover" -version = "0.1.3" +version = "0.1.4" edition = "2021" license = "Apache-2.0" description = "Zero-LLM-token context-handoff daemon — resume any AI session across Claude Code, Cursor, and Codex." diff --git a/README.md b/README.md index a1e488e..e686469 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,8 @@ carryoverd uninstall # remove hooks; ledger preserved by default (--purge to w ``` > **Tip — resuming a session:** Carryover writes a pointer block into your project's `AGENTS.md` and `CLAUDE.md`. The AI reads it automatically at the start of a new conversation. To trigger it, open a new session and say something like **"let's start"**, **"continue"**, or **"what's next"** — that's enough for Claude Code, Cursor, or Codex to read the handoff and ask where you left off. Just saying "hi" won't trigger it. +> +> **Tip — already in a chat:** A long-running Cursor/Claude/Codex window won't re-read the pointer mid-conversation. Carryover keeps writing fresh updates to `.carryover/handoff.md` as you work, but the AI in that window doesn't see them until you tell it to. To pull in the latest state, just say **"read .carryover/handoff.md"** or **"check the carryover handoff for context"** — the AI will load the file and pick up everything that happened since the chat started, including work from your other tools. 
## What works in v0.1 diff --git a/npm-package/package.json b/npm-package/package.json index e3b4a8d..9e62ca4 100644 --- a/npm-package/package.json +++ b/npm-package/package.json @@ -1,6 +1,6 @@ { "name": "carryover", - "version": "0.1.3", + "version": "0.1.4", "description": "Zero-LLM-token context-handoff daemon — resume any AI session across Claude Code, Cursor, and Codex.", "homepage": "https://github.com/carryover-dev/carryover", "repository": { diff --git a/src/adapters/claude.rs b/src/adapters/claude.rs index 005f816..74a5a0f 100644 --- a/src/adapters/claude.rs +++ b/src/adapters/claude.rs @@ -179,17 +179,16 @@ impl Adapter for ClaudeAdapter { } /// Parse raw records into LedgerRow. Skips non-conversation types. - /// Fail-fast: first malformed record aborts the batch. + /// Skip-on-error: malformed records (corrupt JSON lines) are silently + /// skipped so a single bad line doesn't block the entire transcript. fn parse(&self, records: Vec) -> Result, AdapterError> { let mut rows = Vec::with_capacity(records.len()); for rec in records { - let v: serde_json::Value = - serde_json::from_slice(&rec.payload).map_err(|e| AdapterError::Parse { - offset: rec.offset, - context: "invalid JSON in transcript line", - source: e, - })?; + let v: serde_json::Value = match serde_json::from_slice(&rec.payload) { + Ok(v) => v, + Err(_) => continue, // skip corrupt JSON line + }; // Filter housekeeping rows. let row_type = v.get("type").and_then(|t| t.as_str()).unwrap_or(""); diff --git a/src/adapters/codex.rs b/src/adapters/codex.rs index 1b33a73..3f4cf2a 100644 --- a/src/adapters/codex.rs +++ b/src/adapters/codex.rs @@ -28,12 +28,22 @@ const MAX_READ_BYTES_PER_POLL: u64 = 64 * 1024 * 1024; /// Read-position cursor for a single Codex transcript file. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct CodexCursor { - /// Path to the JSONL transcript file. + /// Path to the JSONL transcript file (rollout file). 
pub file_path: PathBuf, - /// Byte offset of the first byte NOT yet consumed (exclusive lower bound). + /// Byte offset of the first byte NOT yet consumed in the rollout file. pub byte_offset: u64, /// Highest `seq` value seen in `event_msg` rows so far. pub last_event_seq: i64, + /// Byte offset of the first byte NOT yet consumed in `~/.codex/history.jsonl`. + /// Codex appends user prompts here in real-time, before the rollout file + /// gets flushed — reading this catches prompts the user just submitted. + #[serde(default)] + pub history_offset: u64, + /// Project directory of the active Codex session. Read from the rollout + /// file's `session_meta.payload.cwd` so fs-watcher events can route the + /// handoff back to the correct project. + #[serde(default)] + pub project_dir: Option, } // --------------------------------------------------------------------------- @@ -115,6 +125,8 @@ impl Adapter for CodexAdapter { file_path: file_path.clone(), byte_offset: 0, last_event_seq: since.last_event_seq, + history_offset: since.history_offset, + project_dir: since.project_dir.clone(), }, )); } @@ -140,31 +152,85 @@ impl Adapter for CodexAdapter { let mut last_event_seq = since.last_event_seq; - let records: Vec = line_records - .into_iter() - .map(|(end_offset, bytes)| { - // Track last_event_seq from event_msg rows as they come in. - if let Ok(v) = serde_json::from_slice::(&bytes) { - if v.get("type").and_then(|t| t.as_str()) == Some("event_msg") { - if let Some(seq) = v.get("seq").and_then(|s| s.as_i64()) { - if seq > last_event_seq { - last_event_seq = seq; - } + // Codex transcripts begin with a session_meta line that carries the + // session_id. When reading incrementally (offset > 0) we skip past it + // and parse() loses session context. Inject a synthetic session_meta + // record at the top of the batch ONLY when there are real records to + // process — never inject for an empty batch (preserves idempotency). 
+ let mut records: Vec = Vec::with_capacity(line_records.len() + 1); + if since.byte_offset > 0 && !line_records.is_empty() { + if let Some(meta_line) = peek_first_line(file_path) { + records.push(RawRecord { + tool: self.name().to_string(), + payload: meta_line.into_bytes(), + offset: 0, + }); + } + } + + records.extend(line_records.into_iter().map(|(end_offset, bytes)| { + // Track last_event_seq from event_msg rows as they come in. + if let Ok(v) = serde_json::from_slice::(&bytes) { + if v.get("type").and_then(|t| t.as_str()) == Some("event_msg") { + if let Some(seq) = v.get("seq").and_then(|s| s.as_i64()) { + if seq > last_event_seq { + last_event_seq = seq; } } } - RawRecord { - tool: self.name().to_string(), - payload: bytes, - offset: end_offset, + } + RawRecord { + tool: self.name().to_string(), + payload: bytes, + offset: end_offset, + } + })); + + // Also read ~/.codex/history.jsonl for real-time user prompts. Codex + // appends to this file on every prompt submission, BEFORE the rollout + // file is flushed. Without this, recent prompts are missed. + // Skip in tests: when sessions_root is overridden, the adapter is + // pointing at a fixture and shouldn't read the dev machine's history. + let mut new_history_offset = since.history_offset; + if self.sessions_root.is_none() { + if let Some(home) = dirs::home_dir() { + let history_path = home.join(".codex").join("history.jsonl"); + if history_path.exists() { + if let Ok((hist_lines, advanced)) = + read_complete_lines(&history_path, since.history_offset, self.name()) + { + new_history_offset = advanced; + for (end_offset, bytes) in hist_lines { + // Wrap in a marker so parse() knows it's history-format. + // Format: {"_codex_history": true, "line": } + let mut wrapped = b"{\"_codex_history\":true,\"line\":".to_vec(); + // Escape the original bytes as JSON string content. 
+ let original = String::from_utf8_lossy(&bytes); + let escaped = serde_json::to_string(&original.trim()) + .unwrap_or_else(|_| "\"\"".to_string()); + wrapped.extend_from_slice(escaped.as_bytes()); + wrapped.push(b'}'); + records.push(RawRecord { + tool: self.name().to_string(), + payload: wrapped, + offset: end_offset, + }); + } + } } - }) - .collect(); + } + } + + // Peek the rollout's session_meta line for cwd → routes handoff to + // the correct project on subsequent fs-watcher events. + let new_project_dir = peek_session_cwd(file_path).or(since.project_dir.clone()); let advanced = CodexCursor { file_path: file_path.clone(), byte_offset: new_offset, last_event_seq, + history_offset: new_history_offset, + project_dir: new_project_dir, }; Ok((records, advanced)) @@ -183,71 +249,149 @@ impl Adapter for CodexAdapter { /// Missing session_meta is recorded in the daemon event log (v0.2). fn parse(&self, records: Vec) -> Result, AdapterError> { let mut rows = Vec::with_capacity(records.len()); + // Captured from session_meta.payload.id; applied to subsequent event_msg + // lines that don't carry session_id at the top level (new Codex schema). + let mut current_session_id: String = String::new(); for rec in records { - let v: serde_json::Value = - serde_json::from_slice(&rec.payload).map_err(|e| AdapterError::Parse { - offset: rec.offset, - context: "invalid JSON in transcript line", - source: e, - })?; + let v: serde_json::Value = match serde_json::from_slice(&rec.payload) { + Ok(v) => v, + Err(_) => continue, // skip corrupt JSON line + }; + + // History-format wrapper: {"_codex_history": true, "line": ""} + // Each line is `{session_id, ts, text}` from ~/.codex/history.jsonl. 
+ if v.get("_codex_history").and_then(|b| b.as_bool()) == Some(true) { + let inner_str = v.get("line").and_then(|l| l.as_str()).unwrap_or(""); + if inner_str.is_empty() { + continue; + } + let inner: serde_json::Value = match serde_json::from_str(inner_str) { + Ok(x) => x, + Err(_) => continue, + }; + let sid = inner + .get("session_id") + .and_then(|s| s.as_str()) + .unwrap_or("") + .to_string(); + let ts_secs = inner.get("ts").and_then(|t| t.as_i64()).unwrap_or(0); + let text = inner + .get("text") + .and_then(|t| t.as_str()) + .unwrap_or("") + .to_string(); + if sid.is_empty() || text.is_empty() { + continue; + } + rows.push(LedgerRow { + session_id: sid, + tool: "codex".to_string(), + ts: ts_secs * 1000, // history.jsonl uses unix seconds + role: "user".to_string(), + content: text, + tool_calls_json: None, + files_touched_json: None, + parent_id: None, + }); + continue; + } let row_type = v.get("type").and_then(|t| t.as_str()).unwrap_or(""); match row_type { "session_meta" => { - // Consumed for context; does not produce a LedgerRow. - // session_id is carried per-event_msg for robustness. + // New schema: payload.id is the session id; old schema may put + // it at top-level "session_id". + if let Some(sid) = v + .get("payload") + .and_then(|p| p.get("id")) + .and_then(|i| i.as_str()) + .or_else(|| v.get("session_id").and_then(|s| s.as_str())) + { + current_session_id = sid.to_string(); + } continue; } "event_msg" => { - // Required: session_id - let session_id = v - .get("session_id") - .and_then(|s| s.as_str()) - .ok_or_else(|| AdapterError::Parse { - offset: rec.offset, - context: "missing session_id field", - source: make_missing_field_error(), - })? 
- .to_string(); - - // Required: ts (unix epoch ms as i64) - let ts = parse_timestamp(v.get("ts"), rec.offset)?; - - // Required: role - let role = v - .get("role") - .and_then(|r| r.as_str()) - .ok_or_else(|| AdapterError::Parse { - offset: rec.offset, - context: "missing role field", - source: make_missing_field_error(), - })? - .to_string(); - - // content: string or array (Codex always uses strings in v0.1) - let content_val = v.get("content"); - let content = match content_val { + // Try new schema first: payload.{type, message} + let payload = v.get("payload"); + let payload_type = payload + .and_then(|p| p.get("type")) + .and_then(|t| t.as_str()) + .unwrap_or(""); + let role_from_payload = match payload_type { + "user_message" => Some("user"), + "agent_message" => Some("assistant"), + _ => None, + }; + + let session_id_top = v.get("session_id").and_then(|s| s.as_str()).unwrap_or(""); + + if let Some(role) = role_from_payload { + // New schema path + let session_id = if !session_id_top.is_empty() { + session_id_top.to_string() + } else if !current_session_id.is_empty() { + current_session_id.clone() + } else { + continue; // can't tag a row without a session + }; + let content = payload + .and_then(|p| p.get("message")) + .and_then(|m| m.as_str()) + .unwrap_or("") + .to_string(); + if content.is_empty() { + continue; + } + let ts = match parse_iso_timestamp(v.get("timestamp")) { + Some(t) => t, + None => match parse_timestamp(v.get("ts"), rec.offset) { + Ok(t) => t, + Err(_) => continue, + }, + }; + rows.push(LedgerRow { + session_id, + tool: "codex".to_string(), + ts, + role: role.to_string(), + content, + tool_calls_json: None, + files_touched_json: None, + parent_id: None, + }); + continue; + } + + // Old schema fallback: top-level role/content/session_id + let session_id = if !session_id_top.is_empty() { + session_id_top.to_string() + } else if !current_session_id.is_empty() { + current_session_id.clone() + } else { + continue; + }; + let ts = match 
parse_timestamp(v.get("ts"), rec.offset) { + Ok(t) => t, + Err(_) => continue, + }; + let role = match v.get("role").and_then(|r| r.as_str()) { + Some(r) => r.to_string(), + None => continue, + }; + let content = match v.get("content") { None => String::new(), Some(serde_json::Value::String(s)) => s.clone(), - Some(arr) => { - serde_json::to_string(arr).map_err(|e| AdapterError::Parse { - offset: rec.offset, - context: "failed to serialize content array", - source: e, - })? - } + Some(arr) => match serde_json::to_string(arr) { + Ok(s) => s, + Err(_) => continue, + }, }; - let tool_calls_val = v.get("tool_calls"); let tool_calls_json = extract_tool_calls(tool_calls_val); let files_touched_json = extract_files_touched(tool_calls_val); - - // Codex v0.1 fixtures don't expose a parent chain. - // A future PR can extend this when Codex surfaces parent ids. - let parent_id: Option = None; - rows.push(LedgerRow { session_id, tool: "codex".to_string(), @@ -256,13 +400,10 @@ impl Adapter for CodexAdapter { content, tool_calls_json, files_touched_json, - parent_id, + parent_id: None, }); } - _ => { - // Unknown record type — silently skip for forward compatibility. - continue; - } + _ => continue, } } @@ -356,6 +497,47 @@ fn parse_timestamp(val: Option<&serde_json::Value>, offset: u64) -> Result Option { + use std::io::{BufRead, BufReader}; + let f = std::fs::File::open(path).ok()?; + let mut reader = BufReader::new(f); + let mut line = String::new(); + reader.read_line(&mut line).ok()?; + let trimmed = line.trim(); + if trimmed.is_empty() { + None + } else { + Some(trimmed.to_string()) + } +} + +/// Read the first line of a Codex rollout file and extract `payload.cwd` +/// from `session_meta`. Returns None if the file is missing/empty/malformed. 
+fn peek_session_cwd(path: &Path) -> Option { + use std::io::{BufRead, BufReader}; + let f = std::fs::File::open(path).ok()?; + let mut reader = BufReader::new(f); + let mut line = String::new(); + reader.read_line(&mut line).ok()?; + let v: serde_json::Value = serde_json::from_str(line.trim()).ok()?; + if v.get("type").and_then(|t| t.as_str()) != Some("session_meta") { + return None; + } + v.get("payload") + .and_then(|p| p.get("cwd")) + .and_then(|c| c.as_str()) + .map(String::from) +} + +/// Parse an ISO-8601 timestamp string (e.g. "2026-04-29T12:11:52.589Z") into +/// unix epoch milliseconds. Returns None if the value is missing or unparseable. +fn parse_iso_timestamp(val: Option<&serde_json::Value>) -> Option { + let s = val.and_then(|v| v.as_str())?; + let dt = chrono::DateTime::parse_from_rfc3339(s).ok()?; + Some(dt.timestamp_millis()) +} + /// Extract `tool_calls` array if present, returning JSON-serialized form. fn extract_tool_calls(tool_calls: Option<&serde_json::Value>) -> Option { let arr = tool_calls?.as_array()?; @@ -420,6 +602,8 @@ mod tests { file_path: fixture_path(name), byte_offset: 0, last_event_seq: 0, + history_offset: 0, + project_dir: None, } } @@ -575,6 +759,8 @@ mod tests { file_path: PathBuf::from("/etc/hostname"), byte_offset: 0, last_event_seq: 0, + history_offset: 0, + project_dir: None, }; let err = a.read_new_records(&bad_cursor).unwrap_err(); assert!( @@ -616,6 +802,8 @@ mod tests { file_path: file_path.clone(), byte_offset: 0, last_event_seq: 0, + history_offset: 0, + project_dir: None, }; // Because the cap truncates at the boundary, the partial tail beyond // the cap (or the actual partial line within the capped window) must diff --git a/src/adapters/cursor.rs b/src/adapters/cursor.rs index 3b80dc5..9586d8a 100644 --- a/src/adapters/cursor.rs +++ b/src/adapters/cursor.rs @@ -1,43 +1,78 @@ //! Cursor (the AI editor) transcript adapter. //! -//! Reads `~/.config/Cursor/User/globalStorage/state.vscdb` (Linux) or -//! 
`~/Library/Application Support/Cursor/User/globalStorage/state.vscdb` (macOS) -//! and parses the AI conversation rows into `LedgerRow`. +//! Reads conversation data from two Cursor storage locations: +//! - Global DB: `~/.config/Cursor/User/globalStorage/state.vscdb` (Linux) +//! `~/Library/Application Support/Cursor/User/globalStorage/state.vscdb` (macOS) +//! - Per-workspace DBs: `workspaceStorage/<workspace-id>/state.vscdb` (sibling of globalStorage) //! -//! When the live `state.vscdb` is locked (Cursor is running), opening it -//! directly may return `SQLITE_BUSY`. The adapter falls back to copying -//! `state.vscdb` plus its `-wal`/`-shm` sidecars into a tempdir and reading the -//! copy. The copy is auto-cleaned on tempdir drop. +//! Cursor migrated its storage schema. The adapter auto-detects which is in use: +//! - New schema: `composer.composerHeaders` in global DB + per-workspace prompt arrays +//! - Old schema: `aiService.generations`, `aiService.prompts`, `composer.composerData` +//! all in the global DB +//! +//! When the live DB is locked (Cursor is running), opening it directly may return +//! `SQLITE_BUSY`. The adapter falls back to copying the DB + its WAL/SHM sidecars +//! into a tempdir and reading from the copy. use crate::adapters::{Adapter, AdapterError, RawRecord}; use crate::storage::LedgerRow; use rusqlite::{Connection, OpenFlags, OptionalExtension}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::time::Duration; // --------------------------------------------------------------------------- -// Read cap +// Constants // --------------------------------------------------------------------------- -/// Per-poll read cap for a single ItemTable value. Same rationale as the -/// Claude adapter: prevents OOM if a transcript value grows unbounded. +/// Per-poll read cap for a single ItemTable value. 
const MAX_BYTES_PER_VALUE: usize = 64 * 1024 * 1024; +/// Maximum workspace DBs to scan per poll to bound I/O on large installs. +const MAX_WORKSPACES_PER_POLL: usize = 20; + // --------------------------------------------------------------------------- // Cursor (bookmark) // --------------------------------------------------------------------------- -/// Read-position bookmark for a single Cursor `state.vscdb` file. +/// Read-position bookmark for the Cursor adapter. +/// +/// Supports both the new schema (seen_prompts per workspace) and the old +/// schema (last_msg_id string watermark). Old stored cursors that have +/// `db_path`/`last_rowid` fields deserialize cleanly because serde ignores +/// unknown fields by default. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct CursorCursor { - /// Path to the live `state.vscdb` on the user's machine. - pub db_path: PathBuf, - /// Monotonic record count consumed so far. Advanced by `records.len()` on - /// each successful read. - pub last_rowid: i64, - /// Stable per-message ID of the highest record consumed. Used to filter - /// duplicates on subsequent reads. + /// Unix ms of the most recently seen composer's `lastUpdatedAt`. + #[serde(default)] + pub last_updated_at_ms: i64, + + /// Per-workspace-id: number of prompts already emitted. + /// Key = workspace hash, Value = count of consumed prompts. + #[serde(default)] + pub seen_prompts: HashMap, + + /// Per-workspace-id: text of the most recently emitted prompt. Used to + /// recover the resume point even when Cursor truncates the prompts array + /// (the array is bounded — when the user types a new prompt, the oldest + /// gets dropped, so `seen_count == array_length` no longer means + /// "caught up"). On read we look for this text in the current array; if + /// found, we resume from the next index. 
If absent (truncated), we fall + /// back to emitting the full array — duplicates are filtered downstream + /// by progress-log dedupe and Task accumulation. + #[serde(default)] + pub last_prompt_text: HashMap, + + /// Most recently active project path. Written here so that + /// `infer_project_dir_from_cursor` in the pipeline can route fs-watcher + /// events to the right `.carryover/` directory without re-reading headers. + #[serde(default)] + pub project_dir: Option, + + /// Legacy watermark for old-schema reads. Preserved so that old stored + /// cursors can resume without re-emitting everything on first upgrade. + #[serde(default)] pub last_msg_id: String, } @@ -47,25 +82,25 @@ pub struct CursorCursor { /// Cursor transcript adapter. pub struct CursorAdapter { - /// Optional override for the directory containing `state.vscdb`. + /// Optional override for the globalStorage directory. /// `None` means use the OS-default path. pub db_root: Option, } impl CursorAdapter { - /// Default constructor — uses the OS-default `state.vscdb` location. + /// Default constructor — uses the OS-default globalStorage path. pub fn new() -> Self { Self { db_root: None } } - /// Test override: set the *directory* that contains `state.vscdb`. + /// Test override: set the *globalStorage directory* (parent of `state.vscdb`). pub fn with_db_root(root: PathBuf) -> Self { Self { db_root: Some(root), } } - /// Return the configured `db_root`, or the OS-default globalStorage path. + /// Return the configured db_root, or the OS-default globalStorage path. fn resolve_db_root(&self) -> Result { if let Some(p) = &self.db_root { return Ok(p.clone()); @@ -95,17 +130,25 @@ impl CursorAdapter { #[cfg(not(any(target_os = "linux", target_os = "macos")))] { - // Windows + others: v0.4+ work. Stub returns PathNotFound. Err(AdapterError::PathNotFound(PathBuf::from( "Cursor globalStorage (unsupported platform for v0.1)", ))) } } - /// Path to the `state.vscdb` file. 
+ /// Path to the global `state.vscdb`. fn db_path(&self) -> Result { Ok(self.resolve_db_root()?.join("state.vscdb")) } + + /// workspaceStorage root — a sibling directory of globalStorage. + fn workspace_storage_root(&self) -> Result { + let global_root = self.resolve_db_root()?; + let parent = global_root + .parent() + .ok_or_else(|| AdapterError::PathNotFound(global_root.clone()))?; + Ok(parent.join("workspaceStorage")) + } } impl Default for CursorAdapter { @@ -127,15 +170,8 @@ impl Adapter for CursorAdapter { fn detect(&self) -> Result, AdapterError> { match self.db_path() { - Ok(p) => { - if p.exists() { - Ok(Some(p)) - } else { - Ok(None) - } - } - // Platform not supported or $HOME missing → not detected. - Err(_) => Ok(None), + Ok(p) if p.exists() => Ok(Some(p)), + _ => Ok(None), } } @@ -143,107 +179,18 @@ impl Adapter for CursorAdapter { &self, since: &CursorCursor, ) -> Result<(Vec, CursorCursor), AdapterError> { - // --------------------------------------------------------------- - // Containment check - // --------------------------------------------------------------- - // `since.db_path` is persisted to the SQLite cursors table and could be - // tampered with. Without this check a tampered cursor could read - // arbitrary files on the host. canonicalize() resolves symlinks before - // the prefix check, closing symlink-escape paths as well. - if !since.db_path.as_os_str().is_empty() { - let db_root = self.resolve_db_root()?; - let canonical_root = db_root - .canonicalize() - .map_err(|_| AdapterError::PathNotFound(db_root.clone()))?; - // Parent of state.vscdb must equal canonical_root. 
- let canonical_db = since - .db_path - .canonicalize() - .map_err(|_| AdapterError::PathNotFound(since.db_path.clone()))?; - let db_parent = canonical_db - .parent() - .ok_or_else(|| AdapterError::PathNotFound(since.db_path.clone()))?; - if !db_parent.starts_with(&canonical_root) { - return Err(AdapterError::PathNotFound(since.db_path.clone())); - } + let global_db = self.db_path()?; + if !global_db.exists() { + return Ok((vec![], since.clone())); } - // --------------------------------------------------------------- - // Resolve the DB path to use - // --------------------------------------------------------------- - let live_db = if since.db_path.as_os_str().is_empty() { - self.db_path()? - } else { - since.db_path.clone() - }; - - // --------------------------------------------------------------- - // Open with retries + WAL-copy fallback - // --------------------------------------------------------------- - // We try to open the live DB up to 3 times with increasing backoff. - // On a busy/locked error we fall back to a tempdir copy. - // - // `_temp_guard` MUST stay alive for the entire body of this method. - // When it drops at end-of-scope (after the queries below complete) - // the tempdir + copied DB + sidecars are removed automatically. - // This prevents tempdir leakage across busy-poll cycles. 
- let (conn, _temp_guard) = open_with_fallback(&live_db)?; - - // --------------------------------------------------------------- - // Query each of the three known keys - // --------------------------------------------------------------- - let mut raw_msgs: Vec = Vec::new(); - - raw_msgs.extend(read_generations(&conn)?); - raw_msgs.extend(read_prompts(&conn)?); - raw_msgs.extend(read_composer_data(&conn)?); - - // --------------------------------------------------------------- - // Filter messages already consumed - // --------------------------------------------------------------- - let filtered: Vec = raw_msgs - .into_iter() - .filter(|m| m.msg_id > since.last_msg_id) - .collect(); - - // --------------------------------------------------------------- - // Build RawRecords - // --------------------------------------------------------------- - let records: Vec = filtered - .iter() - .enumerate() - .map(|(i, m)| { - let payload = serde_json::to_vec(&m.value).unwrap_or_default(); - // Per-record monotonic offset within the batch so downstream - // error attribution and dedup keys are unique per row. - let offset = (since.last_rowid + 1 + i as i64) as u64; - RawRecord { - tool: "cursor".to_string(), - payload, - offset, - } - }) - .collect(); - - // --------------------------------------------------------------- - // Advance cursor - // --------------------------------------------------------------- - let new_last_msg_id = filtered - .iter() - .map(|m| m.msg_id.as_str()) - .max() - .map(|s| s.to_string()) - .unwrap_or_else(|| since.last_msg_id.clone()); - - let new_rowid = since.last_rowid + filtered.len() as i64; + let ws_root = self.workspace_storage_root()?; + let (conn, _guard) = open_with_fallback(&global_db)?; - let advanced = CursorCursor { - db_path: live_db, - last_rowid: new_rowid, - last_msg_id: new_last_msg_id, - }; - - Ok((records, advanced)) + match read_item_value(&conn, "composer.composerHeaders")? 
{ + Some(headers_text) => read_new_schema(since, &headers_text, &ws_root), + None => read_old_schema(&conn, since), + } } fn parse(&self, records: Vec) -> Result, AdapterError> { @@ -257,7 +204,6 @@ impl Adapter for CursorAdapter { source: e, })?; - // Required: sessionId let session_id = v .get("sessionId") .and_then(|s| s.as_str()) @@ -268,10 +214,8 @@ impl Adapter for CursorAdapter { })? .to_string(); - // Required: ts (unix epoch ms) let ts = extract_ts(&v, rec.offset)?; - // Required: role let role = v .get("role") .and_then(|r| r.as_str()) @@ -282,7 +226,6 @@ impl Adapter for CursorAdapter { })? .to_string(); - // content: string preferred, fall back to JSON serialization let content = match v.get("content") { None => String::new(), Some(serde_json::Value::String(s)) => s.clone(), @@ -310,207 +253,316 @@ impl Adapter for CursorAdapter { } // --------------------------------------------------------------------------- -// Internal helpers +// New-schema reader // --------------------------------------------------------------------------- -/// A parsed message from any of the three ItemTable keys, normalized to a -/// common shape before being turned into a `RawRecord`. -#[derive(Debug)] -struct ParsedMsg { - /// Stable string ID used to deduplicate across reads. - msg_id: String, - /// JSON value to be stored as `RawRecord::payload`. - value: serde_json::Value, +struct ComposerHeader { + composer_id: String, + workspace_id: String, + fs_path: String, + last_updated_at_ms: i64, } -/// Attempt a single read-only open of `db_path`. 
-fn attempt_open(db_path: &Path) -> Result { - let conn = Connection::open_with_flags( - db_path, - OpenFlags::SQLITE_OPEN_READ_ONLY - | OpenFlags::SQLITE_OPEN_NO_MUTEX - | OpenFlags::SQLITE_OPEN_URI, - )?; - conn.busy_timeout(Duration::from_millis(200))?; - Ok(conn) -} +fn parse_composer_headers(text: &str) -> Result, AdapterError> { + let obj: serde_json::Value = serde_json::from_str(text).map_err(|e| AdapterError::Parse { + offset: 0, + context: "composer.composerHeaders is not valid JSON", + source: e, + })?; -/// Copy `db_path` and its `-wal`/`-shm` sidecars to a fresh tempdir. -/// Returns `(TempDir, path_to_copied_db)`. The `TempDir` must be kept alive -/// for the duration of the connection; it cleans up on drop. -/// -/// On unix the tempdir is created with mode 0o700 and every copied file is -/// chmod'd to 0o600. Without these the tempdir inherits 0o755 (umask) and -/// the copied DB inherits the source mode (often 0o644), making the copied -/// transcript readable by every local user — unacceptable since transcripts -/// may contain pasted secrets. -fn copy_db_to_temp(db_path: &Path) -> Result<(tempfile::TempDir, PathBuf), AdapterError> { - let tmp = build_owner_only_tempdir()?; - let dst = tmp.path().join("state.vscdb"); + let arr = match obj.get("allComposers").and_then(|v| v.as_array()) { + Some(a) => a, + None => return Ok(vec![]), + }; - std::fs::copy(db_path, &dst).map_err(AdapterError::WalCopyFailed)?; - chmod_owner_only(&dst)?; + let mut out = Vec::with_capacity(arr.len()); + for item in arr { + let composer_id = match item.get("composerId").and_then(|v| v.as_str()) { + Some(s) if !s.is_empty() => s.to_string(), + _ => continue, + }; - // Copy sidecars if present; missing is fine. 
- let wal = db_path.with_extension("vscdb-wal"); - if wal.exists() { - let dst_wal = tmp.path().join("state.vscdb-wal"); - std::fs::copy(&wal, &dst_wal).map_err(AdapterError::WalCopyFailed)?; - chmod_owner_only(&dst_wal)?; - } - let shm = db_path.with_extension("vscdb-shm"); - if shm.exists() { - let dst_shm = tmp.path().join("state.vscdb-shm"); - std::fs::copy(&shm, &dst_shm).map_err(AdapterError::WalCopyFailed)?; - chmod_owner_only(&dst_shm)?; + let ws_ident = match item.get("workspaceIdentifier") { + Some(v) => v, + None => continue, + }; + + let workspace_id = match ws_ident.get("id").and_then(|v| v.as_str()) { + Some(s) if !s.is_empty() => s.to_string(), + _ => continue, + }; + + let fs_path = ws_ident + .get("uri") + .and_then(|u| u.get("fsPath")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let last_updated_at_ms = item + .get("lastUpdatedAt") + .and_then(|v| v.as_i64()) + .unwrap_or(0); + + out.push(ComposerHeader { + composer_id, + workspace_id, + fs_path, + last_updated_at_ms, + }); } - Ok((tmp, dst)) + Ok(out) } -/// Create a tempdir with mode 0o700 on unix. On non-unix targets we fall -/// back to the platform default (Windows ACLs grant owner-only access by -/// default for user temp directories). -fn build_owner_only_tempdir() -> Result { - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - tempfile::Builder::new() - .permissions(std::fs::Permissions::from_mode(0o700)) - .tempdir() - .map_err(AdapterError::WalCopyFailed) - } - #[cfg(not(unix))] - { - tempfile::tempdir().map_err(AdapterError::WalCopyFailed) - } +/// Workspace IDs are hex-ish hashes; reject anything with path-traversal chars. +fn is_safe_workspace_id(id: &str) -> bool { + !id.is_empty() + && id.len() <= 64 + && id + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') } -/// chmod a single file to 0o600 on unix; no-op elsewhere. 
-fn chmod_owner_only(path: &Path) -> Result<(), AdapterError> { - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)) - .map_err(AdapterError::WalCopyFailed)?; - } - #[cfg(not(unix))] - { - let _ = path; - } - Ok(()) +/// Read new-style workspace prompts (plain `{text, commandType}` objects). +/// +/// Returns prompts after the resume point. The resume point is determined by +/// (in priority order): +/// 1. `last_text`: find the last occurrence of this text in the array, resume +/// from the next index. Survives Cursor's bounded-array truncation. +/// 2. `skip_fallback`: legacy index-based skip (used on first upgrade when +/// `last_text` is None). +/// +/// On truncation (last_text not found in array), emits the full array — the +/// progress-log and Task accumulators dedupe by content downstream. +fn read_workspace_prompts( + conn: &Connection, + last_text: Option<&str>, + skip_fallback: usize, +) -> Result<(Vec, Option), AdapterError> { + let text = match read_item_value(conn, "aiService.prompts")? { + None => return Ok((vec![], None)), + Some(t) => t, + }; + + let arr: Vec = + serde_json::from_str(&text).map_err(|e| AdapterError::Parse { + offset: 0, + context: "aiService.prompts is not a JSON array", + source: e, + })?; + + let resume_index = match last_text { + Some(t) => arr + .iter() + .rposition(|item| item.get("text").and_then(|v| v.as_str()) == Some(t)) + .map(|i| i + 1) + .unwrap_or(0), // not found = truncated, re-emit all (dedup handles) + None => skip_fallback.min(arr.len()), + }; + + // Capture the last prompt's text (for the next read's watermark) BEFORE + // we move arr into the iterator. 
+ let new_last_text = arr + .last() + .and_then(|item| item.get("text").and_then(|v| v.as_str())) + .map(String::from); + + let out: Vec = arr + .into_iter() + .skip(resume_index) + .filter_map(|item| { + item.get("text") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()) + }) + .collect(); + + Ok((out, new_last_text)) } -/// Return true if a rusqlite error indicates the DB is busy / locked. -fn is_busy(err: &rusqlite::Error) -> bool { - use rusqlite::ffi::ErrorCode; - matches!( - err, - rusqlite::Error::SqliteFailure(e, _) - if e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::SystemIoFailure - ) +/// Read generation timestamps; index-aligned with prompts. +fn read_workspace_generation_timestamps(conn: &Connection) -> Result, AdapterError> { + let text = match read_item_value(conn, "aiService.generations")? { + None => return Ok(vec![]), + Some(t) => t, + }; + + let arr: Vec = + serde_json::from_str(&text).map_err(|e| AdapterError::Parse { + offset: 0, + context: "aiService.generations is not a JSON array", + source: e, + })?; + + Ok(arr + .into_iter() + .filter_map(|item| item.get("unixMs").and_then(|v| v.as_i64())) + .collect()) } -/// Open `db_path` with retries (50ms / 200ms / 500ms backoff) and WAL-copy -/// fallback on busy. Returns `(Connection, Option)`. The caller must -/// keep the returned `TempDir` alive for the entire connection lifetime; -/// dropping the tuple cleans up the copy automatically. 
-fn open_with_fallback( - db_path: &Path, -) -> Result<(Connection, Option), AdapterError> { - let delays = [50u64, 200, 500]; +fn read_new_schema( + since: &CursorCursor, + headers_text: &str, + ws_root: &Path, +) -> Result<(Vec, CursorCursor), AdapterError> { + let mut composers = parse_composer_headers(headers_text)?; + if composers.is_empty() { + return Ok((vec![], since.clone())); + } - for delay_ms in delays { - match attempt_open(db_path) { - Ok(conn) => return Ok((conn, None)), - Err(e) if is_busy(&e) => { - std::thread::sleep(Duration::from_millis(delay_ms)); - } - Err(e) => return Err(AdapterError::Sqlite(e)), + // Most-recently-updated first; cap to bound I/O on large installs. + composers.sort_unstable_by_key(|c| std::cmp::Reverse(c.last_updated_at_ms)); + composers.truncate(MAX_WORKSPACES_PER_POLL); + + // Most recently updated composer's path is the active project. + let new_project_dir = composers + .first() + .map(|c| c.fs_path.clone()) + .filter(|s| !s.is_empty()); + + let mut new_seen_prompts = since.seen_prompts.clone(); + let mut new_last_prompt_text = since.last_prompt_text.clone(); + let mut new_last_updated_at_ms = since.last_updated_at_ms; + let mut all_msgs: Vec = Vec::new(); + + for composer in &composers { + if !is_safe_workspace_id(&composer.workspace_id) { + continue; + } + + let ws_db = ws_root.join(&composer.workspace_id).join("state.vscdb"); + if !ws_db.exists() { + continue; + } + + // Use new_* (already updated by earlier composers in this batch) so + // that two composers sharing the same workspace don't both re-read it. 
+ let skip_fallback = new_seen_prompts + .get(&composer.workspace_id) + .copied() + .unwrap_or(0); + let last_text = new_last_prompt_text + .get(&composer.workspace_id) + .map(|s| s.as_str()); + + let (ws_conn, _ws_guard) = match open_with_fallback(&ws_db) { + Ok(c) => c, + Err(_) => continue, + }; + + let (new_prompts, latest_array_text) = + read_workspace_prompts(&ws_conn, last_text, skip_fallback)?; + + // Update last_prompt_text watermark to the LAST item in the array + // (regardless of whether we emitted any new prompts) so future reads + // can find the resume point even if Cursor truncates. + if let Some(t) = latest_array_text { + new_last_prompt_text.insert(composer.workspace_id.clone(), t); + } + + if new_prompts.is_empty() { + continue; + } + + let gen_timestamps = read_workspace_generation_timestamps(&ws_conn)?; + let fallback_ts = composer.last_updated_at_ms; + + for (i, text) in new_prompts.iter().enumerate() { + let abs_idx = skip_fallback + i; + let ts = gen_timestamps.get(abs_idx).copied().unwrap_or(fallback_ts); + all_msgs.push(ParsedMsg { + msg_id: format!("{}:prompt:{}", composer.composer_id, abs_idx), + value: serde_json::json!({ + "sessionId": composer.composer_id, + "role": "user", + "content": text, + "ts": ts, + }), + }); + } + + new_seen_prompts.insert( + composer.workspace_id.clone(), + skip_fallback + new_prompts.len(), + ); + if composer.last_updated_at_ms > new_last_updated_at_ms { + new_last_updated_at_ms = composer.last_updated_at_ms; } } - // All retries exhausted with busy errors — fall back to a file copy that - // lives in an owner-only tempdir. The TempDir guard is returned so it - // drops alongside the Connection at the caller's end-of-scope. 
- let (tmp, copied_path) = copy_db_to_temp(db_path)?; - let conn = attempt_open(&copied_path).map_err(AdapterError::Sqlite)?; - Ok((conn, Some(tmp))) + let records: Vec = all_msgs + .iter() + .enumerate() + .map(|(i, m)| RawRecord { + tool: "cursor".to_string(), + payload: serde_json::to_vec(&m.value).unwrap_or_default(), + offset: i as u64 + 1, + }) + .collect(); + + let advanced = CursorCursor { + last_updated_at_ms: new_last_updated_at_ms, + seen_prompts: new_seen_prompts, + last_prompt_text: new_last_prompt_text, + project_dir: new_project_dir, + last_msg_id: String::new(), + }; + + Ok((records, advanced)) } -/// Read and check the byte length of a value from `ItemTable`. -fn read_item_value(conn: &Connection, key: &str) -> Result, AdapterError> { - let mut stmt = conn - .prepare("SELECT value FROM ItemTable WHERE key = ?1") - .map_err(AdapterError::Sqlite)?; +// --------------------------------------------------------------------------- +// Old-schema reader (pre-migration fallback) +// --------------------------------------------------------------------------- - // Use query_row with get_ref so we can handle both TEXT and BLOB columns. - // The Python fixture writes TEXT (json.dumps); real Cursor may use BLOB. - let result: Option = stmt - .query_row([key], |row| { - use rusqlite::types::ValueRef; - let bytes: Vec = match row.get_ref(0)? { - ValueRef::Text(t) => t.to_vec(), - ValueRef::Blob(b) => b.to_vec(), - ValueRef::Null => return Ok(None), - _ => { - return Err(rusqlite::Error::InvalidColumnType( - 0, - "value".to_string(), - rusqlite::types::Type::Blob, - )) - } - }; - Ok(Some(bytes)) - }) - .optional() - .map_err(AdapterError::Sqlite)? - .flatten() - .map(|bytes| { - if bytes.len() > MAX_BYTES_PER_VALUE { - // Signal size cap via a sentinel; checked below. 
- Err(bytes) - } else { - Ok(bytes) - } - }) - .transpose() - .map_err(|oversized| { - let _ = oversized; - let dummy_err = serde_json::from_str::("").unwrap_err(); - AdapterError::Parse { - offset: 0, - context: "value exceeds size cap", - source: dummy_err, - } - })? - .map(|bytes| { - String::from_utf8(bytes).map_err(|e| AdapterError::Parse { - offset: 0, - context: "ItemTable value is not valid UTF-8", - source: serde_json::from_str::(&e.to_string()).unwrap_err(), - }) +fn read_old_schema( + conn: &Connection, + since: &CursorCursor, +) -> Result<(Vec, CursorCursor), AdapterError> { + let mut raw_msgs: Vec = Vec::new(); + raw_msgs.extend(read_generations(conn)?); + raw_msgs.extend(read_prompts(conn)?); + raw_msgs.extend(read_composer_data(conn)?); + + let filtered: Vec = raw_msgs + .into_iter() + .filter(|m| m.msg_id > since.last_msg_id) + .collect(); + + let new_last_msg_id = filtered + .iter() + .map(|m| m.msg_id.as_str()) + .max() + .map(|s| s.to_string()) + .unwrap_or_else(|| since.last_msg_id.clone()); + + let records: Vec = filtered + .iter() + .enumerate() + .map(|(i, m)| RawRecord { + tool: "cursor".to_string(), + payload: serde_json::to_vec(&m.value).unwrap_or_default(), + offset: i as u64 + 1, }) - .transpose()?; + .collect(); + + let advanced = CursorCursor { + last_updated_at_ms: since.last_updated_at_ms, + seen_prompts: since.seen_prompts.clone(), + last_prompt_text: since.last_prompt_text.clone(), + project_dir: since.project_dir.clone(), + last_msg_id: new_last_msg_id, + }; - Ok(result) + Ok((records, advanced)) } -/// Parse `aiService.generations` → `ParsedMsg` list. -/// -/// Each generation object has shape: -/// ```json -/// { -/// "generationId": "...", -/// "sessionId": "...", -/// "requestTs": 1234567890000, -/// "responseTs": 1234567891500, -/// "userMessage": "...", -/// "assistantMessage": "..." 
-/// } -/// ``` -/// We emit two `ParsedMsg` per generation (one user turn, one assistant turn), -/// because each generation encodes both the prompt and the response. +// --------------------------------------------------------------------------- +// Old-schema key parsers +// --------------------------------------------------------------------------- + fn read_generations(conn: &Connection) -> Result, AdapterError> { let text = match read_item_value(conn, "aiService.generations")? { None => return Ok(vec![]), @@ -540,7 +592,6 @@ fn read_generations(conn: &Connection) -> Result, AdapterError> { .to_string(); let request_ts = gen.get("requestTs").and_then(|v| v.as_i64()).unwrap_or(0); - let response_ts = gen.get("responseTs").and_then(|v| v.as_i64()).unwrap_or(0); let user_msg = gen @@ -548,14 +599,12 @@ fn read_generations(conn: &Connection) -> Result, AdapterError> { .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); - let asst_msg = gen .get("assistantMessage") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); - // User turn out.push(ParsedMsg { msg_id: format!("{gen_id}:user"), value: serde_json::json!({ @@ -565,8 +614,6 @@ fn read_generations(conn: &Connection) -> Result, AdapterError> { "ts": request_ts, }), }); - - // Assistant turn out.push(ParsedMsg { msg_id: format!("{gen_id}:assistant"), value: serde_json::json!({ @@ -581,18 +628,6 @@ fn read_generations(conn: &Connection) -> Result, AdapterError> { Ok(out) } -/// Parse `aiService.prompts` → `ParsedMsg` list. -/// -/// Each prompt object has shape: -/// ```json -/// { -/// "promptId": "...", -/// "sessionId": "...", -/// "ts": 1234567890000, -/// "text": "...", -/// "files": [...] -/// } -/// ``` fn read_prompts(conn: &Connection) -> Result, AdapterError> { let text = match read_item_value(conn, "aiService.prompts")? 
{ None => return Ok(vec![]), @@ -609,11 +644,11 @@ fn read_prompts(conn: &Connection) -> Result, AdapterError> { let mut out = Vec::with_capacity(arr.len()); for prompt in arr { - let prompt_id = prompt - .get("promptId") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); + // Old-schema prompts have a promptId field; new-schema prompts do not. + let prompt_id = match prompt.get("promptId").and_then(|v| v.as_str()) { + Some(id) if !id.is_empty() => id.to_string(), + _ => continue, // new-schema prompt — skip in old fallback + }; let session_id = prompt .get("sessionId") @@ -643,10 +678,6 @@ fn read_prompts(conn: &Connection) -> Result, AdapterError> { Ok(out) } -/// Parse `composer.composerData` → `ParsedMsg` list. -/// -/// The value is a JSON object with a `composers` array. Each composer has a -/// `messages` array with `{role, content, ts}` objects. fn read_composer_data(conn: &Connection) -> Result, AdapterError> { let text = match read_item_value(conn, "composer.composerData")? { None => return Ok(vec![]), @@ -714,7 +745,161 @@ fn read_composer_data(conn: &Connection) -> Result, AdapterError> Ok(out) } -/// Extract `ts` from a parsed message value. 
+// --------------------------------------------------------------------------- +// Shared helpers +// --------------------------------------------------------------------------- + +#[derive(Debug)] +struct ParsedMsg { + msg_id: String, + value: serde_json::Value, +} + +fn attempt_open(db_path: &Path) -> Result { + let conn = Connection::open_with_flags( + db_path, + OpenFlags::SQLITE_OPEN_READ_ONLY + | OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_URI, + )?; + conn.busy_timeout(Duration::from_millis(200))?; + Ok(conn) +} + +fn copy_db_to_temp(db_path: &Path) -> Result<(tempfile::TempDir, PathBuf), AdapterError> { + let tmp = build_owner_only_tempdir()?; + let dst = tmp.path().join("state.vscdb"); + + std::fs::copy(db_path, &dst).map_err(AdapterError::WalCopyFailed)?; + chmod_owner_only(&dst)?; + + let wal = db_path.with_extension("vscdb-wal"); + if wal.exists() { + let dst_wal = tmp.path().join("state.vscdb-wal"); + std::fs::copy(&wal, &dst_wal).map_err(AdapterError::WalCopyFailed)?; + chmod_owner_only(&dst_wal)?; + } + let shm = db_path.with_extension("vscdb-shm"); + if shm.exists() { + let dst_shm = tmp.path().join("state.vscdb-shm"); + std::fs::copy(&shm, &dst_shm).map_err(AdapterError::WalCopyFailed)?; + chmod_owner_only(&dst_shm)?; + } + + Ok((tmp, dst)) +} + +fn build_owner_only_tempdir() -> Result { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + tempfile::Builder::new() + .permissions(std::fs::Permissions::from_mode(0o700)) + .tempdir() + .map_err(AdapterError::WalCopyFailed) + } + #[cfg(not(unix))] + { + tempfile::tempdir().map_err(AdapterError::WalCopyFailed) + } +} + +fn chmod_owner_only(path: &Path) -> Result<(), AdapterError> { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)) + .map_err(AdapterError::WalCopyFailed)?; + } + #[cfg(not(unix))] + { + let _ = path; + } + Ok(()) +} + +fn is_busy(err: &rusqlite::Error) -> bool { + use 
rusqlite::ffi::ErrorCode; + matches!( + err, + rusqlite::Error::SqliteFailure(e, _) + if e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::SystemIoFailure + ) +} + +fn open_with_fallback( + db_path: &Path, +) -> Result<(Connection, Option), AdapterError> { + let delays = [50u64, 200, 500]; + + for delay_ms in delays { + match attempt_open(db_path) { + Ok(conn) => return Ok((conn, None)), + Err(e) if is_busy(&e) => { + std::thread::sleep(Duration::from_millis(delay_ms)); + } + Err(e) => return Err(AdapterError::Sqlite(e)), + } + } + + let (tmp, copied_path) = copy_db_to_temp(db_path)?; + let conn = attempt_open(&copied_path).map_err(AdapterError::Sqlite)?; + Ok((conn, Some(tmp))) +} + +fn read_item_value(conn: &Connection, key: &str) -> Result, AdapterError> { + let mut stmt = conn + .prepare("SELECT value FROM ItemTable WHERE key = ?1") + .map_err(AdapterError::Sqlite)?; + + let result: Option = stmt + .query_row([key], |row| { + use rusqlite::types::ValueRef; + let bytes: Vec = match row.get_ref(0)? { + ValueRef::Text(t) => t.to_vec(), + ValueRef::Blob(b) => b.to_vec(), + ValueRef::Null => return Ok(None), + _ => { + return Err(rusqlite::Error::InvalidColumnType( + 0, + "value".to_string(), + rusqlite::types::Type::Blob, + )) + } + }; + Ok(Some(bytes)) + }) + .optional() + .map_err(AdapterError::Sqlite)? + .flatten() + .map(|bytes| { + if bytes.len() > MAX_BYTES_PER_VALUE { + Err(bytes) + } else { + Ok(bytes) + } + }) + .transpose() + .map_err(|_oversized| { + let dummy_err = serde_json::from_str::("").unwrap_err(); + AdapterError::Parse { + offset: 0, + context: "value exceeds size cap", + source: dummy_err, + } + })? 
+ .map(|bytes| { + String::from_utf8(bytes).map_err(|e| AdapterError::Parse { + offset: 0, + context: "ItemTable value is not valid UTF-8", + source: serde_json::from_str::(&e.to_string()).unwrap_err(), + }) + }) + .transpose()?; + + Ok(result) +} + fn extract_ts(v: &serde_json::Value, offset: u64) -> Result { match v.get("ts") { Some(serde_json::Value::Number(n)) => n.as_i64().ok_or_else(|| AdapterError::Parse { @@ -730,8 +915,6 @@ fn extract_ts(v: &serde_json::Value, offset: u64) -> Result { } } -/// Produce a synthetic `serde_json::Error` for use in `AdapterError::Parse` -/// where the true error is a missing field rather than a JSON syntax error. fn make_missing_field_error() -> serde_json::Error { serde_json::from_str::("").unwrap_err() } @@ -747,239 +930,200 @@ mod tests { const FIXTURE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/cursor"); - fn fixture_db_path() -> PathBuf { - PathBuf::from(FIXTURE_DIR).join("1-state.vscdb") + // --- New schema helpers --- + + fn new_schema_adapter() -> CursorAdapter { + CursorAdapter::with_db_root(PathBuf::from(FIXTURE_DIR).join("globalStorage")) } - /// Build an adapter that points at the fixture directory so the - /// containment check accepts the fixture path. - fn fixture_adapter() -> CursorAdapter { - CursorAdapter::with_db_root(PathBuf::from(FIXTURE_DIR)) + fn empty_cursor_new() -> CursorCursor { + CursorCursor::default() } - fn empty_cursor(db_path: PathBuf) -> CursorCursor { - CursorCursor { - db_path, - last_rowid: 0, - last_msg_id: String::new(), - } + // --- Old schema helpers --- + + fn old_schema_adapter() -> CursorAdapter { + CursorAdapter::with_db_root(PathBuf::from(FIXTURE_DIR).join("oldSchema")) } - // 1. 
parses_aiservice_generations + fn empty_cursor_old() -> CursorCursor { + CursorCursor::default() + } + + // ----------------------------------------------------------------------- + // New schema tests + // ----------------------------------------------------------------------- + #[test] - fn parses_aiservice_generations() { - let adapter = fixture_adapter(); - let since = empty_cursor(fixture_db_path()); - let (records, _cursor) = adapter.read_new_records(&since).unwrap(); + fn new_schema_parses_workspace_prompts() { + let adapter = new_schema_adapter(); + let (records, _cursor) = adapter.read_new_records(&empty_cursor_new()).unwrap(); let rows = adapter.parse(records).unwrap(); - // Fixture has 3 generations × 2 turns = 6 rows from this key alone (plus prompts + composers). - let assistant_rows: Vec<_> = rows.iter().filter(|r| r.role == "assistant").collect(); + // Fixture has 2 prompts in ws1 + 1 prompt in ws2 = 3 total. + assert_eq!(rows.len(), 3, "expected 3 user rows from two workspaces"); assert!( - !assistant_rows.is_empty(), - "expected at least one assistant row from generations" + rows.iter().all(|r| r.role == "user"), + "all rows must be user" ); - // All assistant rows have a session_id and tool = "cursor" - for row in &assistant_rows { - assert!(!row.session_id.is_empty(), "session_id must not be empty"); - assert_eq!(row.tool, "cursor"); - } - // Spot-check: first generation is in session-001 + assert!(rows.iter().all(|r| !r.session_id.is_empty())); + + // Content spot-check + let contents: Vec<&str> = rows.iter().map(|r| r.content.as_str()).collect(); assert!( - rows.iter().any(|r| r.session_id == "cursor-session-001"), - "expected cursor-session-001 in results" + contents.contains(&"How do I implement binary search?"), + "expected binary search prompt" ); - } - - // 2. 
parses_aiservice_prompts - #[test] - fn parses_aiservice_prompts() { - let adapter = fixture_adapter(); - let since = empty_cursor(fixture_db_path()); - let (records, _cursor) = adapter.read_new_records(&since).unwrap(); - let rows = adapter.parse(records).unwrap(); - - // Fixture has 3 prompts, all with role "user" - let user_rows: Vec<_> = rows.iter().filter(|r| r.role == "user").collect(); assert!( - !user_rows.is_empty(), - "expected at least one user row from prompts" + contents.contains(&"Explain Docker networking"), + "expected docker prompt" ); - // At least one user row must contain "CSS" content from the fixture - let css_row = user_rows - .iter() - .any(|r| r.content.contains("CSS") || r.content.contains("center a div")); - assert!(css_row, "expected CSS prompt content in user rows"); } - // 3. merges_three_keys #[test] - fn merges_three_keys() { - let adapter = fixture_adapter(); - let since = empty_cursor(fixture_db_path()); - let (records, _cursor) = adapter.read_new_records(&since).unwrap(); - let rows = adapter.parse(records).unwrap(); + fn new_schema_cursor_includes_project_dir() { + let adapter = new_schema_adapter(); + let (_records, cursor) = adapter.read_new_records(&empty_cursor_new()).unwrap(); - // Fixture: 3 generations×2 + 3 prompts + 2 composers×2msgs = 6+3+4 = 13 - // But some may deduplicate by msg_id across keys (they have distinct IDs in fixture). - // We simply assert we have enough to cover all three key sources. - assert!( - rows.len() >= 6, - "expected at least 6 rows (3 keys merged), got {}", - rows.len() - ); - // Multiple sessions present - let sessions: std::collections::HashSet<_> = - rows.iter().map(|r| r.session_id.as_str()).collect(); - assert!( - sessions.len() >= 2, - "expected rows from at least 2 sessions" + // Most-recently-updated composer is composer-new-0001 (ts=1700300010000), + // which lives in /synthetic/project-alpha. 
+ assert_eq!( + cursor.project_dir.as_deref(), + Some("/synthetic/project-alpha"), + "project_dir must point to the most-recently-updated composer's fsPath" ); } - // 4. cursor_advances_monotonically #[test] - fn cursor_advances_monotonically() { - let adapter = fixture_adapter(); - let since0 = empty_cursor(fixture_db_path()); + fn new_schema_cursor_advances_monotonically() { + let adapter = new_schema_adapter(); + let since0 = empty_cursor_new(); let (records1, cursor1) = adapter.read_new_records(&since0).unwrap(); assert!(!records1.is_empty(), "first read must return records"); - assert!( - cursor1.last_rowid >= since0.last_rowid, - "cursor must not regress" - ); - // Second read with advanced cursor should return no new records. + // Second read: seen_prompts are updated, so no new records. let (records2, cursor2) = adapter.read_new_records(&cursor1).unwrap(); assert!( records2.is_empty(), "second read with advanced cursor must return nothing" ); assert_eq!( - cursor2.last_rowid, cursor1.last_rowid, - "cursor must be stable when no new records" + cursor2.seen_prompts, cursor1.seen_prompts, + "seen_prompts must be stable when no new records" ); - assert_eq!(cursor2.last_msg_id, cursor1.last_msg_id); } - // 5. wal_lock_falls_back_to_copy - // - // Reliably triggering SQLITE_BUSY via a write-lock from another thread is - // difficult with read-only opens (WAL mode allows concurrent readers). - // We verify the fallback machinery compiles and the is_busy() helper works - // correctly. The full lock-path is exercised by the integration harness. #[test] - #[ignore = "SQLITE_BUSY cannot be reliably triggered for read-only opens in WAL mode; see is_busy_detects_database_busy_code below"] - fn wal_lock_falls_back_to_copy() { - let dir = tempfile::tempdir().unwrap(); - let src_db = dir.path().join("state.vscdb"); - - // Copy the fixture into the tempdir (acting as the "live" DB). 
- std::fs::copy(fixture_db_path(), &src_db).unwrap(); - - // Open an exclusive write transaction in a sibling thread. - let src_db_clone = src_db.clone(); - let handle = std::thread::spawn(move || { - let conn = - Connection::open_with_flags(&src_db_clone, OpenFlags::SQLITE_OPEN_READ_WRITE) - .unwrap(); - conn.execute_batch("BEGIN EXCLUSIVE;").unwrap(); - // Hold the lock for 2 seconds so the main thread hits SQLITE_BUSY. - std::thread::sleep(Duration::from_secs(2)); - conn.execute_batch("ROLLBACK;").unwrap(); - }); + fn new_schema_uses_generation_timestamps() { + let adapter = new_schema_adapter(); + let (records, _cursor) = adapter.read_new_records(&empty_cursor_new()).unwrap(); + let rows = adapter.parse(records).unwrap(); - // Small delay to let the exclusive lock be acquired. - std::thread::sleep(Duration::from_millis(50)); + // ws1 prompt[0] should use gen[0].unixMs = 1700300001000 + let binary_search_row = rows + .iter() + .find(|r| r.content == "How do I implement binary search?") + .expect("binary search prompt must be present"); + assert_eq!( + binary_search_row.ts, 1700300001000, + "prompt ts should come from corresponding generation unixMs" + ); + } - let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); - let since = empty_cursor(src_db.clone()); - let result = adapter.read_new_records(&since); + #[test] + fn new_schema_two_sessions() { + let adapter = new_schema_adapter(); + let (records, _cursor) = adapter.read_new_records(&empty_cursor_new()).unwrap(); + let rows = adapter.parse(records).unwrap(); - // Whether the fallback succeeded or the read-only open bypassed the lock, - // we must NOT get a SqliteBusy error. 
- assert!( - result.is_ok(), - "WAL fallback must succeed; got: {:?}", - result.err() + let sessions: std::collections::HashSet<&str> = + rows.iter().map(|r| r.session_id.as_str()).collect(); + assert_eq!( + sessions.len(), + 2, + "rows must span two composer sessions (ws1 and ws2)" ); + assert!(sessions.contains("composer-new-0001")); + assert!(sessions.contains("composer-new-0002")); + } - handle.join().unwrap(); + #[test] + fn new_schema_unsafe_workspace_id_rejected() { + // is_safe_workspace_id must reject path-traversal strings. + assert!(!is_safe_workspace_id("../etc/passwd")); + assert!(!is_safe_workspace_id("ws/evil")); + assert!(!is_safe_workspace_id("")); + assert!(is_safe_workspace_id("wsaaa111bbb222cc")); + assert!(is_safe_workspace_id("f206e451-ab12-cd34")); } - // Unit test on the is_busy() helper (runs without a real lock). + // ----------------------------------------------------------------------- + // Old schema tests + // ----------------------------------------------------------------------- + #[test] - fn is_busy_detects_database_busy_code() { - use rusqlite::ffi::{Error as SqliteErr, ErrorCode}; - let busy_err = rusqlite::Error::SqliteFailure( - SqliteErr { - code: ErrorCode::DatabaseBusy, - extended_code: 5, - }, - None, - ); - assert!(is_busy(&busy_err), "DatabaseBusy must be detected as busy"); + fn old_schema_parses_generations_and_prompts() { + let adapter = old_schema_adapter(); + let since = empty_cursor_old(); + let (records, _cursor) = adapter.read_new_records(&since).unwrap(); + let rows = adapter.parse(records).unwrap(); - let io_err = rusqlite::Error::SqliteFailure( - SqliteErr { - code: ErrorCode::SystemIoFailure, - extended_code: 10, - }, - None, - ); - assert!(is_busy(&io_err), "SystemIoFailure must be detected as busy"); + // Old fixture: 3 generations×2 + 3 prompts (old-style with promptId) + 2×2 composer msgs = 13 + assert!(rows.len() >= 6, "expected at least 6 rows from old schema"); - let other_err = 
rusqlite::Error::SqliteFailure( - SqliteErr { - code: ErrorCode::NotADatabase, - extended_code: 26, - }, - None, + let sessions: std::collections::HashSet<&str> = + rows.iter().map(|r| r.session_id.as_str()).collect(); + assert!( + sessions.len() >= 2, + "expected rows from at least 2 sessions" ); - assert!(!is_busy(&other_err), "NotADatabase must NOT be busy"); + assert!(sessions.contains("cursor-session-001")); } - // 6. containment_check_rejects_paths_outside_root #[test] - fn containment_check_rejects_paths_outside_root() { - let dir = tempfile::tempdir().unwrap(); - let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); + fn old_schema_cursor_advances_monotonically() { + let adapter = old_schema_adapter(); + let since0 = empty_cursor_old(); - // We need the db_root to exist (for canonicalize). - // dir is already created by tempdir(). + let (records1, cursor1) = adapter.read_new_records(&since0).unwrap(); + assert!(!records1.is_empty(), "first read must return records"); - // Point db_path at /tmp itself (a real path, but not inside db_root). - // Use a path that definitely exists so canonicalize succeeds. 
- let outside_path = PathBuf::from("/tmp"); + let (records2, cursor2) = adapter.read_new_records(&cursor1).unwrap(); + assert!(records2.is_empty(), "second read must return nothing"); + assert_eq!(cursor2.last_msg_id, cursor1.last_msg_id); + } - let since = CursorCursor { - db_path: outside_path, - last_rowid: 0, - last_msg_id: String::new(), - }; + // ----------------------------------------------------------------------- + // Shared / regression tests + // ----------------------------------------------------------------------- - let err = adapter.read_new_records(&since).unwrap_err(); + #[test] + fn detect_returns_path_when_global_db_present() { + let adapter = new_schema_adapter(); + let result = adapter.detect().unwrap(); assert!( - matches!(err, AdapterError::PathNotFound(_)), - "expected PathNotFound for path outside root, got: {:?}", - err + result.is_some(), + "detect must return Some when state.vscdb exists" ); } - // 7. read_cap_rejects_oversized_value + #[test] + fn detect_returns_none_when_absent() { + let dir = tempfile::tempdir().unwrap(); + let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); + assert!(adapter.detect().unwrap().is_none()); + } + #[test] fn read_cap_rejects_oversized_value() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("state.vscdb"); - // Create a minimal DB with an oversized value. { - let conn = Connection::open(&db_path).unwrap(); + let conn = rusqlite::Connection::open(&db_path).unwrap(); conn.execute_batch("CREATE TABLE ItemTable (key TEXT PRIMARY KEY, value BLOB);") .unwrap(); - - // 65 MB of JSON-ish bytes. 
let oversized: Vec = vec![b'x'; 65 * 1024 * 1024]; conn.execute( "INSERT INTO ItemTable (key, value) VALUES (?1, ?2)", @@ -989,42 +1133,58 @@ mod tests { } let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); - let since = empty_cursor(db_path); - let err = adapter.read_new_records(&since).unwrap_err(); - + let err = adapter.read_new_records(&empty_cursor_old()).unwrap_err(); match err { AdapterError::Parse { context, .. } => { - assert_eq!( - context, "value exceeds size cap", - "expected size-cap context" - ); + assert_eq!(context, "value exceeds size cap"); } - other => panic!("expected AdapterError::Parse, got {:?}", other), + other => panic!("expected AdapterError::Parse, got {other:?}"), } } - // 8. detect_returns_db_path_when_present #[test] - fn detect_returns_db_path_when_present() { - let dir = tempfile::tempdir().unwrap(); - // Create the state.vscdb file. - std::fs::write(dir.path().join("state.vscdb"), b"").unwrap(); - let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); - let result = adapter.detect().unwrap(); - assert!(result.is_some(), "detect must return Some when file exists"); - assert_eq!(result.unwrap(), dir.path().join("state.vscdb")); + fn is_busy_detects_database_busy_code() { + use rusqlite::ffi::{Error as SqliteErr, ErrorCode}; + let busy = rusqlite::Error::SqliteFailure( + SqliteErr { + code: ErrorCode::DatabaseBusy, + extended_code: 5, + }, + None, + ); + assert!(is_busy(&busy)); + + let io = rusqlite::Error::SqliteFailure( + SqliteErr { + code: ErrorCode::SystemIoFailure, + extended_code: 10, + }, + None, + ); + assert!(is_busy(&io)); + + let other = rusqlite::Error::SqliteFailure( + SqliteErr { + code: ErrorCode::NotADatabase, + extended_code: 26, + }, + None, + ); + assert!(!is_busy(&other)); } - // 9. detect_returns_none_when_absent #[test] - fn detect_returns_none_when_absent() { - let dir = tempfile::tempdir().unwrap(); - // No state.vscdb in dir. 
- let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf()); - let result = adapter.detect().unwrap(); - assert!( - result.is_none(), - "detect must return None when file is absent" - ); + fn old_cursor_json_deserializes_to_new_struct() { + // Old stored cursors have db_path/last_rowid/last_msg_id; new struct must + // deserialize them without panicking (serde ignores unknown fields). + let old_json = r#"{ + "db_path": "/home/user/.config/Cursor/User/globalStorage/state.vscdb", + "last_rowid": 42, + "last_msg_id": "gen-synthetic-0003:assistant" + }"#; + let cursor: CursorCursor = serde_json::from_str(old_json).expect("must deserialize"); + assert_eq!(cursor.last_msg_id, "gen-synthetic-0003:assistant"); + assert_eq!(cursor.last_updated_at_ms, 0); + assert!(cursor.seen_prompts.is_empty()); } } diff --git a/src/cli/start.rs b/src/cli/start.rs index 776fece..9064ed9 100644 --- a/src/cli/start.rs +++ b/src/cli/start.rs @@ -36,6 +36,10 @@ pub async fn run() -> Result<()> { let home_dir = dirs::home_dir().context("resolve home directory")?; let pipeline = Arc::new(Pipeline::build(&config, ledger, home_dir)); + // Refresh the preamble of every known handoff so template/rule updates + // propagate even when no new prompts arrive after a daemon restart. + pipeline.refresh_all_preambles(); + // Spawn fs watcher (best-effort — if no tool installed, log + continue). 
let watcher = match FsWatcher::spawn_for_all_tools(watcher_tx) { Ok(w) => { diff --git a/src/daemon/pipeline.rs b/src/daemon/pipeline.rs index 9706049..39ba8b1 100644 --- a/src/daemon/pipeline.rs +++ b/src/daemon/pipeline.rs @@ -17,6 +17,7 @@ use crate::adapters::AdapterKind; use crate::cli::config::Config; use crate::daemon::{fs_watcher::WatchEvent, hook_endpoint::HookEvent}; use crate::distill::{ + cursor_activity::extract_cursor_activity, failed_approaches::extract_failed_approaches, git_context::extract_git_context, next_action::extract_next_action, @@ -178,12 +179,62 @@ impl Pipeline { } else { cursor_json } + } else if tool == "codex" { + // Codex stores each session in its own .jsonl. The cursor may be + // empty (first run) or stale (pointing at an older session). If + // either, find the newest codex transcript whose session_meta.cwd + // matches the project_dir and reseed. + let cursor_fp = serde_json::from_str::(&cursor_json) + .ok() + .and_then(|v| { + v.get("file_path") + .and_then(|f| f.as_str()) + .map(|s| s.to_string()) + }) + .unwrap_or_default(); + let newest = find_codex_session_transcript(&self.home_dir, project_dir); + let needs_reseed = match (&cursor_fp.is_empty(), &newest) { + (true, _) => true, + (false, Some(newest_path)) + if cursor_fp != newest_path.to_string_lossy().as_ref() => + { + true + } + _ => false, + }; + if needs_reseed { + if let Some(transcript) = newest { + serde_json::json!({ + "file_path": transcript.to_string_lossy(), + "byte_offset": 0, + "last_event_seq": 0, + }) + .to_string() + } else { + cursor_json + } + } else { + cursor_json + } } else { cursor_json }; let (raw_records, new_cursor_json) = adapter.read_new_records_erased(&cursor_json)?; if raw_records.is_empty() { + // No new prompts, but the cursor (e.g. composer.lastUpdatedAt) may + // have advanced — Cursor often updates this after the AI responds + // and creates files. 
Save the advanced cursor and refresh the + // Session activity section so newly-created files are picked up. + if new_cursor_json != cursor_json && !new_cursor_json.is_empty() { + let _ = self.ledger.save_cursor(tool, session_id, &new_cursor_json); + } + // Resolve project_dir from adapter's new cursor (most reliable). + let refresh_dir = adapter_project_dir(&new_cursor_json, &self.home_dir) + .unwrap_or_else(|| project_dir.to_path_buf()); + if refresh_dir != self.home_dir { + let _ = refresh_session_activity(&refresh_dir); + } return Ok(()); } @@ -192,8 +243,35 @@ impl Pipeline { self.ledger .save_cursor(tool, session_id, &new_cursor_json)?; + // If the adapter reports a project_dir in its new cursor, prefer that + // over the project_dir argument. The Cursor adapter knows the active + // workspace from composer.composerHeaders, which is more reliable than + // the cached "default" cursor (which can be stale or empty on first + // watch event after restart). + let project_dir_owned: PathBuf; + let project_dir: &Path = if let Some(adapter_dir) = + serde_json::from_str::(&new_cursor_json) + .ok() + .and_then(|v| { + v.get("project_dir") + .and_then(|d| d.as_str()) + .map(|s| s.to_string()) + }) { + let candidate = PathBuf::from(&adapter_dir); + if candidate.is_dir() && candidate != self.home_dir { + project_dir_owned = candidate; + &project_dir_owned + } else { + project_dir + } + } else { + project_dir + }; + // Cache project_dir for the fs-watcher path: hook events have the real // cwd but watch events use session_id="default" and must look it up. + // MERGE into the existing "default" cursor — overwriting would clobber + // file_path/byte_offset and force a re-read from byte 0 next time. 
if session_id != "default" { let canonical_home = self .home_dir @@ -203,11 +281,19 @@ impl Pipeline { .canonicalize() .unwrap_or_else(|_| project_dir.to_path_buf()); if canonical_project != canonical_home { - let meta = serde_json::json!({ - "project_dir": project_dir.to_string_lossy() - }) - .to_string(); - let _ = self.ledger.save_cursor(tool, "default", &meta); + let existing = self + .ledger + .load_cursor(tool, "default") + .ok() + .flatten() + .unwrap_or_default(); + let mut v: serde_json::Value = if existing.is_empty() { + serde_json::json!({}) + } else { + serde_json::from_str(&existing).unwrap_or_else(|_| serde_json::json!({})) + }; + v["project_dir"] = serde_json::json!(project_dir.to_string_lossy()); + let _ = self.ledger.save_cursor(tool, "default", &v.to_string()); } } @@ -236,6 +322,8 @@ impl Pipeline { }; let next_action = extract_next_action(rows); + let task = extract_task(rows); + let timestamp_iso = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); // Build accumulated progress log: read existing file, append new entries. let progress_path = project_dir.join(".carryover").join("progress.md"); @@ -248,17 +336,40 @@ impl Pipeline { real_session_id, ); + // Read existing handoff to accumulate Task and Next action history. + let handoff_path = project_dir.join(".carryover").join("handoff.md"); + let existing_handoff = std::fs::read_to_string(&handoff_path).unwrap_or_default(); + let accumulated_task = + accumulate_section(&existing_handoff, "## Task", &task, ×tamp_iso); + let accumulated_next_action = accumulate_section( + &existing_handoff, + "## Next action", + &next_action, + ×tamp_iso, + ); + + // Session activity: prefer fresh scan, but preserve existing when scan + // returns empty (e.g. files outside the recent-window). Never delete + // what's already there. 
+ let fresh_activity = extract_cursor_activity(project_dir); + let session_activity = if fresh_activity.is_empty() { + preserved_session_activity(&existing_handoff) + } else { + fresh_activity + }; + let distilled = Distilled { source_tool: tool.to_string(), session_id: session_id.to_string(), - timestamp_iso: Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(), - task: extract_task(rows), + timestamp_iso, + task: accumulated_task, open_questions: extract_open_questions(rows), - next_action: next_action.clone(), + next_action: accumulated_next_action, recent_files: extract_recent_files(rows), failed_approaches: extract_failed_approaches(rows), git_context: extract_git_context(rows, Some(Path::new(&self.home_dir))), progress_log, + session_activity, }; let ctx = PublishContext { @@ -288,6 +399,321 @@ impl Pipeline { } } +impl Pipeline { + /// Refresh the preamble (top-of-file resume protocol header + strict + /// response rules) of every known project's `.carryover/handoff.md`. + /// + /// Called on daemon start so that template/rule changes propagate to + /// existing handoff files without waiting for a new prompt. + pub fn refresh_all_preambles(&self) { + let projects = self.known_project_dirs(); + let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); + for (project_dir, source_tool) in projects { + let handoff_path = project_dir.join(".carryover").join("handoff.md"); + if !handoff_path.exists() { + continue; + } + let _ = rewrite_preamble(&handoff_path, &source_tool, &self.resume_mode, &now); + } + } + + /// Collect every (project_dir, source_tool) pair we have a cursor for. 
+ fn known_project_dirs(&self) -> Vec<(PathBuf, String)> { + let mut out = Vec::new(); + for tool in self.adapters.keys() { + if let Ok(Some(cursor_json)) = self.ledger.load_cursor(tool, "default") { + if let Ok(v) = serde_json::from_str::(&cursor_json) { + if let Some(dir_str) = v.get("project_dir").and_then(|d| d.as_str()) { + let p = PathBuf::from(dir_str); + if p.is_dir() { + out.push((p, tool.clone())); + } + } + } + } + } + out + } +} + +/// Replace everything before (and including) the first `\n---\n` separator +/// with a freshly-rendered preamble. Idempotent — only writes if content changes. +fn rewrite_preamble( + handoff_path: &Path, + source_tool: &str, + resume_mode: &str, + timestamp_iso: &str, +) -> anyhow::Result<()> { + let content = std::fs::read_to_string(handoff_path)?; + let separator = "\n---\n"; + let after = match content.find(separator) { + Some(i) => &content[i + separator.len()..], + None => return Ok(()), // file is too malformed to safely edit + }; + let new_preamble = crate::publish::render_preamble(source_tool, resume_mode, timestamp_iso); + let updated = format!("{new_preamble}\n{after}"); + if updated != content { + std::fs::write(handoff_path, updated)?; + } + Ok(()) +} + +/// Read the existing `## Session activity` block from a handoff file and +/// return its bullet-line entries (one entry per element). Used to preserve +/// the previous activity snapshot when a fresh scan returns empty. +fn preserved_session_activity(existing_handoff: &str) -> Vec { + let body = extract_section_content(existing_handoff, "## Session activity"); + body.lines() + .map(|s| s.to_string()) + .filter(|s| !s.trim().is_empty()) + .collect() +} + +/// Extract project_dir field from a serialized cursor JSON. +/// Returns None if the field is missing, empty, or points at home_dir / a +/// non-existent directory. 
+fn adapter_project_dir(cursor_json: &str, home_dir: &Path) -> Option { + let v: serde_json::Value = serde_json::from_str(cursor_json).ok()?; + let dir = v.get("project_dir").and_then(|d| d.as_str())?; + let candidate = PathBuf::from(dir); + if candidate.is_dir() && candidate != home_dir { + Some(candidate) + } else { + None + } +} + +/// Refresh just the `## Session activity` section of an existing handoff.md +/// without re-running the full distill. Catches files created by the AI tool +/// AFTER its prompt was captured (e.g., Cursor creates a file 5 seconds after +/// you submit). Idempotent — only writes if the new section differs. +fn refresh_session_activity(project_dir: &Path) -> anyhow::Result<()> { + let handoff_path = project_dir.join(".carryover").join("handoff.md"); + if !handoff_path.exists() { + return Ok(()); + } + + let content = std::fs::read_to_string(&handoff_path)?; + let new_activity = crate::distill::cursor_activity::extract_cursor_activity(project_dir); + + // If the fresh scan is empty (e.g. nothing modified in the last 6h), do + // NOT wipe the existing section — leave the previous snapshot intact. + if new_activity.is_empty() { + return Ok(()); + } + + let mut new_section_body = String::from("## Session activity\n"); + for line in &new_activity { + new_section_body.push_str(line); + new_section_body.push('\n'); + } + + let updated = replace_section(&content, "## Session activity", &new_section_body); + if updated != content { + std::fs::write(&handoff_path, updated)?; + } + Ok(()) +} + +/// Replace the contents of a `## Header` section in `text` with `new_section_body`. +/// If the section doesn't exist and `new_section_body` is non-empty, insert it +/// before the next section after `## Task` (best-effort). 
+fn replace_section(text: &str, header: &str, new_section_body: &str) -> String { + let header_line = format!("{header}\n"); + if let Some(start) = text.find(&header_line) { + // Find the end of this section: next "## " on its own line, or EOF. + let after = start + header_line.len(); + let rest = &text[after..]; + let end = rest + .match_indices("\n## ") + .next() + .map(|(i, _)| after + i + 1) + .unwrap_or(text.len()); + + let mut out = String::new(); + out.push_str(&text[..start]); + if !new_section_body.is_empty() { + out.push_str(new_section_body); + // Ensure trailing blank line before next section. + if !out.ends_with("\n\n") { + out.push('\n'); + } + } + out.push_str(&text[end..]); + out + } else if !new_section_body.is_empty() { + // No existing section — insert before "## Next action" if present, else append. + let insert_at = text + .find("## Next action") + .or_else(|| text.find("## Progress log")) + .unwrap_or(text.len()); + let mut out = String::new(); + out.push_str(&text[..insert_at]); + out.push_str(new_section_body); + out.push('\n'); + out.push_str(&text[insert_at..]); + out + } else { + text.to_string() + } +} + +/// Append a new value to an accumulating section of the handoff (Task / Next action). +/// +/// Reads the existing section content from `existing_handoff`, prepends the new +/// timestamped entry, and returns the combined block. Skips the append if the +/// new value matches the most recent entry (dedupe by content). Empty new +/// values are dropped — they don't create entries. +/// +/// Output format: +/// ```text +/// - [2026-04-29T16:30:00Z] latest task +/// - [2026-04-29T15:45:00Z] earlier task +/// ``` +fn accumulate_section( + existing_handoff: &str, + header: &str, + new_value: &str, + timestamp_iso: &str, +) -> String { + let new_value = new_value.trim(); + let raw_existing = extract_section_content(existing_handoff, header); + + // Keep ONLY clean bullet entries (drop orphan plain-text from old formats). 
+ let bullet_lines: Vec<&str> = raw_existing + .lines() + .filter(|l| l.starts_with("- [")) + .collect(); + let cleaned_existing = bullet_lines.join("\n"); + + // Empty / sentinel new values: keep existing bullets (don't append a no-op). + if new_value.is_empty() || new_value.starts_with("` and the next `## ` header (or end). +fn extract_section_content(text: &str, header: &str) -> String { + let needle = format!("{header}\n"); + let start = match text.find(&needle) { + Some(i) => i + needle.len(), + None => return String::new(), + }; + let rest = &text[start..]; + // Find the next "## " on its own line. + let end = rest + .match_indices("\n## ") + .next() + .map(|(i, _)| i) + .unwrap_or(rest.len()); + rest[..end].trim().to_string() +} + +/// Find the newest Codex session transcript whose session_meta.cwd matches +/// `project_dir`. Codex writes transcripts to +/// `~/.codex/sessions////rollout-*.jsonl` — one file per +/// session. Each starts with a `session_meta` line that includes `cwd`. +/// +/// Walk recently-modified .jsonl files, peek the first line, return the +/// newest one whose cwd matches. Falls back to newest globally if no match. +fn find_codex_session_transcript(home_dir: &Path, project_dir: &Path) -> Option { + let sessions_root = home_dir.join(".codex").join("sessions"); + if !sessions_root.is_dir() { + return None; + } + + // Collect candidate jsonl files with mtimes. + let mut candidates: Vec<(std::time::SystemTime, PathBuf)> = Vec::new(); + walk_codex_sessions(&sessions_root, 0, &mut candidates); + if candidates.is_empty() { + return None; + } + candidates.sort_by_key(|c| std::cmp::Reverse(c.0)); + // Limit how many we peek to avoid heavy I/O on long-lived installs. + candidates.truncate(20); + + let target = project_dir + .canonicalize() + .unwrap_or_else(|_| project_dir.to_path_buf()); + let target_str = target.to_string_lossy().to_string(); + + // Prefer transcripts where session_meta.cwd matches. 
+ for (_, path) in &candidates { + if let Some(cwd) = peek_codex_session_cwd(path) { + if cwd == target_str { + return Some(path.clone()); + } + } + } + + // No cwd-match — fall back to the newest transcript globally. + candidates.into_iter().next().map(|(_, p)| p) +} + +fn walk_codex_sessions(cur: &Path, depth: usize, out: &mut Vec<(std::time::SystemTime, PathBuf)>) { + if depth > 4 { + return; + } + let read_dir = match std::fs::read_dir(cur) { + Ok(rd) => rd, + Err(_) => return, + }; + for entry in read_dir.flatten() { + let path = entry.path(); + let meta = match entry.metadata() { + Ok(m) => m, + Err(_) => continue, + }; + if meta.is_dir() { + walk_codex_sessions(&path, depth + 1, out); + } else if meta.is_file() && path.extension().and_then(|e| e.to_str()) == Some("jsonl") { + if let Ok(mtime) = meta.modified() { + out.push((mtime, path)); + } + } + } +} + +/// Read the first line of a Codex session jsonl, parse session_meta, return cwd. +fn peek_codex_session_cwd(path: &Path) -> Option { + use std::io::{BufRead, BufReader}; + let f = std::fs::File::open(path).ok()?; + let mut reader = BufReader::new(f); + let mut line = String::new(); + reader.read_line(&mut line).ok()?; + let v: serde_json::Value = serde_json::from_str(line.trim()).ok()?; + if v.get("type").and_then(|t| t.as_str()) != Some("session_meta") { + return None; + } + v.get("payload") + .and_then(|p| p.get("cwd")) + .and_then(|c| c.as_str()) + .map(String::from) +} + /// Find the newest Claude transcript for a specific project directory. /// Claude encodes project paths as the full path with '/' replaced by '-'. /// Returns the newest .jsonl in that project subdir, or None if not found. diff --git a/src/distill/cursor_activity.rs b/src/distill/cursor_activity.rs new file mode 100644 index 0000000..1a81ae7 --- /dev/null +++ b/src/distill/cursor_activity.rs @@ -0,0 +1,275 @@ +//! `cursor_activity` extractor — captures concrete artifacts of an AI session +//! 
(file edits, line deltas) so the handoff has a record of *what was done* +//! without depending on the AI's response text being stored locally. +//! +//! Two strategies: +//! 1. **Git project**: run `git diff --stat HEAD` to get per-file line deltas. +//! 2. **Non-git project**: list files modified within a recent window via +//! filesystem mtimes. +//! +//! Activates only for the source tool that needs it — Cursor — since Cursor +//! does not persist AI responses on disk. Other adapters get response text +//! directly from their transcripts. + +use std::path::Path; +use std::process::Command; +use std::time::{Duration, SystemTime}; + +pub const MAX_ACTIVITY_FILES: usize = 15; +const RECENT_WINDOW_SECS: u64 = 6 * 3600; // 6 hours +const MAX_TRAVERSAL_DEPTH: usize = 6; +const MAX_FILES_SCANNED: usize = 5000; + +const IGNORE_DIRS: &[&str] = &[ + ".git", + "node_modules", + "target", + "dist", + "build", + ".next", + ".nuxt", + "vendor", + ".venv", + "venv", + "__pycache__", + ".cache", + ".carryover", + ".omc", + ".claude", + ".vscode", + ".idea", + ".DS_Store", +]; + +/// Build the activity section content for the handoff. +/// +/// `project_dir` is the AI session's working directory. Returns one bullet per +/// changed file (e.g. `- src/main.rs: +12 -3` for git, `- src/main.rs (15s ago)` +/// for non-git). Empty Vec = no recent activity to report. +pub fn extract_cursor_activity(project_dir: &Path) -> Vec { + if let Some(git_lines) = git_diff_summary(project_dir) { + return git_lines; + } + list_recent_modified(project_dir) +} + +/// Run `git diff --stat HEAD` and parse the per-file lines. +/// Returns None if not a git repo, no commits yet, or git errored. 
+fn git_diff_summary(project_dir: &Path) -> Option> { + if !project_dir.join(".git").exists() { + return None; + } + + let output = Command::new("git") + .args(["diff", "--stat", "HEAD", "--no-color"]) + .current_dir(project_dir) + .output() + .ok()?; + + if !output.status.success() { + return None; + } + + let stdout = String::from_utf8_lossy(&output.stdout); + // git diff --stat output: + // src/main.rs | 12 +++++++++--- + // src/lib.rs | 3 +++ + // 2 files changed, 15 insertions(+), 3 deletions(-) + let mut lines: Vec = Vec::new(); + for raw in stdout.lines() { + let trimmed = raw.trim(); + if trimmed.is_empty() { + continue; + } + // The summary line ("N files changed, ...") is also useful — keep last. + if trimmed.contains(" file") && trimmed.contains(" changed") { + lines.push(format!("Summary: {trimmed}")); + continue; + } + // Per-file lines have format: "path | N ±±±" + if let Some((path_raw, stats)) = trimmed.split_once('|') { + let path = path_raw.trim(); + let stats = stats.trim(); + // Extract +/- counts: "12 +++++++++---" + let plus = stats.chars().filter(|c| *c == '+').count(); + let minus = stats.chars().filter(|c| *c == '-').count(); + let count = stats.split_whitespace().next().unwrap_or("?"); + lines.push(format!("- {path}: {count} lines (+{plus} -{minus})")); + } + if lines.len() > MAX_ACTIVITY_FILES { + break; + } + } + + if lines.is_empty() { + None + } else { + Some(lines) + } +} + +/// Walk project_dir and collect files modified within RECENT_WINDOW_SECS. 
+fn list_recent_modified(project_dir: &Path) -> Vec { + let now = SystemTime::now(); + let cutoff = now + .checked_sub(Duration::from_secs(RECENT_WINDOW_SECS)) + .unwrap_or(now); + + let mut entries: Vec<(SystemTime, String)> = Vec::new(); + let mut scanned = 0usize; + walk( + project_dir, + project_dir, + 0, + &cutoff, + &mut entries, + &mut scanned, + ); + + entries.sort_by_key(|e| std::cmp::Reverse(e.0)); + entries.truncate(MAX_ACTIVITY_FILES); + + entries + .into_iter() + .map(|(mtime, path)| { + let age = now + .duration_since(mtime) + .map(|d| format_age(d.as_secs())) + .unwrap_or_else(|_| "just now".to_string()); + format!("- {path} ({age})") + }) + .collect() +} + +fn walk( + root: &Path, + cur: &Path, + depth: usize, + cutoff: &SystemTime, + out: &mut Vec<(SystemTime, String)>, + scanned: &mut usize, +) { + if depth > MAX_TRAVERSAL_DEPTH || *scanned >= MAX_FILES_SCANNED { + return; + } + + let read_dir = match std::fs::read_dir(cur) { + Ok(rd) => rd, + Err(_) => return, + }; + + for entry in read_dir.flatten() { + if *scanned >= MAX_FILES_SCANNED { + return; + } + *scanned += 1; + + let path = entry.path(); + let name = match path.file_name().and_then(|n| n.to_str()) { + Some(n) => n, + None => continue, + }; + + if IGNORE_DIRS.contains(&name) { + continue; + } + + let meta = match entry.metadata() { + Ok(m) => m, + Err(_) => continue, + }; + + // Don't follow symlinks — could escape the project. 
+ if meta.file_type().is_symlink() { + continue; + } + + if meta.is_dir() { + walk(root, &path, depth + 1, cutoff, out, scanned); + continue; + } + + if !meta.is_file() { + continue; + } + + let mtime = match meta.modified() { + Ok(t) => t, + Err(_) => continue, + }; + if mtime < *cutoff { + continue; + } + + let rel = match path.strip_prefix(root) { + Ok(r) => r.to_string_lossy().into_owned(), + Err(_) => path.to_string_lossy().into_owned(), + }; + out.push((mtime, rel)); + } +} + +fn format_age(secs: u64) -> String { + if secs < 60 { + format!("{secs}s ago") + } else if secs < 3600 { + format!("{}m ago", secs / 60) + } else { + format!("{}h ago", secs / 3600) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use tempfile::tempdir; + + #[test] + fn empty_project_returns_empty() { + let dir = tempdir().unwrap(); + let result = extract_cursor_activity(dir.path()); + assert!(result.is_empty()); + } + + #[test] + fn non_git_lists_recently_modified() { + let dir = tempdir().unwrap(); + fs::write(dir.path().join("hello.txt"), "hi").unwrap(); + fs::write(dir.path().join("world.html"), "").unwrap(); + + let result = extract_cursor_activity(dir.path()); + assert_eq!(result.len(), 2); + assert!(result.iter().any(|l| l.contains("hello.txt"))); + assert!(result.iter().any(|l| l.contains("world.html"))); + } + + #[test] + fn ignores_node_modules_and_git() { + let dir = tempdir().unwrap(); + fs::create_dir_all(dir.path().join("node_modules/pkg")).unwrap(); + fs::write(dir.path().join("node_modules/pkg/index.js"), "noise").unwrap(); + fs::create_dir_all(dir.path().join(".git")).unwrap(); + fs::write(dir.path().join(".git/HEAD"), "ref: ...").unwrap(); + fs::write(dir.path().join("real.txt"), "signal").unwrap(); + + let result = extract_cursor_activity(dir.path()); + 
assert!(result.iter().any(|l| l.contains("real.txt"))); + assert!(!result.iter().any(|l| l.contains("node_modules"))); + assert!(!result.iter().any(|l| l.contains(".git"))); + } + + #[test] + fn caps_total_files() { + let dir = tempdir().unwrap(); + for i in 0..(MAX_ACTIVITY_FILES + 5) { + fs::write(dir.path().join(format!("f{i}.txt")), "x").unwrap(); + } + let result = extract_cursor_activity(dir.path()); + assert!(result.len() <= MAX_ACTIVITY_FILES); + } +} diff --git a/src/distill/mod.rs b/src/distill/mod.rs index 5613a7f..9db98da 100644 --- a/src/distill/mod.rs +++ b/src/distill/mod.rs @@ -11,6 +11,7 @@ //! - failed_approaches: surfaces tool errors and retry patterns //! - git_context: captures HEAD sha and diff stat for the project directory +pub mod cursor_activity; pub mod failed_approaches; pub mod git_context; pub mod next_action; diff --git a/src/distill/next_action.rs b/src/distill/next_action.rs index 6620389..57750fa 100644 --- a/src/distill/next_action.rs +++ b/src/distill/next_action.rs @@ -55,6 +55,28 @@ pub fn extract_next_action(rows: &[LedgerRow]) -> String { return truncate_at_word(text, MAX_NEXT_ACTION_CHARS); } + + // No assistant text found (e.g., Cursor sessions don't store responses + // locally). Fall back to the latest user prompt so the handoff still has + // a meaningful "Next action" — the user's most recent intent. 
+ for row in rows.iter().rev() { + if row.role != "user" { + continue; + } + let trimmed = row.content.trim(); + if trimmed.is_empty() || trimmed.starts_with('[') { + continue; // skip tool-result wrappers + } + let first_line = trimmed.lines().find(|l| !l.trim().is_empty()).unwrap_or(""); + if first_line.is_empty() { + continue; + } + return format!( + "Continue: {}", + truncate_at_word(first_line.trim(), MAX_NEXT_ACTION_CHARS - 10) + ); + } + NO_NEXT_ACTION_SENTINEL.to_string() } @@ -189,8 +211,21 @@ mod tests { } #[test] - fn falls_back_to_sentinel_when_no_assistant() { + fn falls_back_to_user_prompt_when_no_assistant() { + // For Cursor-style sessions where AI responses aren't stored locally, + // we fall back to the latest user prompt prefixed with "Continue: ". let rows = vec![make_row("user", "what should I do?")]; + let result = extract_next_action(&rows); + assert!( + result.starts_with("Continue: "), + "expected user-prompt fallback, got: {result}" + ); + assert!(result.contains("what should I do?")); + } + + #[test] + fn returns_sentinel_when_no_user_or_assistant() { + let rows = vec![make_row("system", "boot")]; assert_eq!(extract_next_action(&rows), NO_NEXT_ACTION_SENTINEL); } diff --git a/src/distill/progress_log.rs b/src/distill/progress_log.rs index d0e7931..e4c802d 100644 --- a/src/distill/progress_log.rs +++ b/src/distill/progress_log.rs @@ -10,14 +10,6 @@ pub const MAX_ENTRY_CHARS: usize = 150; const SESSION_WATERMARK_PREFIX: &str = ""; -/// Return the session id stored in the watermark comment on line 1, or None. -fn extract_session_watermark(existing: &str) -> Option<&str> { - let first_line = existing.lines().next()?; - first_line - .strip_prefix(SESSION_WATERMARK_PREFIX)? - .strip_suffix(SESSION_WATERMARK_SUFFIX) -} - /// Strip the watermark line (line 1) if present, returning the rest. 
fn strip_watermark(existing: &str) -> &str { let first_line = existing.lines().next().unwrap_or(""); @@ -31,29 +23,6 @@ fn strip_watermark(existing: &str) -> &str { } } -/// Build a one-liner summary of the previous session for use as a divider. -fn prev_session_summary(existing: &str) -> String { - let stripped = strip_watermark(existing); - let date = stripped - .lines() - .find(|l| l.starts_with("- [")) - .and_then(|l| l.strip_prefix("- [").and_then(|s| s.split('T').next())) - .unwrap_or("unknown"); - let task = stripped - .lines() - .find(|l| l.contains("] [user] ")) - .and_then(|l| l.split("] [user] ").nth(1)) - .unwrap_or(""); - if task.is_empty() { - format!("_Previous session ({date})_") - } else { - format!( - "_Previous session ({date}): {}_", - truncate_at_word(task, 80) - ) - } -} - /// Build one formatted progress entry per user/assistant turn in `rows`. /// /// Format: `- [ISO_TS] [role] first meaningful line of content` @@ -134,56 +103,44 @@ fn ms_to_iso(ts_ms: i64) -> String { /// Merge `new_entries` into `existing_log`, deduplicating by timestamp. /// +/// Always preserves existing entries — never resets on session change. +/// The session watermark is updated each call so future sessions can detect +/// the boundary if needed, but it does not cause a content wipe. +/// /// Returns the complete `.carryover/progress.md` contents: header, -/// entries (old + new, sorted), and a `## What to do next` footer. +/// entries (accumulated across all sessions), and a `## What to do next` footer. pub fn build_progress_log( existing: &str, new_entries: &[String], next_action: &str, session_id: &str, ) -> String { - // Detect whether we're in a new session. - let stored_session = extract_session_watermark(existing); - let is_new_session = match stored_session { - Some(id) => id != session_id && !session_id.is_empty(), - None => false, // no watermark = legacy file, keep accumulating - }; - - // Build the base entries block. 
- let base: String = if is_new_session { - format!( - "# Carryover Progress Log\n{}\n", - prev_session_summary(existing) - ) + // Always keep existing entries regardless of session changes. + let stripped = strip_watermark(existing); + let entries_block = if let Some(idx) = stripped.find("\n## What to do next") { + stripped[..idx].trim_end() } else { - let stripped = strip_watermark(existing); - let entries_block = if let Some(idx) = stripped.find("\n## What to do next") { - stripped[..idx].trim_end() - } else { - stripped.trim_end() - }; - if entries_block.is_empty() { - "# Carryover Progress Log\n".to_string() - } else { - format!("{entries_block}\n") - } + stripped.trim_end() }; - - // Timestamp watermark for deduplication (only meaningful when same session). - let last_ts: Option = if is_new_session { - None + let base = if entries_block.is_empty() { + "# Carryover Progress Log\n".to_string() } else { - base.lines() - .rev() - .filter(|l| l.starts_with("- [")) - .find_map(|l| { - l.strip_prefix("- [") - .and_then(|s| s.split(']').next()) - .map(|s| s.to_string()) - }) + format!("{entries_block}\n") }; - // Only append entries newer than the watermark. + // Deduplicate: only append entries newer than the last recorded timestamp. + let last_ts: Option = base + .lines() + .rev() + .filter(|l| l.starts_with("- [")) + .find_map(|l| { + l.strip_prefix("- [") + .and_then(|s| s.split(']').next()) + .map(|s| s.to_string()) + }); + + // Only append entries newer than the watermark, deduplicated by full line. 
+ let mut seen_lines = std::collections::HashSet::new(); let to_append: Vec<&str> = new_entries .iter() .filter(|e| { @@ -191,10 +148,11 @@ pub fn build_progress_log( .strip_prefix("- [") .and_then(|s| s.split(']').next()) .unwrap_or(""); - match &last_ts { + let ts_ok = match &last_ts { Some(last) => entry_ts > last.as_str(), None => true, - } + }; + ts_ok && seen_lines.insert(e.as_str()) }) .map(|s| s.as_str()) .collect(); diff --git a/src/publish/handoff.rs b/src/publish/handoff.rs index ab9057f..3768d7b 100644 --- a/src/publish/handoff.rs +++ b/src/publish/handoff.rs @@ -17,10 +17,19 @@ pub struct Distilled { pub failed_approaches: Vec, // empty for non-coding sessions pub git_context: String, // sentinel for non-git pub progress_log: String, // accumulated progress log from progress.md + /// Concrete artifacts of the latest activity — git diff stats or recent + /// file mtimes. Populated for tools that don't store AI responses on disk + /// (e.g. Cursor) so the next session sees what was actually done. + #[serde(default)] + pub session_activity: Vec, } -/// Render the 50-line handoff payload. Hard cap enforced. -pub fn render_handoff(d: &Distilled, resume_mode: &str) -> String { +/// Build the handoff preamble (everything before `---` separator). +/// +/// Pure of session content — depends only on tool name, mode, and timestamp. +/// Exposed so the pipeline can refresh just the preamble of an existing +/// handoff without re-running the full distill. 
+pub fn render_preamble(source_tool: &str, resume_mode: &str, timestamp_iso: &str) -> String { let mode = if resume_mode.is_empty() { "ask" } else { @@ -28,32 +37,61 @@ pub fn render_handoff(d: &Distilled, resume_mode: &str) -> String { }; let mut lines: Vec = Vec::new(); - // Title line: "# [CARRYOVER] Last updated from " lines.push(format!( - "# [CARRYOVER] Last updated {} from {}", - d.timestamp_iso, d.source_tool + "# [CARRYOVER] Last updated {timestamp_iso} from {source_tool}" )); - // Resume protocol header — verbose template per the locked decision. + // Resume protocol header — instructions for the AI agent reading this file. + // The reading-guide block is INTERNAL agent guidance; the agent must not + // recite or explain the file structure to the user. lines.push(String::new()); lines.push(format!("# CARRYOVER RESUME (mode: {mode})")); lines.push(String::new()); lines.push(format!( - "This file contains a 50-line summary of your prior session in {}. Before acting:", - d.source_tool + "INTERNAL HANDOFF — for your reading only, never to be quoted, paraphrased structurally, or referenced in your reply. Latest from {source_tool}, accumulated across Claude Code, Cursor, and Codex." )); - lines.push("1. Summarize the carryover content back to the user in 1-2 sentences.".to_string()); - lines.push("2. Ask the user what they want to do next.".to_string()); - lines.push("3. Do not assume continuation — wait for confirmation.".to_string()); + lines.push(String::new()); + lines.push("STRICT RESPONSE RULES:".to_string()); + lines.push("- Reply as if you naturally remember the prior work. Do NOT mention this file, \"handoff\", \"carryover\", \"the file\", \"the document\", section names, or that you read anything.".to_string()); + lines.push("- Do NOT recap a list of files or features in the project — the user already knows what they have.".to_string()); + lines.push("- Do NOT report that any field is empty or missing (e.g. \"no next action captured\"). 
Just don't mention it.".to_string()); + lines.push("- Two short paragraphs MAX. First: one sentence on what was being worked on. Second: one direct question on what they want now.".to_string()); + lines.push("- Use the user's own informal tone. No bullet points, no bold/italics, no markdown headings in your reply.".to_string()); + lines.push(String::new()); + lines.push("Section keys (for your understanding only):".to_string()); + lines.push("- Task: user's prompts (newest first) — infer current intent.".to_string()); + lines.push("- Session activity: concrete file changes — what was actually built.".to_string()); + lines.push("- Next action: previous AI's planned step (may be stale).".to_string()); + lines.push("- Progress log: chronological turn-by-turn record across all tools.".to_string()); + lines.push("Bulleted lists accumulate across sessions — newest at top.".to_string()); lines.push(String::new()); lines.push("---".to_string()); + + // Trailing newline so `format!("{preamble}\n{body}")` gives a blank line. lines.push(String::new()); + lines.join("\n") +} + +/// Render the 50-line handoff payload. Hard cap enforced. 
+pub fn render_handoff(d: &Distilled, resume_mode: &str) -> String { + let preamble = render_preamble(&d.source_tool, resume_mode, &d.timestamp_iso); + let mut lines: Vec = preamble.lines().map(String::from).collect(); + // Task lines.push("## Task".to_string()); lines.push(d.task.clone()); lines.push(String::new()); + // Session activity (concrete artifacts — git stats or recent file mtimes) + if !d.session_activity.is_empty() { + lines.push("## Session activity".to_string()); + for a in &d.session_activity { + lines.push(a.clone()); + } + lines.push(String::new()); + } + // Recent files (only if populated) if !d.recent_files.is_empty() { lines.push("## Recent files".to_string()); @@ -135,6 +173,7 @@ mod tests { failed_approaches: vec![], git_context: "".to_string(), progress_log: String::new(), + session_activity: vec![], } } diff --git a/src/publish/mod.rs b/src/publish/mod.rs index 8f717a9..1a2d858 100644 --- a/src/publish/mod.rs +++ b/src/publish/mod.rs @@ -19,7 +19,7 @@ mod handoff; mod pointer; mod write_atomic; -pub use handoff::{render_handoff, Distilled, MAX_HANDOFF_LINES}; +pub use handoff::{render_handoff, render_preamble, Distilled, MAX_HANDOFF_LINES}; pub use pointer::{ ensure_pointer_block, ensure_pointer_block_relative, pointer_block, remove_pointer_block, POINTER_END, POINTER_START, @@ -190,6 +190,7 @@ mod tests { failed_approaches: vec![], git_context: "branch publisher / clean".to_string(), progress_log: String::new(), + session_activity: vec![], } } diff --git a/src/toolspec/specs.rs b/src/toolspec/specs.rs index c3f9116..2c96e1c 100644 --- a/src/toolspec/specs.rs +++ b/src/toolspec/specs.rs @@ -178,6 +178,14 @@ static CURSOR_TRANSCRIPT_PATH: PathSpec = PathSpec { windows: &[], }; +/// Workspace-storage directory — where Cursor writes per-session prompt DBs. +/// Watching this directory triggers re-ingestion whenever a prompt is saved. 
+static CURSOR_WORKSPACE_STORAGE_PATH: PathSpec = PathSpec { + linux: &["~/.config/Cursor/User/workspaceStorage"], + macos: &["~/Library/Application Support/Cursor/User/workspaceStorage"], + windows: &[], +}; + static CURSOR_HOOKSET_V040: HookSet = HookSet { session_start: "beforeSubmitPrompt", session_end: "stop", @@ -200,7 +208,7 @@ pub static CURSOR: ToolSpec = ToolSpec { detect_binary: &["cursor"], detect_version: detect_version_cursor, config_paths: &[CURSOR_CONFIG_PATH], - transcript_paths: &[CURSOR_TRANSCRIPT_PATH], + transcript_paths: &[CURSOR_TRANSCRIPT_PATH, CURSOR_WORKSPACE_STORAGE_PATH], hooks_by_version: &CURSOR_HOOKS, }; @@ -218,6 +226,12 @@ static CODEX_CONFIG_PATH: PathSpec = PathSpec { windows: &[], }; +static CODEX_HISTORY_PATH: PathSpec = PathSpec { + linux: &["~/.codex/history.jsonl"], + macos: &["~/.codex/history.jsonl"], + windows: &[], +}; + static CODEX_TRANSCRIPT_PATH: PathSpec = PathSpec { linux: &["~/.codex/sessions/"], macos: &["~/.codex/sessions/"], @@ -246,7 +260,7 @@ pub static CODEX: ToolSpec = ToolSpec { detect_binary: &["codex"], detect_version: detect_version_codex, config_paths: &[CODEX_CONFIG_PATH], - transcript_paths: &[CODEX_TRANSCRIPT_PATH], + transcript_paths: &[CODEX_TRANSCRIPT_PATH, CODEX_HISTORY_PATH], hooks_by_version: &CODEX_HOOKS, }; diff --git a/tests/cross_tool.rs b/tests/cross_tool.rs index 0515fa7..1beff66 100644 --- a/tests/cross_tool.rs +++ b/tests/cross_tool.rs @@ -54,12 +54,8 @@ fn parse_fixture_through_adapter(source_tool: &str) -> (Vec, &'static (rows, "claude") } "cursor" => { - let adapter = CursorAdapter::with_db_root(fixture_root("cursor")); - let cursor = adapters::cursor::CursorCursor { - db_path: fixture_root("cursor").join("1-state.vscdb"), - last_rowid: 0, - last_msg_id: String::new(), - }; + let adapter = CursorAdapter::with_db_root(fixture_root("cursor").join("oldSchema")); + let cursor = adapters::cursor::CursorCursor::default(); let (records, _) = adapter .read_new_records(&cursor) 
.expect("cursor adapter: read_new_records"); @@ -72,6 +68,8 @@ fn parse_fixture_through_adapter(source_tool: &str) -> (Vec, &'static file_path: fixture_root("codex").join("1-simple-session.jsonl"), byte_offset: 0, last_event_seq: 0, + history_offset: 0, + project_dir: None, }; let (records, _) = adapter .read_new_records(&cursor) @@ -100,6 +98,7 @@ fn distill(rows: &[LedgerRow], source_tool: &str, cwd: Option<&Path>) -> Distill failed_approaches: extract_failed_approaches(rows), git_context: extract_git_context(rows, cwd), progress_log: String::new(), + session_activity: vec![], } } diff --git a/tests/fixtures/cursor/build_state_vscdb.py b/tests/fixtures/cursor/build_state_vscdb.py index 9ab9467..05d3046 100644 --- a/tests/fixtures/cursor/build_state_vscdb.py +++ b/tests/fixtures/cursor/build_state_vscdb.py @@ -1,43 +1,61 @@ #!/usr/bin/env python3 """ -Build a deterministic synthetic state.vscdb for Cursor adapter tests. +Build deterministic synthetic state.vscdb fixtures for Cursor adapter tests. All data is fabricated. No real user sessions, paths, secrets, or identifiers. -Re-running this script on the same Python + SQLite version produces an identical -file because: - - Row insertion order is fixed. - - All timestamps are hardcoded constants. - - VACUUM is run at the end to normalize free-list pages. +Re-running this script on the same Python + SQLite version produces identical +files because row insertion order is fixed, timestamps are hardcoded constants, +and VACUUM is run at the end to normalize free-list pages. -Usage: - python3 build_state_vscdb.py [output_path] +Generates two fixture sets: + 1. Old schema (pre-migration): tests/fixtures/cursor/oldSchema/state.vscdb + Contains aiService.generations, aiService.prompts, composer.composerData + all in a single global DB. + + 2. 
New schema (post-migration): tests/fixtures/cursor/globalStorage/state.vscdb + + tests/fixtures/cursor/workspaceStorage//state.vscdb + Global DB has composer.composerHeaders; per-workspace DBs have prompts. - output_path defaults to state.vscdb in the same directory as this script. +Usage: + python3 build_state_vscdb.py """ import json import os import sqlite3 -import sys -def build(output_path: str) -> None: - if os.path.exists(output_path): - os.remove(output_path) +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) - conn = sqlite3.connect(output_path) - cur = conn.cursor() - # Cursor's state.vscdb uses a single key-value table called ItemTable. - cur.execute( +def create_item_table(conn: sqlite3.Connection) -> None: + conn.execute( "CREATE TABLE ItemTable (" " key TEXT PRIMARY KEY NOT NULL," " value BLOB" ")" ) - # --- aiService.generations --- - # Synthetic generation records matching Cursor's internal shape. + +def vacuum_close(conn: sqlite3.Connection, path: str) -> None: + conn.commit() + conn.execute("VACUUM") + conn.close() + print(f"Written: {path} ({os.path.getsize(path)} bytes)") + + +# --------------------------------------------------------------------------- +# Old schema (pre-migration) +# --------------------------------------------------------------------------- + +def build_old_schema(output_path: str) -> None: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + if os.path.exists(output_path): + os.remove(output_path) + + conn = sqlite3.connect(output_path) + create_item_table(conn) + generations = [ { "generationId": "gen-synthetic-0001", @@ -84,12 +102,11 @@ def build(output_path: str) -> None: ), }, ] - cur.execute( + conn.execute( "INSERT INTO ItemTable (key, value) VALUES (?, ?)", ("aiService.generations", json.dumps(generations)), ) - # --- aiService.prompts --- prompts = [ { "promptId": "prompt-synthetic-0001", @@ -113,12 +130,11 @@ def build(output_path: str) -> None: "files": ["/synthetic/path/9/app.js"], }, ] - 
cur.execute( + conn.execute( "INSERT INTO ItemTable (key, value) VALUES (?, ?)", ("aiService.prompts", json.dumps(prompts)), ) - # --- composer.composerData --- composer_data = { "composers": [ { @@ -151,20 +167,138 @@ def build(output_path: str) -> None: }, ] } - cur.execute( + conn.execute( "INSERT INTO ItemTable (key, value) VALUES (?, ?)", ("composer.composerData", json.dumps(composer_data)), ) - conn.commit() + vacuum_close(conn, output_path) - # VACUUM normalizes page layout for reproducibility. - conn.execute("VACUUM") - conn.close() - print(f"Written: {output_path} ({os.path.getsize(output_path)} bytes)") + +# --------------------------------------------------------------------------- +# New schema (post-migration) +# --------------------------------------------------------------------------- + +WS1_ID = "wsaaa111bbb222cc" +WS2_ID = "wsccc333ddd444ee" + +WS1_FSPATH = "/synthetic/project-alpha" +WS2_FSPATH = "/synthetic/project-beta" + +COMPOSER1_ID = "composer-new-0001" +COMPOSER2_ID = "composer-new-0002" + + +def build_new_schema_global(output_path: str) -> None: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + if os.path.exists(output_path): + os.remove(output_path) + + conn = sqlite3.connect(output_path) + create_item_table(conn) + + headers = { + "allComposers": [ + { + "composerId": COMPOSER1_ID, + "lastUpdatedAt": 1700300010000, + "workspaceIdentifier": { + "id": WS1_ID, + "uri": { + "fsPath": WS1_FSPATH, + "scheme": "file", + }, + }, + }, + { + "composerId": COMPOSER2_ID, + "lastUpdatedAt": 1700300000000, + "workspaceIdentifier": { + "id": WS2_ID, + "uri": { + "fsPath": WS2_FSPATH, + "scheme": "file", + }, + }, + }, + ] + } + conn.execute( + "INSERT INTO ItemTable (key, value) VALUES (?, ?)", + ("composer.composerHeaders", json.dumps(headers)), + ) + + vacuum_close(conn, output_path) + + +def build_new_schema_workspace1(output_path: str) -> None: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + if os.path.exists(output_path): 
+ os.remove(output_path) + + conn = sqlite3.connect(output_path) + create_item_table(conn) + + prompts = [ + {"text": "How do I implement binary search?", "commandType": 4}, + {"text": "Can you add unit tests for that?", "commandType": 4}, + ] + conn.execute( + "INSERT INTO ItemTable (key, value) VALUES (?, ?)", + ("aiService.prompts", json.dumps(prompts)), + ) + + generations = [ + {"unixMs": 1700300001000, "generationUUID": "gen-new-0001", "type": 1, "textDescription": "binary search impl"}, + {"unixMs": 1700300010000, "generationUUID": "gen-new-0002", "type": 1, "textDescription": "unit tests"}, + ] + conn.execute( + "INSERT INTO ItemTable (key, value) VALUES (?, ?)", + ("aiService.generations", json.dumps(generations)), + ) + + vacuum_close(conn, output_path) + + +def build_new_schema_workspace2(output_path: str) -> None: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + if os.path.exists(output_path): + os.remove(output_path) + + conn = sqlite3.connect(output_path) + create_item_table(conn) + + prompts = [ + {"text": "Explain Docker networking", "commandType": 4}, + ] + conn.execute( + "INSERT INTO ItemTable (key, value) VALUES (?, ?)", + ("aiService.prompts", json.dumps(prompts)), + ) + + generations = [ + {"unixMs": 1700300005000, "generationUUID": "gen-new-0003", "type": 1, "textDescription": "docker networking"}, + ] + conn.execute( + "INSERT INTO ItemTable (key, value) VALUES (?, ?)", + ("aiService.generations", json.dumps(generations)), + ) + + vacuum_close(conn, output_path) if __name__ == "__main__": - script_dir = os.path.dirname(os.path.abspath(__file__)) - out = sys.argv[1] if len(sys.argv) > 1 else os.path.join(script_dir, "1-state.vscdb") - build(out) + # Old schema (two paths: legacy root for fixtures.rs test + oldSchema/ for adapter tests) + build_old_schema(os.path.join(SCRIPT_DIR, "1-state.vscdb")) + build_old_schema(os.path.join(SCRIPT_DIR, "oldSchema", "state.vscdb")) + + # New schema: global + two workspaces + 
build_new_schema_global(os.path.join(SCRIPT_DIR, "globalStorage", "state.vscdb")) + build_new_schema_workspace1( + os.path.join(SCRIPT_DIR, "workspaceStorage", WS1_ID, "state.vscdb") + ) + build_new_schema_workspace2( + os.path.join(SCRIPT_DIR, "workspaceStorage", WS2_ID, "state.vscdb") + ) + + print("All fixtures built.") diff --git a/tests/fixtures/cursor/globalStorage/state.vscdb b/tests/fixtures/cursor/globalStorage/state.vscdb new file mode 100644 index 0000000..5e62671 Binary files /dev/null and b/tests/fixtures/cursor/globalStorage/state.vscdb differ diff --git a/tests/fixtures/cursor/oldSchema/state.vscdb b/tests/fixtures/cursor/oldSchema/state.vscdb new file mode 100644 index 0000000..fcaeb02 Binary files /dev/null and b/tests/fixtures/cursor/oldSchema/state.vscdb differ diff --git a/tests/fixtures/cursor/workspaceStorage/wsaaa111bbb222cc/state.vscdb b/tests/fixtures/cursor/workspaceStorage/wsaaa111bbb222cc/state.vscdb new file mode 100644 index 0000000..825f148 Binary files /dev/null and b/tests/fixtures/cursor/workspaceStorage/wsaaa111bbb222cc/state.vscdb differ diff --git a/tests/fixtures/cursor/workspaceStorage/wsccc333ddd444ee/state.vscdb b/tests/fixtures/cursor/workspaceStorage/wsccc333ddd444ee/state.vscdb new file mode 100644 index 0000000..908b3a5 Binary files /dev/null and b/tests/fixtures/cursor/workspaceStorage/wsccc333ddd444ee/state.vscdb differ