From d385c085227be886eb2f10ff0b4dac69421ac1e2 Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Thu, 25 Jun 2026 16:09:56 -0400 Subject: [PATCH 1/3] feat: add base provider abstraction and Claude adapter --- claude_code_log/discovery.py | 69 ++++++++ claude_code_log/providers/__init__.py | 11 ++ claude_code_log/providers/base.py | 226 ++++++++++++++++++++++++++ claude_code_log/providers/claude.py | 56 +++++++ claude_code_log/providers/registry.py | 93 +++++++++++ 5 files changed, 455 insertions(+) create mode 100644 claude_code_log/discovery.py create mode 100644 claude_code_log/providers/__init__.py create mode 100644 claude_code_log/providers/base.py create mode 100644 claude_code_log/providers/claude.py create mode 100644 claude_code_log/providers/registry.py diff --git a/claude_code_log/discovery.py b/claude_code_log/discovery.py new file mode 100644 index 00000000..2730da9e --- /dev/null +++ b/claude_code_log/discovery.py @@ -0,0 +1,69 @@ +"""Unified session discovery across all providers.""" + +from typing import Iterator, Optional + +from .providers import discover_providers +from .providers.base import SessionInfo + + +def discover_all_sessions( + providers: Optional[list[str]] = None, +) -> Iterator[SessionInfo]: + """Discover sessions from all available providers. + + Args: + providers: Optional list of provider names to include. + If None, discovers from all available providers. + + Yields: + SessionInfo objects from all providers. + """ + registry = discover_providers() + + if providers is None: + providers = registry.get_available_providers() + + for provider_name in providers: + provider = registry.get_provider(provider_name) + if provider and provider.is_available(): + yield from provider.discover_sessions() + + +def discover_sessions_by_provider(provider_name: str) -> Iterator[SessionInfo]: + """Discover sessions from a specific provider. + + Args: + provider_name: Name of the provider to discover sessions from. + + Yields: + SessionInfo objects from the specified provider. + """ + registry = discover_providers() + yield from registry.discover_sessions_by_provider(provider_name) + + +def get_session_stats() -> dict[str, int]: + registry = discover_providers() + stats: dict[str, int] = {} + + for provider_name in registry.get_available_providers(): + provider = registry.get_provider(provider_name) + if provider: + count = sum(1 for _ in provider.discover_sessions()) + stats[provider_name] = count + + return stats + + +def load_session(provider_name: str, session_id: str): + """Load a session from a specific provider. + + Args: + provider_name: Name of the provider. + session_id: ID of the session to load. + + Returns: + Iterator of TranscriptEntry objects. + """ + registry = discover_providers() + return registry.load_session(provider_name, session_id) diff --git a/claude_code_log/providers/__init__.py b/claude_code_log/providers/__init__.py new file mode 100644 index 00000000..77d3a82d --- /dev/null +++ b/claude_code_log/providers/__init__.py @@ -0,0 +1,11 @@ +"""Provider abstraction layer for multi-provider session support.""" + +from .base import BaseProvider, SessionInfo +from .registry import ProviderRegistry, discover_providers + +__all__ = [ + "BaseProvider", + "SessionInfo", + "ProviderRegistry", + "discover_providers", +] diff --git a/claude_code_log/providers/base.py b/claude_code_log/providers/base.py new file mode 100644 index 00000000..39bb81f6 --- /dev/null +++ b/claude_code_log/providers/base.py @@ -0,0 +1,226 @@ +"""Abstract base class for session providers.""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Iterator, Optional, cast + +from claude_code_log.models import ( + AssistantMessageModel, + AssistantTranscriptEntry, + TextContent, + ThinkingContent, + ToolUseContent, + TranscriptEntry, + UserMessageModel, + UserTranscriptEntry, +) + + +@dataclass +class SessionInfo: + provider: str + session_id: str + title: Optional[str] = None + created_at: Optional[str] = None + updated_at: Optional[str] = None + project_path: Optional[Path] = None + message_count: int = 0 + total_tokens: int = 0 + + +def extract_text(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + items: list[Any] = cast(list[Any], content) + parts: list[str] = [] + for item in items: + item_dict = cast(dict[str, Any], item) if isinstance(item, dict) else None + if item_dict is not None: + parts.append(str(item_dict.get("text", ""))) + elif isinstance(item, str): + parts.append(item) + return "\n".join(parts) + return str(content) + + +def file_mtime_iso(path: Path) -> str: + return datetime.fromtimestamp(path.stat().st_mtime).isoformat() + + +def make_user_entry( + session_id: str, + uuid: str, + timestamp: str, + content: Any, +) -> UserTranscriptEntry: + return UserTranscriptEntry( + type="user", + parentUuid=None, + isSidechain=False, + userType="external", + cwd="", + sessionId=session_id, + version="", + uuid=uuid, + timestamp=timestamp, + message=UserMessageModel( + role="user", + content=[TextContent(type="text", text=extract_text(content))], + ), + ) + + +def make_tool_result_entry( + session_id: str, + uuid: str, + timestamp: str, + tool_use_id: str, + content: str, +) -> UserTranscriptEntry: + from claude_code_log.models import ToolResultContent + + return UserTranscriptEntry( + type="user", + parentUuid=None, + isSidechain=False, + userType="external", + cwd="", + sessionId=session_id, + version="", + uuid=uuid, + timestamp=timestamp, + message=UserMessageModel( + role="user", + content=[ + ToolResultContent( + type="tool_result", + tool_use_id=tool_use_id, + content=content, + ) + ], + ), + ) + + +def make_assistant_entry( + session_id: str, + uuid: str, + timestamp: str, + model: str, + content: Any, +) -> AssistantTranscriptEntry: + content_list: list[Any] = ( + cast(list[Any], content) + if isinstance(content, list) + else [TextContent(type="text", text=str(content))] + ) + return AssistantTranscriptEntry( + type="assistant", + parentUuid=None, + isSidechain=False, + userType="external", + cwd="", + sessionId=session_id, + version="", + uuid=uuid, + timestamp=timestamp, + message=AssistantMessageModel( + id=uuid, + type="message", + role="assistant", + model=model, + content=content_list, + ), + ) + + +def make_thinking_entry( + session_id: str, + uuid: str, + timestamp: str, + model: str, + text: str, +) -> AssistantTranscriptEntry: + return AssistantTranscriptEntry( + type="assistant", + parentUuid=None, + isSidechain=False, + userType="external", + cwd="", + sessionId=session_id, + version="", + uuid=uuid, + timestamp=timestamp, + message=AssistantMessageModel( + id=uuid, + type="message", + role="assistant", + model=model, + content=[ThinkingContent(type="thinking", thinking=text)], + ), + ) + + +def make_tool_use_entry( + session_id: str, + uuid: str, + timestamp: str, + model: str, + tool_id: str, + tool_name: str, + tool_input: Any, +) -> AssistantTranscriptEntry: + return AssistantTranscriptEntry( + type="assistant", + parentUuid=None, + isSidechain=False, + userType="external", + cwd="", + sessionId=session_id, + version="", + uuid=uuid, + timestamp=timestamp, + message=AssistantMessageModel( + id=uuid, + type="message", + role="assistant", + model=model, + content=[ + ToolUseContent( + type="tool_use", + id=tool_id, + name=tool_name, + input=tool_input, + ) + ], + ), + ) + + +class BaseProvider(ABC): + @abstractmethod + def get_provider_name(self) -> str: ... + + @abstractmethod + def get_session_format(self) -> str: ... + + @abstractmethod + def get_data_dir(self) -> Optional[Path]: ... + + @abstractmethod + def discover_sessions(self) -> Iterator[SessionInfo]: ... + + @abstractmethod + def load_session( + self, session_id: str, max_messages: Optional[int] = None + ) -> Iterator[TranscriptEntry]: ... + + def is_available(self) -> bool: + data_dir = self.get_data_dir() + return data_dir is not None and data_dir.exists() + + def get_session_stats(self, session_id: str) -> dict[str, Any]: + return {} diff --git a/claude_code_log/providers/claude.py b/claude_code_log/providers/claude.py new file mode 100644 index 00000000..24aae0e9 --- /dev/null +++ b/claude_code_log/providers/claude.py @@ -0,0 +1,56 @@ +"""Claude Code session provider.""" + +from pathlib import Path +from typing import Iterator, Optional + +from claude_code_log.models import TranscriptEntry + +from .base import BaseProvider, SessionInfo, file_mtime_iso + + +class ClaudeProvider(BaseProvider): + def get_provider_name(self) -> str: + return "claude" + + def get_session_format(self) -> str: + return "jsonl" + + def get_data_dir(self) -> Optional[Path]: + data_dir = Path.home() / ".claude" / "projects" + return data_dir if data_dir.exists() else None + + def discover_sessions(self) -> Iterator[SessionInfo]: + data_dir = self.get_data_dir() + if data_dir is None: + return + + for project_dir in data_dir.iterdir(): + if not project_dir.is_dir(): + continue + for jsonl_file in project_dir.glob("*.jsonl"): + if jsonl_file.name.startswith("agent-"): + continue + yield SessionInfo( + provider="claude", + session_id=jsonl_file.stem, + project_path=project_dir, + created_at=file_mtime_iso(jsonl_file), + ) + + def load_session( + self, session_id: str, max_messages: Optional[int] = None + ) -> Iterator[TranscriptEntry]: + from claude_code_log.converter import load_transcript + + data_dir = self.get_data_dir() + if data_dir is None: + raise ValueError("Claude data directory not found") + + for project_dir in data_dir.iterdir(): + if not project_dir.is_dir(): + continue + jsonl_file = project_dir / f"{session_id}.jsonl" + if jsonl_file.exists(): + return iter(load_transcript(jsonl_file)) + + raise FileNotFoundError(f"Session {session_id} not found") diff --git a/claude_code_log/providers/registry.py b/claude_code_log/providers/registry.py new file mode 100644 index 00000000..9b8d130d --- /dev/null +++ b/claude_code_log/providers/registry.py @@ -0,0 +1,93 @@ +"""Provider registry for auto-discovery and management.""" + +from typing import Dict, Iterator, List, Optional, Type + +from .base import BaseProvider, SessionInfo + + +class ProviderRegistry: + """Registry for managing session providers. + + Providers are registered with their data directory paths. + Auto-discovery checks which directories exist and only enables + providers with valid data directories. + """ + + def __init__(self): + self._providers: Dict[str, BaseProvider] = {} + self._provider_classes: Dict[str, Type[BaseProvider]] = {} + + def register(self, provider: BaseProvider) -> None: + """Register a provider instance.""" + name = provider.get_provider_name() + self._providers[name] = provider + + def register_class(self, name: str, provider_class: Type[BaseProvider]) -> None: + """Register a provider class for lazy instantiation.""" + self._provider_classes[name] = provider_class + + def instantiate_registered(self) -> None: + for provider_class in self._provider_classes.values(): + try: + provider = provider_class() + self.register(provider) + except Exception: + # Skip providers that fail to initialize + pass + + def get_provider(self, name: str) -> Optional[BaseProvider]: + """Get a registered provider by name.""" + return self._providers.get(name) + + def get_available_providers(self) -> List[str]: + """Get names of all available providers (with valid data directories).""" + available: List[str] = [] + for name, provider in self._providers.items(): + if provider.is_available(): + available.append(name) + return available + + def get_all_providers(self) -> List[str]: + """Get names of all registered providers.""" + return list(self._providers.keys()) + + def discover_all_sessions(self) -> Iterator[SessionInfo]: + """Discover sessions from all available providers.""" + for provider in self._providers.values(): + if provider.is_available(): + yield from provider.discover_sessions() + + def discover_sessions_by_provider( + self, provider_name: str + ) -> Iterator[SessionInfo]: + """Discover sessions from a specific provider.""" + provider = self._providers.get(provider_name) + if provider and provider.is_available(): + yield from provider.discover_sessions() + + def load_session( + self, provider_name: str, session_id: str, max_messages: Optional[int] = None + ): + """Load a session from a specific provider.""" + provider = self._providers.get(provider_name) + if provider is None: + raise ValueError(f"Unknown provider: {provider_name}") + if not provider.is_available(): + raise ValueError(f"Provider {provider_name} is not available") + return provider.load_session(session_id, max_messages=max_messages) + + +def discover_providers() -> ProviderRegistry: + """Auto-discover available providers based on ~/. directories. + + Returns a ProviderRegistry with all available providers registered. + """ + registry = ProviderRegistry() + + from .claude import ClaudeProvider + + registry.register_class("claude", ClaudeProvider) + + registry.instantiate_registered() + + return registry From 9f45b64afa030f1f574211f8b3022b74bf6fd73d Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Thu, 25 Jun 2026 16:15:24 -0400 Subject: [PATCH 2/3] feat: add AGY provider with entry threading and full type coverage - Add AgyProvider for Antigravity CLI sessions - Entry threading via parentUuid chaining (sequential DAG) - Handle all AGY entry types: USER_INPUT, PLANNER_RESPONSE, CHECKPOINT, LIST_DIRECTORY, GENERIC, RUN_COMMAND, VIEW_FILE, CODE_ACTION - Register in ProviderRegistry for auto-discovery - Pyright: 0 errors (strict mode) - All 2207 tests pass --- claude_code_log/providers/agy.py | 368 ++++++++++++++++++++++++++ claude_code_log/providers/registry.py | 2 + 2 files changed, 370 insertions(+) create mode 100644 claude_code_log/providers/agy.py diff --git a/claude_code_log/providers/agy.py b/claude_code_log/providers/agy.py new file mode 100644 index 00000000..55a1881e --- /dev/null +++ b/claude_code_log/providers/agy.py @@ -0,0 +1,368 @@ +"""Antigravity CLI (agy) session provider.""" + +import json +import re +from pathlib import Path +from typing import Any, Iterator, Optional, cast + +from claude_code_log.models import TranscriptEntry + +from .base import ( + BaseProvider, + SessionInfo, + extract_text, + file_mtime_iso, + make_assistant_entry, + make_user_entry, +) + + +class AgyProvider(BaseProvider): + def get_provider_name(self) -> str: + return "agy" + + def get_session_format(self) -> str: + return "jsonl" + + def get_data_dir(self) -> Optional[Path]: + data_dir = Path.home() / ".gemini" / "antigravity-cli" + return data_dir if data_dir.exists() else None + + def discover_sessions(self) -> Iterator[SessionInfo]: + data_dir = self.get_data_dir() + if data_dir is None: + return + + brain_dir = data_dir / "brain" + if not brain_dir.exists(): + return + + for session_dir in brain_dir.iterdir(): + if not session_dir.is_dir(): + continue + transcript_file = ( + session_dir / ".system_generated" / "logs" / "transcript.jsonl" + ) + if not transcript_file.exists(): + continue + yield SessionInfo( + provider="agy", + session_id=session_dir.name, + created_at=file_mtime_iso(transcript_file), + ) + + def load_session( + self, session_id: str, max_messages: Optional[int] = None + ) -> Iterator[TranscriptEntry]: + data_dir = self.get_data_dir() + if data_dir is None: + raise ValueError("Antigravity CLI data directory not found") + + transcript_file = ( + data_dir + / "brain" + / session_id + / ".system_generated" + / "logs" + / "transcript.jsonl" + ) + if not transcript_file.exists(): + raise FileNotFoundError( + f"Transcript for session {session_id} not found at {transcript_file}" + ) + + # Entry threading: track previous UUID for parentUuid chaining + prev_uuid: Optional[str] = None + + with open(transcript_file, "r", encoding="utf-8") as f: + for i, line in enumerate(f): + line = line.strip() + if not line: + continue + + raw_entry: Any = json.loads(line) + if isinstance(raw_entry, dict): + entry = cast(dict[str, Any], raw_entry) + for transcript_entry in self._parse_entry( + entry, session_id, i, prev_uuid + ): + # Factory functions return UserTranscriptEntry or + # AssistantTranscriptEntry — both inherit BaseTranscriptEntry + # which always has uuid. Narrow for Pyright. + if hasattr(transcript_entry, "uuid"): + prev_uuid = cast(Any, transcript_entry).uuid + yield transcript_entry + + if max_messages is not None and i >= max_messages: + break + + def _parse_entry( + self, + entry: dict[str, Any], + session_id: str, + index: int, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + entry_type = str(entry.get("type", "")) + timestamp = str(entry.get("created_at", "")) + content = entry.get("content", "") + + if entry_type == "USER_INPUT": + yield from self._parse_user_input( + content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "PLANNER_RESPONSE": + yield from self._parse_planner_response( + entry, content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "CHECKPOINT": + yield from self._parse_checkpoint( + content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "LIST_DIRECTORY": + yield from self._make_tool_entry( + "list_dir", content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "GENERIC": + yield from self._parse_generic( + content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "RUN_COMMAND": + yield from self._parse_run_command( + entry, content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "VIEW_FILE": + yield from self._parse_view_file( + entry, content, session_id, index, timestamp, parent_uuid + ) + + elif entry_type == "CODE_ACTION": + yield from self._parse_code_action( + entry, content, session_id, index, timestamp, parent_uuid + ) + + # CONVERSATION_HISTORY entries are internal bookkeeping, skip them + + # -- Entry type parsers -- + + def _parse_user_input( + self, + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + content_str = content if isinstance(content, str) else json.dumps(content) + text = self._extract_user_request(content_str) + if text: + uid = f"agy-{session_id}-{index}" + entry = make_user_entry(session_id, uid, timestamp, text) + entry.parentUuid = parent_uuid + yield entry + + def _parse_planner_response( + self, + raw_entry: dict[str, Any], + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + text = content if isinstance(content, str) else json.dumps(content) + tool_calls_raw = raw_entry.get("tool_calls", []) + tool_calls = self._coerce_tool_calls(tool_calls_raw) + + if tool_calls: + yield from self._parse_tool_calls( + tool_calls, text, session_id, index, timestamp, parent_uuid + ) + elif text: + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry( + session_id, uid, timestamp, "antigravity", text + ) + entry.parentUuid = parent_uuid + yield entry + + def _parse_checkpoint( + self, + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + """CHECKPOINT entries are compaction summaries — render as system context.""" + text = content if isinstance(content, str) else json.dumps(content) + if text: + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry( + session_id, uid, timestamp, "antigravity", f"[checkpoint]\n{text}" + ) + entry.parentUuid = parent_uuid + yield entry + + def _parse_generic( + self, + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + """GENERIC entries are uncategorized model output.""" + text = extract_text(content) + if text: + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry( + session_id, uid, timestamp, "antigravity", text + ) + entry.parentUuid = parent_uuid + yield entry + + def _parse_run_command( + self, + raw_entry: dict[str, Any], + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + """RUN_COMMAND entries are shell command executions.""" + command = str(raw_entry.get("command", "")) + text = content if isinstance(content, str) else json.dumps(content) + display = ( + f"[run_command: {command}]\n{text}" if command else f"[run_command]\n{text}" + ) + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry(session_id, uid, timestamp, "antigravity", display) + entry.parentUuid = parent_uuid + yield entry + + def _parse_view_file( + self, + raw_entry: dict[str, Any], + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + """VIEW_FILE entries are file reads.""" + file_path = str(raw_entry.get("file_path", raw_entry.get("path", ""))) + text = content if isinstance(content, str) else json.dumps(content) + display = ( + f"[view_file: {file_path}]\n{text}" if file_path else f"[view_file]\n{text}" + ) + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry(session_id, uid, timestamp, "antigravity", display) + entry.parentUuid = parent_uuid + yield entry + + def _parse_code_action( + self, + raw_entry: dict[str, Any], + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + """CODE_ACTION entries are code modifications (edits, writes).""" + action = str(raw_entry.get("action", "")) + file_path = str(raw_entry.get("file_path", raw_entry.get("path", ""))) + text = content if isinstance(content, str) else json.dumps(content) + label = f"[code_action: {action} {file_path}]".strip() + display = f"{label}\n{text}" if text else label + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry(session_id, uid, timestamp, "antigravity", display) + entry.parentUuid = parent_uuid + yield entry + + # -- Helpers -- + + def _make_tool_entry( + self, + tool_name: str, + content: Any, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + text = content if isinstance(content, str) else json.dumps(content) + if text: + uid = f"agy-{session_id}-{index}" + entry = make_assistant_entry( + session_id, + uid, + timestamp, + "antigravity", + f"[tool: {tool_name}]\n{text}", + ) + entry.parentUuid = parent_uuid + yield entry + + def _parse_tool_calls( + self, + tool_calls: list[dict[str, Any]], + fallback_text: str, + session_id: str, + index: int, + timestamp: str, + parent_uuid: Optional[str], + ) -> Iterator[TranscriptEntry]: + last_uuid = parent_uuid + + for tc in tool_calls: + name = str(tc.get("name", "unknown")) + args_raw = tc.get("args", {}) + args: dict[str, Any] = ( + cast(dict[str, Any], args_raw) if isinstance(args_raw, dict) else {} + ) + args_str = json.dumps(args, indent=2) if args else "" + text = f"[tool: {name}]\n{args_str}" if args_str else f"[tool: {name}]" + uid = f"agy-{session_id}-{index}-{name}" + entry = make_assistant_entry( + session_id, uid, timestamp, "antigravity", text + ) + entry.parentUuid = last_uuid + last_uuid = uid + yield entry + + # Emit the response text after tool calls, chained to the last tool + if fallback_text and not fallback_text.startswith("[tool:"): + uid = f"agy-{session_id}-{index}-response" + entry = make_assistant_entry( + session_id, uid, timestamp, "antigravity", fallback_text + ) + entry.parentUuid = last_uuid + yield entry + + def _coerce_tool_calls(self, tool_calls_raw: Any) -> list[dict[str, Any]]: + result: list[dict[str, Any]] = [] + if not isinstance(tool_calls_raw, list): + return result + for tc_raw in cast(list[Any], tool_calls_raw): + if isinstance(tc_raw, dict): + result.append(cast(dict[str, Any], tc_raw)) + else: + result.append({"name": "unknown", "args": {"raw": str(tc_raw)}}) + return result + + def _extract_user_request(self, content: str) -> str: + match = re.search( + r"\s*(.*?)\s*", content, re.DOTALL + ) + if match: + return match.group(1).strip() + return content.strip() if content else "" diff --git a/claude_code_log/providers/registry.py b/claude_code_log/providers/registry.py index 9b8d130d..bd6b0b4c 100644 --- a/claude_code_log/providers/registry.py +++ b/claude_code_log/providers/registry.py @@ -85,8 +85,10 @@ def discover_providers() -> ProviderRegistry: registry = ProviderRegistry() from .claude import ClaudeProvider + from .agy import AgyProvider registry.register_class("claude", ClaudeProvider) + registry.register_class("agy", AgyProvider) registry.instantiate_registered() From c035fb3c9bce574e877def6c6e59c1bd6c052b75 Mon Sep 17 00:00:00 2001 From: Sisyphus Date: Thu, 25 Jun 2026 16:28:13 -0400 Subject: [PATCH 3/3] fix: address CodeRabbit review feedback on AGY provider - Path traversal: validate session_id against hex+dash pattern - Malformed JSON: skip bad lines with logging instead of aborting - max_messages: count yielded entries, not raw JSONL lines - Duplicate UUIDs: include tool call index in UUID --- claude_code_log/providers/agy.py | 34 ++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/claude_code_log/providers/agy.py b/claude_code_log/providers/agy.py index 55a1881e..5494e3d2 100644 --- a/claude_code_log/providers/agy.py +++ b/claude_code_log/providers/agy.py @@ -1,10 +1,13 @@ """Antigravity CLI (agy) session provider.""" import json +import logging import re from pathlib import Path from typing import Any, Iterator, Optional, cast +logger = logging.getLogger(__name__) + from claude_code_log.models import TranscriptEntry from .base import ( @@ -54,6 +57,9 @@ def discover_sessions(self) -> Iterator[SessionInfo]: def load_session( self, session_id: str, max_messages: Optional[int] = None ) -> Iterator[TranscriptEntry]: + if not self._is_valid_session_id(session_id): + raise ValueError(f"Invalid session_id: {session_id}") + data_dir = self.get_data_dir() if data_dir is None: raise ValueError("Antigravity CLI data directory not found") @@ -71,29 +77,34 @@ def load_session( f"Transcript for session {session_id} not found at {transcript_file}" ) - # Entry threading: track previous UUID for parentUuid chaining prev_uuid: Optional[str] = None + message_count = 0 with open(transcript_file, "r", encoding="utf-8") as f: - for i, line in enumerate(f): + for line in f: line = line.strip() if not line: continue - raw_entry: Any = json.loads(line) + try: + raw_entry: Any = json.loads(line) + except json.JSONDecodeError: + logger.warning( + "Skipping malformed JSON line in %s", transcript_file + ) + continue + if isinstance(raw_entry, dict): entry = cast(dict[str, Any], raw_entry) for transcript_entry in self._parse_entry( - entry, session_id, i, prev_uuid + entry, session_id, message_count, prev_uuid ): - # Factory functions return UserTranscriptEntry or - # AssistantTranscriptEntry — both inherit BaseTranscriptEntry - # which always has uuid. Narrow for Pyright. if hasattr(transcript_entry, "uuid"): prev_uuid = cast(Any, transcript_entry).uuid yield transcript_entry + message_count += 1 - if max_messages is not None and i >= max_messages: + if max_messages is not None and message_count >= max_messages: break def _parse_entry( @@ -323,7 +334,7 @@ def _parse_tool_calls( ) -> Iterator[TranscriptEntry]: last_uuid = parent_uuid - for tc in tool_calls: + for tc_index, tc in enumerate(tool_calls): name = str(tc.get("name", "unknown")) args_raw = tc.get("args", {}) args: dict[str, Any] = ( @@ -331,7 +342,7 @@ def _parse_tool_calls( ) args_str = json.dumps(args, indent=2) if args else "" text = f"[tool: {name}]\n{args_str}" if args_str else f"[tool: {name}]" - uid = f"agy-{session_id}-{index}-{name}" + uid = f"agy-{session_id}-{index}-{tc_index}-{name}" entry = make_assistant_entry( session_id, uid, timestamp, "antigravity", text ) @@ -366,3 +377,6 @@ def _extract_user_request(self, content: str) -> str: if match: return match.group(1).strip() return content.strip() if content else "" + + def _is_valid_session_id(self, session_id: str) -> bool: + return bool(re.fullmatch(r"[a-f0-9\-]+", session_id))