# =============================================================================
# File: claude_code_log/discovery.py (new file)
# =============================================================================
"""Unified session discovery across all providers."""

from typing import Iterator, Optional

from claude_code_log.models import TranscriptEntry

from .providers import discover_providers
from .providers.base import SessionInfo


def discover_all_sessions(
    providers: Optional[list[str]] = None,
) -> Iterator[SessionInfo]:
    """Discover sessions from all available providers.

    Args:
        providers: Optional list of provider names to include.
            If None, discovers from all available providers. Names that are
            not registered, or whose provider is not available, are skipped
            silently.

    Yields:
        SessionInfo objects from all providers.
    """
    registry = discover_providers()

    if providers is None:
        providers = registry.get_available_providers()

    for provider_name in providers:
        provider = registry.get_provider(provider_name)
        # Caller-supplied names are not pre-filtered, so availability has to
        # be checked here as well.
        if provider is not None and provider.is_available():
            yield from provider.discover_sessions()


def discover_sessions_by_provider(provider_name: str) -> Iterator[SessionInfo]:
    """Discover sessions from a specific provider.

    Args:
        provider_name: Name of the provider to discover sessions from.

    Yields:
        SessionInfo objects from the specified provider. Nothing is yielded
        when the provider is unknown or not available.
    """
    registry = discover_providers()
    yield from registry.discover_sessions_by_provider(provider_name)


def get_session_stats() -> dict[str, int]:
    """Count the discoverable sessions of every available provider.

    Returns:
        Mapping of provider name to the number of sessions yielded by that
        provider's ``discover_sessions()``. Providers that are not available
        do not appear in the mapping.
    """
    registry = discover_providers()
    stats: dict[str, int] = {}

    for provider_name in registry.get_available_providers():
        provider = registry.get_provider(provider_name)
        if provider is not None:
            # Count lazily; the sessions themselves are not needed.
            stats[provider_name] = sum(1 for _ in provider.discover_sessions())

    return stats


def load_session(
    provider_name: str,
    session_id: str,
    max_messages: Optional[int] = None,
) -> Iterator[TranscriptEntry]:
    """Load a session from a specific provider.

    Args:
        provider_name: Name of the provider.
        session_id: ID of the session to load.
        max_messages: Optional limit forwarded to the provider's
            ``load_session``. None (the default) means no limit.

    Returns:
        Iterator of TranscriptEntry objects.

    Raises:
        ValueError: If the provider is unknown or not available.
    """
    registry = discover_providers()
    return registry.load_session(
        provider_name, session_id, max_messages=max_messages
    )


# =============================================================================
# File: claude_code_log/providers/__init__.py (new file)
# =============================================================================
"""Provider abstraction layer for multi-provider session support."""

from .base import BaseProvider, SessionInfo
from .registry import ProviderRegistry, discover_providers

__all__ = [
    "BaseProvider",
    "SessionInfo",
    "ProviderRegistry",
    "discover_providers",
]


# =============================================================================
# File: claude_code_log/providers/base.py (new file)
# =============================================================================
"""Abstract base class for session providers."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Iterator, Optional, cast

from claude_code_log.models import (
    AssistantMessageModel,
    AssistantTranscriptEntry,
    TextContent,
    ThinkingContent,
    ToolResultContent,
    ToolUseContent,
    TranscriptEntry,
    UserMessageModel,
    UserTranscriptEntry,
)


@dataclass
class SessionInfo:
    """Lightweight description of one session reported by a provider.

    Only ``provider`` and ``session_id`` are required; every other field is
    optional metadata that a provider may or may not fill in.
    """

    provider: str  # name of the provider that reported the session
    session_id: str  # provider-specific identifier used to load the session
    title: Optional[str] = None
    created_at: Optional[str] = None  # timestamp string, if known
    updated_at: Optional[str] = None  # timestamp string, if known
    project_path: Optional[Path] = None
    message_count: int = 0
    total_tokens: int = 0


def extract_text(content: Any) -> str:
    """Flatten message content into a single plain-text string.

    Args:
        content: A string, a list of content items, or any other object.

    Returns:
        * ``content`` unchanged when it is already a string.
        * For a list: the ``"text"`` value of each dict item (empty string
          when the key is missing) and each plain string item, joined with
          newlines. Items of any other type are ignored.
        * ``str(content)`` for everything else.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        items: list[Any] = cast(list[Any], content)
        parts: list[str] = []
        for item in items:
            item_dict = cast(dict[str, Any], item) if isinstance(item, dict) else None
            if item_dict is not None:
                parts.append(str(item_dict.get("text", "")))
            elif isinstance(item, str):
                parts.append(item)
        return "\n".join(parts)
    return str(content)


def file_mtime_iso(path: Path) -> str:
    """Return the last-modification time of ``path`` as an ISO 8601 string.

    The timestamp is a naive datetime in the machine's local time zone (no
    UTC offset is included).

    Raises:
        OSError: If ``path`` cannot be stat'ed (e.g. it does not exist).
    """
    return datetime.fromtimestamp(path.stat().st_mtime).isoformat()


def make_user_entry(
    session_id: str,
    uuid: str,
    timestamp: str,
    content: Any,
) -> UserTranscriptEntry:
    """Build a user transcript entry holding a single text block.

    ``content`` is flattened with :func:`extract_text`. Fields that have no
    source value (``cwd``, ``version``, ``parentUuid``) are left empty.
    """
    return UserTranscriptEntry(
        type="user",
        parentUuid=None,
        isSidechain=False,
        userType="external",
        cwd="",
        sessionId=session_id,
        version="",
        uuid=uuid,
        timestamp=timestamp,
        message=UserMessageModel(
            role="user",
            content=[TextContent(type="text", text=extract_text(content))],
        ),
    )


def make_tool_result_entry(
    session_id: str,
    uuid: str,
    timestamp: str,
    tool_use_id: str,
    content: str,
) -> UserTranscriptEntry:
    """Build a user transcript entry holding a single tool result.

    Args:
        session_id: Session the entry belongs to.
        uuid: Unique ID of the entry.
        timestamp: Timestamp string of the entry.
        tool_use_id: ID of the tool-use block this result answers.
        content: Tool output text.
    """
    return UserTranscriptEntry(
        type="user",
        parentUuid=None,
        isSidechain=False,
        userType="external",
        cwd="",
        sessionId=session_id,
        version="",
        uuid=uuid,
        timestamp=timestamp,
        message=UserMessageModel(
            role="user",
            content=[
                ToolResultContent(
                    type="tool_result",
                    tool_use_id=tool_use_id,
                    content=content,
                )
            ],
        ),
    )


def make_assistant_entry(
    session_id: str,
    uuid: str,
    timestamp: str,
    model: str,
    content: Any,
) -> AssistantTranscriptEntry:
    """Build an assistant transcript entry.

    A list ``content`` is passed through unchanged as the message content;
    any other value is converted with ``str()`` and wrapped in a single text
    block. The entry ``uuid`` doubles as the message ID.
    """
    content_list: list[Any] = (
        cast(list[Any], content)
        if isinstance(content, list)
        else [TextContent(type="text", text=str(content))]
    )
    return AssistantTranscriptEntry(
        type="assistant",
        parentUuid=None,
        isSidechain=False,
        userType="external",
        cwd="",
        sessionId=session_id,
        version="",
        uuid=uuid,
        timestamp=timestamp,
        message=AssistantMessageModel(
            id=uuid,
            type="message",
            role="assistant",
            model=model,
            content=content_list,
        ),
    )


def make_thinking_entry(
    session_id: str,
    uuid: str,
    timestamp: str,
    model: str,
    text: str,
) -> AssistantTranscriptEntry:
    """Build an assistant transcript entry holding a single thinking block.

    The entry ``uuid`` doubles as the message ID.
    """
    return AssistantTranscriptEntry(
        type="assistant",
        parentUuid=None,
        isSidechain=False,
        userType="external",
        cwd="",
        sessionId=session_id,
        version="",
        uuid=uuid,
        timestamp=timestamp,
        message=AssistantMessageModel(
            id=uuid,
            type="message",
            role="assistant",
            model=model,
            content=[ThinkingContent(type="thinking", thinking=text)],
        ),
    )


def make_tool_use_entry(
    session_id: str,
    uuid: str,
    timestamp: str,
    model: str,
    tool_id: str,
    tool_name: str,
    tool_input: Any,
) -> AssistantTranscriptEntry:
    """Build an assistant transcript entry holding a single tool-use block.

    Args:
        session_id: Session the entry belongs to.
        uuid: Unique ID of the entry; also used as the message ID.
        timestamp: Timestamp string of the entry.
        model: Model name recorded on the message.
        tool_id: ID of the tool invocation.
        tool_name: Name of the invoked tool.
        tool_input: Input payload passed to the tool.
    """
    return AssistantTranscriptEntry(
        type="assistant",
        parentUuid=None,
        isSidechain=False,
        userType="external",
        cwd="",
        sessionId=session_id,
        version="",
        uuid=uuid,
        timestamp=timestamp,
        message=AssistantMessageModel(
            id=uuid,
            type="message",
            role="assistant",
            model=model,
            content=[
                ToolUseContent(
                    type="tool_use",
                    id=tool_id,
                    name=tool_name,
                    input=tool_input,
                )
            ],
        ),
    )


class BaseProvider(ABC):
    """Interface every session provider has to implement."""

    @abstractmethod
    def get_provider_name(self) -> str:
        """Return the name under which the provider is registered."""
        ...

    @abstractmethod
    def get_session_format(self) -> str:
        """Return an identifier of the provider's storage format."""
        ...

    @abstractmethod
    def get_data_dir(self) -> Optional[Path]:
        """Return the provider's data directory, or None if there is none."""
        ...

    @abstractmethod
    def discover_sessions(self) -> Iterator[SessionInfo]:
        """Yield a SessionInfo for every session the provider can find."""
        ...

    @abstractmethod
    def load_session(
        self, session_id: str, max_messages: Optional[int] = None
    ) -> Iterator[TranscriptEntry]:
        """Return the transcript entries of the session ``session_id``.

        ``max_messages`` optionally limits how many entries are returned;
        None means no limit.
        """
        ...

    def is_available(self) -> bool:
        """Return True when the provider's data directory exists."""
        data_dir = self.get_data_dir()
        return data_dir is not None and data_dir.exists()

    def get_session_stats(self, session_id: str) -> dict[str, Any]:
        """Return statistics for a session.

        The base implementation has nothing to report and returns an empty
        dict; providers may override it.
        """
        return {}


# =============================================================================
# File: claude_code_log/providers/claude.py (new file)
# =============================================================================
"""Claude Code session provider."""

from itertools import islice
from pathlib import Path
from typing import Iterator, Optional

from claude_code_log.models import TranscriptEntry

from .base import BaseProvider, SessionInfo, file_mtime_iso


class ClaudeProvider(BaseProvider):
    """Provider for sessions stored as JSONL files under ~/.claude/projects."""

    def get_provider_name(self) -> str:
        """Return the provider name, ``"claude"``."""
        return "claude"

    def get_session_format(self) -> str:
        """Return the storage format identifier, ``"jsonl"``."""
        return "jsonl"

    def get_data_dir(self) -> Optional[Path]:
        """Return ``~/.claude/projects`` if it exists, otherwise None."""
        data_dir = Path.home() / ".claude" / "projects"
        return data_dir if data_dir.exists() else None

    def discover_sessions(self) -> Iterator[SessionInfo]:
        """Yield one SessionInfo per ``*.jsonl`` file in each project directory.

        Files whose name starts with ``agent-`` are skipped. The session ID
        is the file name without its extension.
        """
        data_dir = self.get_data_dir()
        if data_dir is None:
            return

        for project_dir in data_dir.iterdir():
            if not project_dir.is_dir():
                continue
            for jsonl_file in project_dir.glob("*.jsonl"):
                if jsonl_file.name.startswith("agent-"):
                    continue
                try:
                    modified = file_mtime_iso(jsonl_file)
                except OSError:
                    # The file disappeared (or became unreadable) between the
                    # glob and the stat; skip it rather than abort discovery.
                    continue
                yield SessionInfo(
                    provider="claude",
                    session_id=jsonl_file.stem,
                    project_path=project_dir,
                    # Only the modification time is available without parsing
                    # the file. created_at keeps that value for backward
                    # compatibility; updated_at is the accurate home for it.
                    created_at=modified,
                    updated_at=modified,
                )

    def load_session(
        self, session_id: str, max_messages: Optional[int] = None
    ) -> Iterator[TranscriptEntry]:
        """Load the transcript of ``session_id`` from the first matching project.

        Args:
            session_id: Session ID, i.e. the JSONL file name without extension.
            max_messages: Optional upper bound on the number of entries
                returned. None means no limit.

        Returns:
            Iterator over the session's transcript entries.

        Raises:
            ValueError: If the Claude data directory does not exist.
            FileNotFoundError: If no project contains the session file.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle with the converter - TODO confirm).
        from claude_code_log.converter import load_transcript

        data_dir = self.get_data_dir()
        if data_dir is None:
            raise ValueError("Claude data directory not found")

        for project_dir in data_dir.iterdir():
            if not project_dir.is_dir():
                continue
            jsonl_file = project_dir / f"{session_id}.jsonl"
            if jsonl_file.exists():
                entries = iter(load_transcript(jsonl_file))
                if max_messages is None:
                    return entries
                # NOTE(review): keeps the FIRST max_messages entries; confirm
                # this matches what callers expect from the limit. Negative
                # values are clamped to 0 because islice rejects them.
                return islice(entries, max(max_messages, 0))

        raise FileNotFoundError(f"Session {session_id} not found")


# =============================================================================
# File: claude_code_log/providers/registry.py (new file)
# =============================================================================
"""Provider registry for auto-discovery and management."""

import logging
from typing import Dict, Iterator, List, Optional, Type

from claude_code_log.models import TranscriptEntry

from .base import BaseProvider, SessionInfo

logger = logging.getLogger(__name__)


class ProviderRegistry:
    """Registry for managing session providers.

    Providers are registered either as instances (``register``) or as classes
    that are instantiated later (``register_class`` followed by
    ``instantiate_registered``). A registered provider only takes part in
    discovery and loading while its ``is_available()`` returns True.
    """

    def __init__(self) -> None:
        self._providers: Dict[str, BaseProvider] = {}
        self._provider_classes: Dict[str, Type[BaseProvider]] = {}

    def register(self, provider: BaseProvider) -> None:
        """Register a provider instance under its own provider name.

        A previously registered provider with the same name is replaced.
        """
        name = provider.get_provider_name()
        self._providers[name] = provider

    def register_class(self, name: str, provider_class: Type[BaseProvider]) -> None:
        """Register a provider class for lazy instantiation."""
        self._provider_classes[name] = provider_class

    def instantiate_registered(self) -> None:
        """Instantiate and register every class added via ``register_class``.

        This is best-effort: a provider that fails to initialize is skipped
        so it cannot take the other providers down with it. The failure is
        logged instead of being dropped silently.
        """
        for name, provider_class in self._provider_classes.items():
            try:
                provider = provider_class()
                self.register(provider)
            except Exception:
                logger.warning(
                    "Skipping provider %r: initialization failed",
                    name,
                    exc_info=True,
                )

    def get_provider(self, name: str) -> Optional[BaseProvider]:
        """Get a registered provider by name, or None if it is not registered."""
        return self._providers.get(name)

    def get_available_providers(self) -> List[str]:
        """Get names of all available providers (with valid data directories)."""
        return [
            name
            for name, provider in self._providers.items()
            if provider.is_available()
        ]

    def get_all_providers(self) -> List[str]:
        """Get names of all registered providers."""
        return list(self._providers)

    def discover_all_sessions(self) -> Iterator[SessionInfo]:
        """Discover sessions from all available providers."""
        for provider in self._providers.values():
            if provider.is_available():
                yield from provider.discover_sessions()

    def discover_sessions_by_provider(
        self, provider_name: str
    ) -> Iterator[SessionInfo]:
        """Discover sessions from a specific provider.

        Yields nothing when the provider is unknown or not available.
        """
        provider = self._providers.get(provider_name)
        if provider is not None and provider.is_available():
            yield from provider.discover_sessions()

    def load_session(
        self, provider_name: str, session_id: str, max_messages: Optional[int] = None
    ) -> Iterator[TranscriptEntry]:
        """Load a session from a specific provider.

        Args:
            provider_name: Name of a registered provider.
            session_id: ID of the session to load.
            max_messages: Optional limit forwarded to the provider.

        Returns:
            Whatever the provider's ``load_session`` returns.

        Raises:
            ValueError: If the provider is unknown or not available.
        """
        provider = self._providers.get(provider_name)
        if provider is None:
            raise ValueError(f"Unknown provider: {provider_name}")
        if not provider.is_available():
            raise ValueError(f"Provider {provider_name} is not available")
        return provider.load_session(session_id, max_messages=max_messages)


def discover_providers() -> ProviderRegistry:
    """Build a registry containing every known provider.

    Each provider class is registered and instantiated. Whether a provider
    is usable is decided later through its ``is_available()`` check, i.e. by
    whether its data directory exists.

    Returns:
        A new ProviderRegistry with all providers that could be instantiated.
    """
    registry = ProviderRegistry()

    # Imported here, as in the original code, so the provider module is only
    # loaded when a registry is actually built.
    from .claude import ClaudeProvider

    registry.register_class("claude", ClaudeProvider)

    registry.instantiate_registered()

    return registry