From 8745771576b787920cef36d8db224c740cd6c85d Mon Sep 17 00:00:00 2001 From: Jeremy Pinto Date: Sun, 27 Jul 2025 20:19:15 -0400 Subject: [PATCH 1/3] update messaging --- src/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index a333a70..cda19bb 100644 --- a/src/main.py +++ b/src/main.py @@ -53,8 +53,8 @@ def new(directory, custom_dir): print_section_header("Next Steps", "šŸš€") click.echo(f"1. {click.style('cd ' + target_dir, bold=True, fg='cyan')}") - click.echo(f"2. {click.style('Edit ctx.txt with your context', bold=True, fg='cyan')}") - click.echo(f"3. {click.style('Start exploring ideas on feature branches!', bold=True, fg='cyan')}") + click.echo(f"2. {click.style('Edit files with your context', bold=True, fg='cyan')}") + click.echo(f"3. {click.style('Save your work and load it into LLMs!', bold=True, fg='cyan')}") else: print_error_box(f"Failed to create repository: {result.error}") sys.exit(1) From c453766c696f667807fdd9a1c569dc54f18b3933 Mon Sep 17 00:00:00 2001 From: Jeremy Pinto Date: Mon, 18 Aug 2025 22:19:36 -0400 Subject: [PATCH 2/3] WIP: start adding sync feature --- src/ctx_core.py | 547 ++++++++++++++++++++++++++++++---------------- src/main.py | 199 ++++++++++++----- src/mcp_server.py | 285 +++++++++++++++--------- 3 files changed, 683 insertions(+), 348 deletions(-) diff --git a/src/ctx_core.py b/src/ctx_core.py index d296cc7..80fd1c9 100644 --- a/src/ctx_core.py +++ b/src/ctx_core.py @@ -65,7 +65,7 @@ class ShowAllResult: def collect_files(path: Path, relative_base: str = "", recursive=True, pattern: Optional[str] = None): # Get all files in the target directory all_files = [] - + def _collect_files(path: Path, relative_base: str = "", recursive=True, pattern: Optional[str] = None): """Recursively collect all files""" for item in path.iterdir(): @@ -73,23 +73,23 @@ def _collect_files(path: Path, relative_base: str = "", recursive=True, pattern: # Skip git files and hidden files if item.name.startswith('.'): continue - + # Build relative path from ctx root if relative_base: rel_path = f"{relative_base}/{item.name}" else: rel_path = item.name - + # Apply pattern filter if specified if pattern and not fnmatch.fnmatch(item.name, pattern): continue - + all_files.append((rel_path, item)) elif item.is_dir() and not item.name.startswith('.') and recursive: # Recursively scan subdirectories new_base = f"{relative_base}/{item.name}" if relative_base else item.name _collect_files(item, new_base) - + _collect_files(path, relative_base, recursive, pattern) # Sort files for consistent output @@ -99,10 +99,10 @@ def _collect_files(path: Path, relative_base: str = "", recursive=True, pattern: class CtxCore: """Core business logic for ctx operations""" - + def __init__(self): self._project_root: Optional[Path] = None - + @property def project_root(self) -> Path: if self._project_root is None: @@ -112,28 +112,28 @@ def project_root(self) -> Path: def find_project_root(self) -> Path: """Find the project root by looking for ctx.config file.""" current_dir = Path.cwd() - + # Search upward from current directory for parent in [current_dir] + list(current_dir.parents): if (parent / 'ctx.config').exists(): return parent - + return current_dir # Default to current directory if nothing is found - + def get_ctx_config_path(self) -> Path: """Get the path to the ctx.config file""" return self.project_root / 'ctx.config' - + def load_ctx_config(self) -> Dict[str, Any]: """Load the ctx configuration from ctx.config file""" config_path = self.get_ctx_config_path() - + if not config_path.exists(): return { 'active_ctx': None, 'discovered_ctx': [] } - + try: with open(config_path, 'rb') as f: config = tomllib.load(f) @@ -146,22 +146,22 @@ def load_ctx_config(self) -> Dict[str, Any]: 'active_ctx': None, 'discovered_ctx': [] } - + def save_ctx_config(self, config: Dict[str, Any]) -> OperationResult: """Save the ctx configuration to ctx.config file""" config_path = self.get_ctx_config_path() - + try: with open(config_path, 'wb') as f: tomli_w.dump(config, f) return OperationResult(True, "Configuration saved successfully") except Exception as e: return OperationResult(False, error=f"Could not save config: {e}") - + def ensure_ctx_config(self) -> OperationResult: """Ensure the ctx.config file exists, creating it if necessary""" config_path = self.get_ctx_config_path() - + if not config_path.exists(): default_config = { 'discovered_ctx': [] @@ -172,9 +172,9 @@ def ensure_ctx_config(self) -> OperationResult: return OperationResult(True, f"Created ctx.config at {str(config_path)}") except Exception as e: return OperationResult(False, error=f"Could not create ctx.config: {e}") - + return OperationResult(True, "ctx.config already exists") - + def get_active_ctx_path(self) -> Optional[Path]: """ Find the active ctx repository. @@ -195,19 +195,19 @@ def get_active_ctx_path(self) -> Optional[Path]: # 2. If not inside a repo, fall back to config file config = self.load_ctx_config() active_ctx_name = config.get('active_ctx') - + if not active_ctx_name: return None - + # Use project root to construct path ctx_path = self.project_root / active_ctx_name - + # Verify it still exists and has .ctx marker if ctx_path.exists() and (ctx_path / '.ctx').exists(): return ctx_path - + return None - + def get_ctx_repo(self) -> Optional[Repo]: """Get the GitPython repo object for the ctx directory""" ctx_dir = self.get_active_ctx_path() @@ -217,16 +217,16 @@ def get_ctx_repo(self) -> Optional[Repo]: return Repo(ctx_dir) except InvalidGitRepositoryError: return None - + def is_ctx_repo(self) -> bool: """Check if we're in or under a ctx repository""" return self.get_active_ctx_path() is not None - + def get_template_dir(self) -> Path: """Get the path to the template directory""" script_dir = Path(__file__).parent return script_dir / "template" - + def get_current_branch(self) -> str: """Get the current git branch name""" repo = self.get_ctx_repo() @@ -236,7 +236,7 @@ def get_current_branch(self) -> str: return repo.active_branch.name except: return 'main' - + def get_all_branches(self) -> List[str]: """Get all git branches""" repo = self.get_ctx_repo() @@ -246,13 +246,13 @@ def get_all_branches(self) -> List[str]: return [branch.name for branch in repo.branches] except: return ['main'] - + def get_changed_files(self, source_branch: str, target_branch: str = 'main') -> List[str]: """Get files that differ between two branches""" repo = self.get_ctx_repo() if not repo: return [] - + try: # Get the diff between branches diff = repo.git.diff('--name-only', f'{target_branch}...{source_branch}') @@ -261,27 +261,27 @@ def get_changed_files(self, source_branch: str, target_branch: str = 'main') -> return [line.strip() for line in diff.split('\n') if line.strip()] except GitCommandError: return [] - + def get_file_content_at_branch(self, filepath: str, branch: str) -> Optional[str]: """Get file content at a specific branch""" repo = self.get_ctx_repo() if not repo: return None - + try: return repo.git.show(f'{branch}:{filepath}') except GitCommandError: return None - + def detect_merge_conflicts(self, source_branch: str, target_branch: str = 'main') -> List[Dict[str, str]]: """Detect potential merge conflicts between branches""" conflicts = [] changed_files = self.get_changed_files(source_branch, target_branch) - + for filepath in changed_files: target_content = self.get_file_content_at_branch(filepath, target_branch) source_content = self.get_file_content_at_branch(filepath, source_branch) - + if target_content is not None and source_content is not None: if target_content != source_content: conflicts.append({ @@ -289,79 +289,79 @@ def detect_merge_conflicts(self, source_branch: str, target_branch: str = 'main' 'target_content': target_content, 'source_content': source_content }) - + return conflicts - + # Core Operations - + def create_new_ctx(self, directory: str = 'context') -> OperationResult: """Create a new ctx repository""" # Ensure ctx.config exists first config_result = self.ensure_ctx_config() if not config_result.success: return config_result - + ctx_dir = self.project_root / directory - + if ctx_dir.exists(): # Also check for .ctx marker if (ctx_dir / '.ctx').exists(): return OperationResult(False, error=f"Directory '{directory}' already exists") - + # Create directory ctx_dir.mkdir(parents=True) - + # Copy template files template_dir = self.get_template_dir() if not template_dir.exists(): return OperationResult(False, error=f"Template directory not found at {template_dir}") - + copied_files = [] - + # Copy all files from template directory for template_file in template_dir.glob('*'): if template_file.is_file(): dest_file = ctx_dir / template_file.name shutil.copy2(template_file, dest_file) copied_files.append(template_file.name) - + # Create .ctx marker file ctx_marker = ctx_dir / '.ctx' ctx_marker.touch() - + # Initialize the git repo in the directory try: repo = Repo.init(ctx_dir) - + # Add all files to git (including .ctx marker) repo.git.add('-A') - + # Commit the initial files repo.index.commit('first commit') - + # Add to config as the active ctx repository config = self.load_ctx_config() - + # Convert to relative path from current directory try: relative_path = str(ctx_dir.relative_to(Path.cwd())) except ValueError: relative_path = str(ctx_dir.name) - + # Add to discovered list if not already there if relative_path not in config['discovered_ctx']: config['discovered_ctx'].append(relative_path) - + # Set as active config['active_ctx'] = relative_path - + # Save config save_result = self.save_ctx_config(config) if not save_result.success: return save_result - + return OperationResult( - True, + True, f"ctx repository initialized successfully in '{directory}'", data={ 'directory': directory, @@ -369,19 +369,19 @@ def create_new_ctx(self, directory: str = 'context') -> OperationResult: 'relative_path': relative_path } ) - + except Exception as e: return OperationResult(False, error=f"Error initializing git repository: {e}") - + def get_status(self) -> OperationResult: """Get the current status of the ctx repository""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + ctx_dir = self.get_active_ctx_path() if not ctx_dir: return OperationResult(False, error="No ctx repository found") - + repo_info = RepositoryInfo( name=ctx_dir.name, path=ctx_dir, @@ -390,14 +390,14 @@ def get_status(self) -> OperationResult: exists=True, is_valid=True ) - + current_branch = self.get_current_branch() all_branches = self.get_all_branches() - + repo = self.get_ctx_repo() uncommitted_changes = [] is_dirty = False - + if repo: try: is_dirty = repo.is_dirty() @@ -407,7 +407,7 @@ def get_status(self) -> OperationResult: uncommitted_changes.append(item.strip()) except: pass - + status = RepositoryStatus( repository=repo_info, current_branch=current_branch, @@ -415,18 +415,18 @@ def get_status(self) -> OperationResult: is_dirty=is_dirty, uncommitted_changes=uncommitted_changes ) - + return OperationResult(True, "Status retrieved successfully", data=status) - + def start_exploration(self, topic: str) -> OperationResult: """Start exploring a new topic or idea""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + try: # Create and checkout new branch new_branch = repo.create_head(topic) @@ -434,85 +434,85 @@ def start_exploration(self, topic: str) -> OperationResult: return OperationResult(True, f"Started exploring '{topic}'", data={'topic': topic}) except Exception as e: return OperationResult(False, error=f"Error starting exploration: {e}") - + def save(self, message: str) -> OperationResult: """Save the current state of the context repository""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + try: # Add all changes repo.git.add(A=True) - + # Check if there are changes to commit if not repo.is_dirty(): return OperationResult(True, "No changes to save") - + # Commit the changes repo.index.commit(message) - + return OperationResult(True, f"Saved context: {message}") - + except Exception as e: return OperationResult(False, error=f"Error saving context: {e}") - + def discard(self, force: bool = False) -> OperationResult: """Discard all changes and reset to the last commit - + This performs a git reset --hard HEAD operation, which: - Removes all staged changes - Removes all unstaged changes - Resets all files to their state at the last commit - + Args: force: If True, also removes untracked files and directories (git clean -fd) """ if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + try: # Perform hard reset to HEAD repo.git.reset('--hard', 'HEAD') - + # Only clean untracked files if force is True if force: repo.git.clean('-fd') return OperationResult(True, "All changes discarded and untracked files removed. Reset to last commit.") else: return OperationResult(True, "All changes discarded. Reset to last commit.") - + except GitCommandError as e: return OperationResult(False, error=f"Error discarding changes: {e}") except Exception as e: return OperationResult(False, error=f"Error discarding changes: {e}") - + def get_merge_preview(self, source_branch: str, target_branch: str = 'main') -> OperationResult: """Get a preview of what would happen in a merge""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + # Validate branches exist all_branches = self.get_all_branches() if source_branch not in all_branches: return OperationResult(False, error=f"Exploration '{source_branch}' does not exist") - + if target_branch not in all_branches: return OperationResult(False, error=f"Target branch '{target_branch}' does not exist") - + if source_branch == target_branch: return OperationResult(False, error="Cannot integrate exploration into itself") - + changed_files = self.get_changed_files(source_branch, target_branch) conflicts = self.detect_merge_conflicts(source_branch, target_branch) - + preview = MergePreview( source_branch=source_branch, target_branch=target_branch, @@ -521,52 +521,52 @@ def get_merge_preview(self, source_branch: str, target_branch: str = 'main') -> has_conflicts=len(conflicts) > 0, is_clean=len(conflicts) == 0 ) - + return OperationResult(True, "Merge preview generated", data=preview) - + def perform_integration(self, source_branch: str, target_branch: str = 'main') -> OperationResult: """Perform the actual merge/integration""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + try: # Switch to target branch repo.git.checkout(target_branch) - + # Perform merge repo.git.merge(source_branch, '--no-edit') - + return OperationResult( - True, + True, f"Successfully merged {source_branch} into {target_branch}", data={ 'source_branch': source_branch, 'target_branch': target_branch } ) - + except GitCommandError as e: return OperationResult(False, error=f"Merge failed: {e}") except Exception as e: return OperationResult(False, error=f"Error performing integration: {e}") - + def list_repositories(self) -> OperationResult: """List all discovered ctx repositories""" config = self.load_ctx_config() - + if not config['discovered_ctx']: return OperationResult(True, "No ctx repositories found", data=[]) - + repositories = [] for ctx_path in config['discovered_ctx']: full_path = Path.cwd() / ctx_path is_active = ctx_path == config['active_ctx'] exists = full_path.exists() and (full_path / '.ctx').exists() - + repo_info = RepositoryInfo( name=ctx_path, path=full_path, @@ -576,54 +576,54 @@ def list_repositories(self) -> OperationResult: is_valid=exists ) repositories.append(repo_info) - + return OperationResult(True, "Repositories listed", data=repositories) - + def switch_repository(self, ctx_name: str) -> OperationResult: """Switch to a different ctx repository""" config = self.load_ctx_config() - + if ctx_name not in config['discovered_ctx']: return OperationResult( - False, + False, error=f"ctx repository '{ctx_name}' not found in config", data={'available_repositories': config['discovered_ctx']} ) - + # Verify the repository still exists ctx_path = Path.cwd() / ctx_name - + if not ctx_path.exists() or not (ctx_path / '.ctx').exists(): return OperationResult(False, error=f"ctx repository '{ctx_name}' directory is missing or invalid") - + # Switch to the new active repository config['active_ctx'] = ctx_name save_result = self.save_ctx_config(config) - + if not save_result.success: return save_result - + return OperationResult(True, f"Switched to ctx repository: {ctx_name}", data={'repository': ctx_name}) - + def get_diff(self, staged: bool = False, branches: Optional[List[str]] = None) -> OperationResult: """Get git diff output for the ctx repository""" if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + try: # Build git diff command based on options and arguments diff_args = [] - + if staged: diff_args.append('--staged') - + if branches is None: branches = [] - + # Handle branch arguments if len(branches) == 1: # Compare current branch with specified branch @@ -633,12 +633,12 @@ def get_diff(self, staged: bool = False, branches: Optional[List[str]] = None) - diff_args.append(f'{branches[0]}...{branches[1]}') elif len(branches) > 2: return OperationResult(False, error="Too many branch arguments. Use 0, 1, or 2 branches.") - + # Get diff output diff_output = repo.git.diff(*diff_args) - + return OperationResult( - True, + True, "Diff retrieved successfully", data={ 'diff': diff_output, @@ -647,12 +647,12 @@ def get_diff(self, staged: bool = False, branches: Optional[List[str]] = None) - 'has_changes': bool(diff_output.strip()) } ) - + except GitCommandError as e: if "unknown revision" in str(e).lower(): all_branches = self.get_all_branches() return OperationResult( - False, + False, error="Unknown branch or revision specified", data={'available_branches': all_branches} ) @@ -669,24 +669,24 @@ def load_ctx(self, ctx_name: Optional[str] = None, top_level: bool = True, patte switch_result = self.switch_repository(ctx_name) if not switch_result.success: return OperationResult(False, error=f"ctx '{ctx_name}' not found. Use 'ctx list' to show available contexts.") - + return self.show_all(top_level=top_level, pattern=pattern) def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None, pattern: Optional[str] = None, top_level: bool = False) -> OperationResult: """Display all file contents with clear delimiters for LLM context absorption - + Args: directory: Optional directory to show (relative to ctx root) branch: Optional branch to show files from (default: current branch) pattern: Optional file pattern to filter (e.g., "*.md") - + Returns: OperationResult with file contents and metadata """ if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") @@ -694,16 +694,16 @@ def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None # Determine the branch to use if branch is None: branch = self.get_current_branch() - + # Validate branch exists all_branches = self.get_all_branches() if branch not in all_branches: return OperationResult(False, error=f"Branch '{branch}' does not exist") - + ctx_root = self.get_active_ctx_path() if not ctx_root: return OperationResult(False, error="Could not find ctx root") - + # Determine the directory to scan if directory: target_dir = ctx_root / directory @@ -715,14 +715,14 @@ def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None else: target_dir = ctx_root scan_path = "." - + try: all_files = collect_files(target_dir, scan_path if scan_path != "." else "") if top_level: top_level_files = collect_files(target_dir, scan_path if scan_path != "." else "", recursive=False) - - + + # Read file contents file_contents = [] current_files = top_level_files if top_level else all_files @@ -739,14 +739,14 @@ def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None # Read from working directory with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read() - + file_contents.append({ 'path': rel_path, 'content': content, 'size': len(content), 'lines': content.count('\n') + 1 if content else 0 }) - + except Exception as e: # Skip files that can't be read (binary files, etc.) file_contents.append({ @@ -755,7 +755,7 @@ def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None 'size': 0, 'lines': 0 }) - + active_ctx = self.load_ctx_config()['active_ctx'] result = ShowAllResult( ctx_name = active_ctx, @@ -769,30 +769,30 @@ def show_all(self, directory: Optional[str] = None, branch: Optional[str] = None ) formatted_result = self.format_show_all(result) - + return OperationResult(True, "Files retrieved successfully", data=formatted_result) - + except Exception as e: - return OperationResult(False, error=f"Error reading files: {e}") - + return OperationResult(False, error=f"Error reading files: {e}") + def get_repository_info(self, name: str) -> RepositoryInfo: """Get information about a specific ctx repository""" config = self.load_ctx_config() active_ctx = config.get('active_ctx') - + path = self.project_root / name absolute_path = path.resolve() exists = path.exists() and (path / '.ctx').exists() - + repo = None if exists: try: repo = Repo(path) except InvalidGitRepositoryError: pass - + is_valid = repo is not None - + return RepositoryInfo( name=name, path=path, @@ -801,22 +801,22 @@ def get_repository_info(self, name: str) -> RepositoryInfo: exists=exists, is_valid=is_valid ) - + def format_show_all(self, show_result: ShowAllResult) -> str: """Format show_all output with clear delimiters for LLM context absorption. - + This method combines the data retrieval and formatting into a single output, perfect for providing complete context to LLM agents or CLI users. - + Args: directory: Optional directory to show (relative to ctx root) branch: Optional branch to show files from (default: current branch) pattern: Optional file pattern to filter (e.g., "*.md") - + Returns: Formatted string with all file contents and clear delimiters """ - + # Build formatted output output = "=" * 80 + "\n" output += "šŸ“ CTX REPOSITORY CONTENTS\n" @@ -829,7 +829,7 @@ def format_show_all(self, show_result: ShowAllResult) -> str: if show_result.pattern: output += f"Pattern: {show_result.pattern}\n" output += f"Total files: {show_result.total_files}\n\n" - + if show_result.top_level: output += "Showing only top-level files and contents of ctx directory\n" else: @@ -843,106 +843,106 @@ def format_show_all(self, show_result: ShowAllResult) -> str: output += "=" * 80 + "\n\n" output += file_info['content'] output += "\n\n" - + output += "=" * 80 + "\n" output += "List of all available files:\n" output += "=" * 80 + "\n" for file in show_result.all_files: output += str(file[0]) + "\n" output += "\n\n" - + output += "=" * 80 + "\n" output += f"āœ… Currently active ctx: {show_result.ctx_name}\n" output += "=" * 80 + "\n" - + return output - + def move_file(self, source: str, destination: str) -> OperationResult: """Move a file within the ctx repository (git mv equivalent) - + Args: source: Source file path (relative to ctx root) destination: Destination file path (relative to ctx root) - + Returns: OperationResult with success/failure information """ if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + ctx_root = self.get_active_ctx_path() if not ctx_root: return OperationResult(False, error="Could not find ctx root") - + source_path = ctx_root / source destination_path = ctx_root / destination - + # Validate source file exists if not source_path.exists(): return OperationResult(False, error=f"Source file '{source}' does not exist") - + if not source_path.is_file(): return OperationResult(False, error=f"'{source}' is not a file") - + # Validate destination doesn't exist if destination_path.exists(): return OperationResult(False, error=f"Destination '{destination}' already exists") - + # Create destination directory if it doesn't exist destination_path.parent.mkdir(parents=True, exist_ok=True) - + try: # Use git mv to move the file repo.git.mv(source, destination) - + return OperationResult( - True, + True, f"Moved '{source}' to '{destination}'", data={ 'source': source, 'destination': destination } ) - + except GitCommandError as e: return OperationResult(False, error=f"Error moving file: {e}") except Exception as e: return OperationResult(False, error=f"Error moving file: {e}") - + def remove_file(self, filepath: str, force: bool = False) -> OperationResult: """Remove a file from the ctx repository (git rm / safe rm equivalent) - + Args: filepath: Path to the file to remove (relative to ctx root) force: If True, force removal even if file has uncommitted changes - + Returns: OperationResult with success/failure information """ if not self.is_ctx_repo(): return OperationResult(False, error="Not in a ctx repository") - + repo = self.get_ctx_repo() if not repo: return OperationResult(False, error="No ctx repository found") - + ctx_root = self.get_active_ctx_path() if not ctx_root: return OperationResult(False, error="Could not find ctx root") - + file_path = ctx_root / filepath - + # Validate file exists if not file_path.exists(): return OperationResult(False, error=f"File '{filepath}' does not exist") - + if not file_path.is_file(): return OperationResult(False, error=f"'{filepath}' is not a file") - + try: # Check if file is tracked by git try: @@ -950,7 +950,7 @@ def remove_file(self, filepath: str, force: bool = False) -> OperationResult: is_tracked = True except GitCommandError: is_tracked = False - + if is_tracked: # Use git rm to remove the file from git and filesystem if force: @@ -962,7 +962,7 @@ def remove_file(self, filepath: str, force: bool = False) -> OperationResult: except GitCommandError as e: if "has changes staged" in str(e) or "has local modifications" in str(e): return OperationResult( - False, + False, error=f"File '{filepath}' has uncommitted changes. Use --force to remove anyway." ) else: @@ -970,9 +970,9 @@ def remove_file(self, filepath: str, force: bool = False) -> OperationResult: else: # File is not tracked by git, just remove from filesystem file_path.unlink() - + return OperationResult( - True, + True, f"Removed '{filepath}'" + (" (forced)" if force else ""), data={ 'filepath': filepath, @@ -980,43 +980,214 @@ def remove_file(self, filepath: str, force: bool = False) -> OperationResult: 'forced': force } ) - + except GitCommandError as e: return OperationResult(False, error=f"Error removing file: {e}") except Exception as e: return OperationResult(False, error=f"Error removing file: {e}") - + def delete_repository(self, name: str, force: bool = False) -> OperationResult: """Delete a ctx repository""" config = self.load_ctx_config() - + if name not in config['discovered_ctx']: return OperationResult(False, error=f"Context repository '{name}' not found in config") - + ctx_path = self.project_root / name - + if not ctx_path.exists() or not (ctx_path / '.ctx').exists(): # Directory doesn't exist, so just remove from config pass - + # Remove from discovered list config['discovered_ctx'].remove(name) - + # If it's the active repository, set active to None if config['active_ctx'] == name: config['active_ctx'] = None - + # Save updated config save_result = self.save_ctx_config(config) - + if not save_result.success: return save_result - + # If directory exists, delete it if ctx_path.exists(): try: shutil.rmtree(ctx_path) except Exception as e: return OperationResult(False, error=f"Error deleting directory: {e}") - - return OperationResult(True, f"Deleted ctx repository: {name}", data={'repository': name}) \ No newline at end of file + + return OperationResult(True, f"Deleted ctx repository: {name}", data={'repository': name}) + + # Remote Repository Management + + def add_remote(self, url: str, name: str = "origin") -> OperationResult: + """Add a git remote to the repository + + Args: + url: Git URL for the remote repository + name: Name for the remote (e.g., 'origin', 'hf', 'github') (default: 'origin') + + Returns: + OperationResult with success/failure information + """ + if not self.is_ctx_repo(): + return OperationResult(False, error="Not in a ctx repository") + + repo = self.get_ctx_repo() + if not repo: + return OperationResult(False, error="No ctx repository found") + + try: + # Check if remote already exists + existing_remotes = [remote.name for remote in repo.remotes] + if name in existing_remotes: + return OperationResult(False, error=f"Remote '{name}' already exists") + + # Add the remote + repo.create_remote(name, url) + + return OperationResult( + True, + f"Added remote '{name}' -> {url}", + data={ + 'name': name, + 'url': url + } + ) + + except GitCommandError as e: + return OperationResult(False, error=f"Error adding remote: {e}") + except Exception as e: + return OperationResult(False, error=f"Error adding remote: {e}") + + def list_remotes(self) -> OperationResult: + """List all configured remotes + + Returns: + OperationResult with list of remotes and their URLs + """ + if not self.is_ctx_repo(): + return OperationResult(False, error="Not in a ctx repository") + + repo = self.get_ctx_repo() + if not repo: + return OperationResult(False, error="No ctx repository found") + + try: + remotes = [] + for remote in repo.remotes: + remotes.append({ + 'name': remote.name, + 'url': list(remote.urls)[0] if remote.urls else '', + 'fetch_url': list(remote.urls)[0] if remote.urls else '', + 'push_url': list(remote.urls)[0] if remote.urls else '' + }) + + if not remotes: + return OperationResult(True, "No remotes configured", data=[]) + + return OperationResult( + True, + f"Found {len(remotes)} remote(s)", + data=remotes + ) + + except Exception as e: + return OperationResult(False, error=f"Error listing remotes: {e}") + + def sync_remote(self, remote_name: str = "origin") -> OperationResult: + """Pull then push to remote (basic sync) + + Args: + remote_name: Name of the remote to sync with (default: 'origin') + + Returns: + OperationResult with sync information + """ + if not self.is_ctx_repo(): + return OperationResult(False, error="Not in a ctx repository") + + repo = self.get_ctx_repo() + if not repo: + return OperationResult(False, error="No ctx repository found") + + try: + # Check if remote exists + remote_names = [remote.name for remote in repo.remotes] + if remote_name not in remote_names: + return OperationResult( + False, + error=f"Remote '{remote_name}' not found. Available remotes: {', '.join(remote_names) if remote_names else 'none'}" + ) + + remote = repo.remotes[remote_name] + current_branch = self.get_current_branch() + + # Check if we have any changes that need to be committed first + if repo.is_dirty(): + return OperationResult( + False, + error="Repository has uncommitted changes. Please commit or discard changes before syncing." + ) + + sync_info = { + 'remote': remote_name, + 'branch': current_branch, + 'pulled': False, + 'pushed': False, + 'conflicts': False + } + + try: + # Pull from remote + pull_info = remote.pull(current_branch) + sync_info['pulled'] = True + sync_info['pull_info'] = str(pull_info[0].flags) if pull_info else "up-to-date" + + except GitCommandError as e: + if "merge conflict" in str(e).lower() or "conflict" in str(e).lower(): + sync_info['conflicts'] = True + return OperationResult( + False, + error=f"Pull resulted in merge conflicts. Please resolve conflicts manually and try again.", + data=sync_info + ) + else: + # Try to continue even if pull fails (might be first push) + sync_info['pull_error'] = str(e) + + try: + # Push to remote + push_info = remote.push(current_branch) + sync_info['pushed'] = True + sync_info['push_info'] = str(push_info[0].flags) if push_info else "up-to-date" + + except GitCommandError as e: + if "rejected" in str(e).lower(): + return OperationResult( + False, + error=f"Push rejected. The remote may have changes that need to be pulled first.", + data=sync_info + ) + else: + return OperationResult( + False, + error=f"Push failed: {e}", + data=sync_info + ) + + message = f"Synced with remote '{remote_name}'" + if sync_info.get('pull_error'): + message += f" (pull had issues: {sync_info['pull_error']})" + + return OperationResult( + True, + message, + data=sync_info + ) + + except Exception as e: + return OperationResult(False, error=f"Error syncing with remote: {e}") \ No newline at end of file diff --git a/src/main.py b/src/main.py index cda19bb..04c4472 100644 --- a/src/main.py +++ b/src/main.py @@ -4,8 +4,8 @@ import sys from src.ctx_core import CtxCore from src.cli_styles import ( - print_banner, print_section_header, print_success_box, print_warning_box, - print_error_box, print_repository_card, print_status_summary, + print_banner, print_section_header, print_success_box, print_warning_box, + print_error_box, print_repository_card, print_status_summary, print_integration_preview, print_celebration, print_explore_banner ) @@ -27,7 +27,7 @@ def main(ctx): @click.option('--dir', 'custom_dir', help='Custom directory name (alternative to positional argument)') def new(directory, custom_dir): """Create a new ctx repository - + Examples: ctx new # Creates 'context' directory ctx new my-research # Creates 'my-research' directory @@ -35,12 +35,12 @@ def new(directory, custom_dir): """ # Use custom_dir if provided, otherwise use directory argument target_dir = custom_dir if custom_dir else directory - + result = ctx_core.create_new_ctx(target_dir) - + if result.success: print_section_header("Creating New Repository", "šŸŽÆ") - + click.echo(f"\nšŸ“‚ Creating '{target_dir}' directory and copying template files...") for filename in result.data['copied_files']: click.echo(f" āœ“ Copied {filename}") @@ -48,9 +48,9 @@ def new(directory, custom_dir): click.echo(f" āœ“ Initialized git repository") click.echo(f" āœ“ Files committed with 'first commit' message") click.echo(f" āœ“ Added '{target_dir}' to ctx config as active repository") - + print_celebration() - + print_section_header("Next Steps", "šŸš€") click.echo(f"1. {click.style('cd ' + target_dir, bold=True, fg='cyan')}") click.echo(f"2. {click.style('Edit files with your context', bold=True, fg='cyan')}") @@ -65,18 +65,18 @@ def new(directory, custom_dir): @click.option('--target', default='main', help='Target branch to integrate into (default: main)') def integrate(exploration, preview, target): """Integrate insights from an exploration - + Git equivalent: git merge """ # Get merge preview preview_result = ctx_core.get_merge_preview(exploration, target) - + if not preview_result.success: click.echo(f"Error: {preview_result.error}", err=True) sys.exit(1) - + merge_preview = preview_result.data - + # Show beautiful preview print_integration_preview( source=merge_preview.source_branch, @@ -85,24 +85,24 @@ def integrate(exploration, preview, target): has_conflicts=merge_preview.has_conflicts, conflicts=merge_preview.conflicts ) - + if not merge_preview.changed_files: return - + # If preview mode, stop here if preview: return - + # Ask for confirmation if there are conflicts if merge_preview.has_conflicts: if not click.confirm(f"\nāš ļø Conflicts detected. Proceed with integration anyway?"): click.echo("Integration cancelled.") return - + # Perform the integration click.echo(f"\nProceeding with integration...") integration_result = ctx_core.perform_integration(exploration, target) - + if integration_result.success: print_celebration() print_success_box(f"Insights from '{exploration}' successfully integrated into '{target}'!", "šŸŽ‰") @@ -114,13 +114,13 @@ def integrate(exploration, preview, target): def status(): """Show current ctx repository status""" result = ctx_core.get_status() - + if not result.success: print_error_box(f"Failed to get status: {result.error}") sys.exit(1) - + status_data = result.data - + print_status_summary( repository_name=status_data.repository.name, current_branch=status_data.current_branch, @@ -134,11 +134,11 @@ def status(): @click.argument('topic') def explore(topic): """Start exploring a new topic or idea - + Git equivalent: git checkout -b """ result = ctx_core.start_exploration(topic) - + if result.success: print_explore_banner(topic) print_success_box("Branch created successfully!\nDocument your ideas and insights as you explore!", "šŸš€") @@ -150,11 +150,11 @@ def explore(topic): @click.argument('message') def save(message): """Saves the current state of the context repository - + Git equivalent: git add -A && git commit -m "" """ result = ctx_core.save(message) - + if result.success: print_success_box(f"Saved: {result.message}", "šŸ’¾") else: @@ -165,12 +165,12 @@ def save(message): @click.option('--force', is_flag=True, help='Force discard without confirmation and remove untracked files') def discard(force): """Reset to last commit, dropping all changes - + Git equivalent: git reset --hard HEAD - + This will: - Remove all staged changes - - Remove all unstaged changes + - Remove all unstaged changes - Reset all files to their state at the last commit - With --force: also removes untracked files and directories """ @@ -179,28 +179,28 @@ def discard(force): if not status_result.success: click.echo(f"Error: {status_result.error}", err=True) sys.exit(1) - + if not status_result.data.is_dirty: click.echo("No changes to discard. Working tree is clean.") return - + # Show what will be discarded click.echo("The following changes will be permanently lost:") for item in status_result.data.uncommitted_changes: click.echo(f" {item}") - + if force: click.echo("\nāš ļø --force flag: untracked files will also be removed") - + # Ask for confirmation unless --force is used if not force: if not click.confirm("\nAre you sure you want to discard all changes? This cannot be undone"): click.echo("Discard cancelled.") return - + # Perform the discard result = ctx_core.discard(force=force) - + if result.success: click.echo(f"āœ“ {result.message}") else: @@ -211,20 +211,20 @@ def discard(force): def list_repos(): """List all discovered ctx repositories""" result = ctx_core.list_repositories() - + if not result.success: print_error_box(f"Failed to list repositories: {result.error}") sys.exit(1) - + repositories = result.data - + if not repositories: print_section_header("No Repositories Found", "šŸ“‚") print_warning_box("No ctx repositories found in config.\nRun 'ctx new' to create a new ctx repository.", "šŸ’”") return - + print_section_header("Discovered Repositories", "šŸ“‚") - + for repo_info in repositories: print_repository_card( name=repo_info.name, @@ -238,7 +238,7 @@ def list_repos(): def switch(ctx_name): """Switch to a different ctx repository""" result = ctx_core.switch_repository(ctx_name) - + if result.success: print_success_box(f"Switched to repository: {ctx_name}", "šŸ”„") else: @@ -255,9 +255,9 @@ def switch(ctx_name): @click.option('--pattern', help='File pattern to filter (e.g., "*.md")') def show_all(directory, branch, pattern): """Show all the current ctx repository contents - + Perfect for LLM context absorption - shows entire repository state in one command. - + Examples: ctx load # Show all files in current branch ctx load --pattern "*.md" # Show only markdown files @@ -274,9 +274,9 @@ def show_all(directory, branch, pattern): @click.option('--pattern', help='File pattern to filter (e.g., "*.md")') def load(ctx_name, pattern): """Load the ctx_name - + Perfect for LLM context absorption - shows entire repository state in one command. - + Examples: ctx load # Show all files in current branch ctx load --pattern "*.md" # Show only markdown files @@ -292,7 +292,7 @@ def load(ctx_name, pattern): @click.argument('branches', nargs=-1) def diff(staged, branches): """Show git diff equivalent for the ctx repository - + Examples: ctx difference # Show current changes ctx difference --staged # Show staged changes @@ -300,19 +300,19 @@ def diff(staged, branches): ctx difference feature-branch main # Show changes between two branches """ result = ctx_core.get_diff(staged=staged, branches=list(branches)) - + if not result.success: click.echo(f"Error: {result.error}", err=True) if result.data and 'available_branches' in result.data: click.echo(f"Available branches: {', '.join(result.data['available_branches'])}") sys.exit(1) - + diff_data = result.data - + if not diff_data['has_changes']: click.echo("No changes to show") return - + # Print diff header if diff_data['staged']: click.echo("Staged changes:") @@ -323,14 +323,14 @@ def diff(staged, branches): click.echo(f"Changes between {diff_data['branches'][0]} and {diff_data['branches'][1]}:") else: click.echo("Current changes:") - + click.echo("=" * 50) click.echo(diff_data['diff']) @main.command() def mcp(): """Start the MCP server for AI agent integration - + This starts the Model Context Protocol server that allows AI agents to connect and use ctx as persistent, version-controlled memory. """ @@ -352,16 +352,16 @@ def mcp(): @click.argument('destination') def mv(source, destination): """Move a file within the ctx repository - + Git equivalent: git mv - + Examples: ctx mv old-file.txt new-file.txt # Rename file ctx mv file.txt subdir/file.txt # Move to subdirectory ctx mv subdir/file.txt file.txt # Move to parent directory """ result = ctx_core.move_file(source, destination) - + if result.success: click.echo(f"āœ“ {result.message}") else: @@ -373,25 +373,110 @@ def mv(source, destination): @click.option('--force', is_flag=True, help='Force removal even if file has uncommitted changes') def rm(filepath, force): """Remove a file from the ctx repository - + Git equivalent: git rm - + This will: - Remove the file from git tracking - Remove the file from the filesystem - Fail if file has uncommitted changes (unless --force is used) - + Examples: ctx rm old-file.txt # Remove tracked file ctx rm --force modified-file.txt # Force remove file with changes """ result = ctx_core.remove_file(filepath, force=force) - + if result.success: click.echo(f"āœ“ {result.message}") else: click.echo(f"Error: {result.error}", err=True) sys.exit(1) +@main.group(name="remote") +def remote(): + """Manage remote repositories""" + pass + +@remote.command(name="add") +@click.argument('name') +@click.argument('url') +def remote_add(name, url): + """Add a remote repository + + Examples: + ctx remote add origin https://github.com/user/repo.git + ctx remote add hf https://huggingface.co/datasets/user/ctx-repo + """ + result = ctx_core.add_remote(name, url) + + if result.success: + print_success_box(f"Remote '{name}' added successfully!", "šŸ”—") + click.echo(f" URL: {url}") + else: + print_error_box(f"Failed to add remote: {result.error}") + sys.exit(1) + +@remote.command(name="list") +def remote_list(): + """List all configured remotes + + Shows all remotes with their URLs + """ + result = ctx_core.list_remotes() + + if not result.success: + print_error_box(f"Failed to list remotes: {result.error}") + sys.exit(1) + + remotes = result.data + + if not remotes: + print_section_header("No Remotes Configured", "šŸ”—") + print_warning_box("No remote repositories configured.\nUse 'ctx remote add ' to add a remote.", "šŸ’”") + return + + print_section_header("Configured Remotes", "šŸ”—") + + for remote in remotes: + click.echo(f" {click.style(remote['name'], bold=True, fg='cyan')} -> {remote['url']}") + +@main.command() +@click.argument('remote_name', required=False, default='origin') +def sync(remote_name): + """Sync with remote repository (pull then push) + + Git equivalent: git pull && git push + + Examples: + ctx sync # Sync with 'origin' remote + ctx sync hf # Sync with 'hf' remote + """ + result = ctx_core.sync_remote(remote_name) + + if result.success: + sync_data = result.data + print_success_box(f"Successfully synced with remote '{remote_name}'!", "šŸ”„") + + if sync_data: + click.echo(f" Branch: {sync_data.get('branch', 'unknown')}") + if sync_data.get('pulled'): + click.echo(f" āœ“ Pulled from remote") + if sync_data.get('pushed'): + click.echo(f" āœ“ Pushed to remote") + + if sync_data.get('pull_error'): + click.echo(f" āš ļø Pull had issues: {sync_data['pull_error']}") + else: + sync_data = result.data or {} + error_msg = f"Failed to sync: {result.error}" + + if sync_data.get('conflicts'): + error_msg += "\n\nThe repository has merge conflicts that need to be resolved manually." + error_msg += "\nUse standard git commands to resolve conflicts, then commit and try syncing again." + + print_error_box(error_msg) + sys.exit(1) + if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/mcp_server.py b/src/mcp_server.py index 842c7c4..cd99b4b 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -38,7 +38,7 @@ def ctx_show_all(directory: str = "", branch: str = "", pattern: str = "") -> st directory: Optional directory to show (relative to ctx root) branch: Optional branch to show files from (default: current branch) pattern: Optional file pattern to filter (e.g., "*.md") - + Returns: Formatted output with all file contents and clear delimiters """ @@ -46,7 +46,7 @@ def ctx_show_all(directory: str = "", branch: str = "", pattern: str = "") -> st dir_param = directory if directory else None branch_param = branch if branch else None pattern_param = pattern if pattern else None - + result = core.show_all(directory=dir_param, branch=branch_param, pattern=pattern_param) if result.success: return result.data @@ -54,17 +54,17 @@ def ctx_show_all(directory: str = "", branch: str = "", pattern: str = "") -> st @mcp.tool def ctx_load(ctx_name: str = "", pattern: str = ""): - """Load the ctx from the ctx_name context + """Load the ctx from the ctx_name context To be invoked when a user asks for ctx load. Will show all available files, and contents of all top-level files. Perfect for LLM context absorption - shows entire repository state in one command. If a user does not specify a name, the currently active ctx will be loaded. - + Examples: ctx load # Show all files in current active ctx ctx load --pattern "*.md" # Show only markdown files - + """ pattern_param = pattern if pattern else None ctx_name_param = ctx_name if ctx_name else None @@ -76,15 +76,15 @@ def ctx_load(ctx_name: str = "", pattern: str = ""): @mcp.tool def ctx_new(directory: str = "context") -> str: """Create a new ctx repository for collaborative memory. - + Args: directory: Name of the directory to create (default: "context") - + Returns: Success message and repository details """ result = core.create_new_ctx(directory) - + if result.success: return f"āœ… {result.message}\n\nRepository created at: {directory}" else: @@ -93,12 +93,12 @@ def ctx_new(directory: str = "context") -> str: @mcp.tool def ctx_status() -> str: """Get the status of the current ctx repository. - + Returns: Repository status including current branch, changes, and metadata """ result = core.get_status() - + if result.success: status = result.data if status: @@ -108,12 +108,12 @@ def ctx_status() -> str: output += f"Current Branch: {status.current_branch}\n" output += f"All Branches: {', '.join(status.all_branches)}\n" output += f"Has Changes: {'Yes' if status.is_dirty else 'No'}\n" - + if status.uncommitted_changes: output += f"\nUncommitted Changes:\n" for change in status.uncommitted_changes: output += f" • {change}\n" - + return output else: return result.message @@ -123,24 +123,24 @@ def ctx_status() -> str: @mcp.tool def ctx_list() -> str: """List all available ctx repositories. - + Returns: List of discovered repositories with their status """ result = core.list_repositories() - + if result.success: repositories = result.data if not repositories: return "šŸ“‚ No ctx repositories found\n\nUse ctx_new() to create your first repository." - + output = "šŸ“‚ Available ctx repositories:\n\n" for repo in repositories: status = "🟢 Active" if repo.is_active else "⚪ Available" validity = "āœ… Valid" if repo.is_valid else "āŒ Invalid" output += f" {status} {repo.name} - {validity}\n" output += f" Path: {repo.path}\n\n" - + return output else: return f"āŒ {result.error}" @@ -148,15 +148,15 @@ def ctx_list() -> str: @mcp.tool def ctx_switch(repository_name: str) -> str: """Switch to a different ctx repository. - + Args: repository_name: Name of the repository to switch to - + Returns: Success message confirming the switch """ result = core.switch_repository(repository_name) - + if result.success: return f"āœ… {result.message}" else: @@ -171,15 +171,15 @@ def ctx_switch(repository_name: str) -> str: @mcp.tool def ctx_explore(topic: str) -> str: """Start exploring a new topic by creating a new branch. - + Args: topic: The topic or question to explore - + Returns: Success message and branch information """ result = core.start_exploration(topic) - + if result.success: return f"šŸ” {result.message}\n\nYou're now on branch '{topic}' ready to explore this topic." else: @@ -188,15 +188,15 @@ def ctx_explore(topic: str) -> str: @mcp.tool def ctx_save(message: str) -> str: """Saves the current state of the context repository. - + Args: message: Description of what you're saving - + Returns: Success message confirming the save """ result = core.save(message) - + if result.success: return f"šŸ’¾ {result.message}" else: @@ -205,21 +205,21 @@ def ctx_save(message: str) -> str: @mcp.tool def ctx_discard(force: bool = False) -> str: """Reset to last commit, dropping all changes. - + This performs a git reset --hard HEAD operation, which: - Removes all staged changes - Removes all unstaged changes - Resets all files to their state at the last commit - With force=True: also removes untracked files and directories - + Args: force: If True, also removes untracked files and directories (default: False) - + Returns: Success message confirming the discard operation """ result = core.discard(force=force) - + if result.success: return f"šŸ—‘ļø {result.message}" else: @@ -228,22 +228,22 @@ def ctx_discard(force: bool = False) -> str: @mcp.tool def ctx_integrate(source_branch: str, target_branch: str = "main") -> str: """Integrate insights from one branch into another. - + Args: source_branch: The branch with insights to integrate target_branch: The branch to integrate into (default: "main") - + Returns: Success message or conflict information """ # First get a preview to check for conflicts preview_result = core.get_merge_preview(source_branch, target_branch) - + if not preview_result.success: return f"āŒ {preview_result.error}" - + preview = preview_result.data - + if preview.has_conflicts: output = f"āš ļø Merge conflicts detected!\n\n" output += f"Conflicts in {len(preview.conflicts)} files:\n" @@ -251,10 +251,10 @@ def ctx_integrate(source_branch: str, target_branch: str = "main") -> str: output += f" • {conflict.get('file', 'Unknown file')}\n" output += f"\nResolve conflicts manually before integrating." return output - + # Perform the integration result = core.perform_integration(source_branch, target_branch) - + if result.success: return f"šŸ”„ {result.message}" else: @@ -263,12 +263,12 @@ def ctx_integrate(source_branch: str, target_branch: str = "main") -> str: @mcp.tool def ctx_diff(staged: bool = False, source_branch: str = "", target_branch: str = "") -> str: """Get differences between branches or current changes. - + Args: staged: Show only staged changes (default: False) source_branch: First branch to compare (optional) target_branch: Second branch to compare (optional) - + Returns: Diff output showing changes """ @@ -277,14 +277,14 @@ def ctx_diff(staged: bool = False, source_branch: str = "", target_branch: str = branches.append(source_branch) if target_branch: branches.append(target_branch) - + result = core.get_diff(staged=staged, branches=branches if branches else None) - + if result.success: diff_data = result.data if not diff_data['has_changes']: return "šŸ“„ No changes to show" - + output = "šŸ“‹ Diff Results\n\n" if diff_data['staged']: output += "Type: Staged changes\n" @@ -295,7 +295,7 @@ def ctx_diff(staged: bool = False, source_branch: str = "", target_branch: str = output += f"Type: Changes between {diff_data['branches'][0]} and {diff_data['branches'][1]}\n" else: output += "Type: Current changes\n" - + output += "=" * 50 + "\n" output += diff_data['diff'] return output @@ -307,17 +307,17 @@ def ctx_diff(staged: bool = False, source_branch: str = "", target_branch: str = @mcp.tool def ctx_read_file(filepath: str, branch: str = "") -> str: """Read a specific file from the ctx repository. - + Args: filepath: Path to the file relative to ctx root branch: Optional branch to read from (default: current branch) - + Returns: File contents or error message """ if not core.is_ctx_repo(): return "āŒ Not in a ctx repository" - + try: if branch: # Read from specific branch @@ -330,46 +330,46 @@ def ctx_read_file(filepath: str, branch: str = "") -> str: ctx_root = core.get_active_ctx_path() if not ctx_root: return "āŒ Could not find ctx root" - + file_path = ctx_root / filepath if not file_path.exists(): return f"āŒ File '{filepath}' not found" - + if not file_path.is_file(): return f"āŒ '{filepath}' is not a file" - + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read() - + return f"šŸ“„ {filepath}\n{'=' * 50}\n{content}" - + except Exception as e: return f"āŒ Error reading file: {e}" @mcp.tool def ctx_write_file(filepath: str, content: str) -> str: """Write content to a file in the ctx repository. - + Args: filepath: Path to the file to write content: Content to write to the file - + Returns: Success message or error """ ctx_root = core.get_active_ctx_path() if not ctx_root: return "āŒ Not in a ctx repository" - + try: file_path = ctx_root / filepath - + # Create parent directories if they don't exist file_path.parent.mkdir(parents=True, exist_ok=True) - + with open(file_path, 'w', encoding='utf-8') as f: f.write(content) - + return f"āœ… File '{filepath}' written successfully" except Exception as e: return f"āŒ Error writing file: {e}" @@ -377,40 +377,40 @@ def ctx_write_file(filepath: str, content: str) -> str: @mcp.tool def ctx_list_files(directory: str = "") -> str: """List files in the ctx repository. - + Args: directory: Directory to list (default: root of repository) - + Returns: List of files and directories """ ctx_root = core.get_active_ctx_path() if not ctx_root: return "āŒ Not in a ctx repository" - + try: target_dir = ctx_root / directory if directory else ctx_root - + if not target_dir.exists(): return f"āŒ Directory '{directory}' not found" - + if not target_dir.is_dir(): return f"āŒ '{directory}' is not a directory" - + output = f"šŸ“ Files in {directory or 'repository root'}:\n\n" - + # Get all items, sorted with directories first items = list(target_dir.iterdir()) items.sort(key=lambda x: (x.is_file(), x.name)) - + for item in items: if item.name.startswith('.'): continue # Skip hidden files - + icon = "šŸ“" if item.is_dir() else "šŸ“„" rel_path = item.relative_to(ctx_root) output += f" {icon} {rel_path}\n" - + return output except Exception as e: return f"āŒ Error listing files: {e}" @@ -418,18 +418,18 @@ def ctx_list_files(directory: str = "") -> str: @mcp.tool def ctx_move(source: str, destination: str) -> str: """Move a file within the ctx repository. - + Git equivalent: git mv - + Args: source: Source file path (relative to ctx root) destination: Destination file path (relative to ctx root) - + Returns: Success message or error """ result = core.move_file(source, destination) - + if result.success: return f"āœ… {result.message}" else: @@ -438,49 +438,128 @@ def ctx_move(source: str, destination: str) -> str: @mcp.tool def ctx_remove(filepath: str, force: bool = False) -> str: """Remove a file from the ctx repository. - + Git equivalent: git rm - + This will: - Remove the file from git tracking - Remove the file from the filesystem - Fail if file has uncommitted changes (unless force=True) - + Args: filepath: Path to the file to remove (relative to ctx root) force: If True, force removal even if file has uncommitted changes - + Returns: Success message or error """ result = core.remove_file(filepath, force=force) - + if result.success: return f"šŸ—‘ļø {result.message}" else: return f"āŒ {result.error}" +# === Remote Repository Management Tools === + +@mcp.tool +def ctx_remote_add(name: str, url: str) -> str: + """Add a remote repository. + + Args: + name: Name for the remote (e.g., 'origin', 'hf', 'github') + url: Git URL for the remote repository + + Returns: + Success message or error + """ + result = core.add_remote(name, url) + + if result.success: + return f"šŸ”— {result.message}" + else: + return f"āŒ {result.error}" + +@mcp.tool +def ctx_remote_list() -> str: + """List all configured remotes. + + Returns: + List of remotes with their URLs + """ + result = core.list_remotes() + + if result.success: + remotes = result.data + if not remotes: + return "šŸ”— No remotes configured\n\nUse ctx_remote_add() to add a remote repository." + + output = "šŸ”— Configured remotes:\n\n" + for remote in remotes: + output += f" šŸ“ {remote['name']} -> {remote['url']}\n" + + return output + else: + return f"āŒ {result.error}" + +@mcp.tool +def ctx_sync(remote_name: str = "origin") -> str: + """Sync with remote repository (pull then push). + + Args: + remote_name: Name of the remote to sync with (default: 'origin') + + Returns: + Success message with sync details or error information + """ + result = core.sync_remote(remote_name) + + if result.success: + sync_data = result.data + output = f"šŸ”„ Successfully synced with remote '{remote_name}'\n\n" + + if sync_data: + output += f"Branch: {sync_data.get('branch', 'unknown')}\n" + if sync_data.get('pulled'): + output += "āœ… Pulled from remote\n" + if sync_data.get('pushed'): + output += "āœ… Pushed to remote\n" + + if sync_data.get('pull_error'): + output += f"āš ļø Pull had issues: {sync_data['pull_error']}\n" + + return output + else: + sync_data = result.data or {} + error_msg = f"āŒ Failed to sync: {result.error}" + + if sync_data.get('conflicts'): + error_msg += "\n\nāš ļø The repository has merge conflicts that need to be resolved manually." + error_msg += "\nUse standard git commands to resolve conflicts, then commit and try syncing again." + + return error_msg + # === Navigation Tools === @mcp.tool def ctx_get_branches() -> str: """Get all branches in the ctx repository. - + Returns: List of all branches with current branch highlighted """ if not core.is_ctx_repo(): return "āŒ Not in a ctx repository" - + try: current_branch = core.get_current_branch() all_branches = core.get_all_branches() - + output = "🌿 Repository branches:\n\n" for branch in all_branches: indicator = "→" if branch == current_branch else " " output += f" {indicator} {branch}\n" - + return output except Exception as e: return f"āŒ Error getting branches: {e}" @@ -488,21 +567,21 @@ def ctx_get_branches() -> str: @mcp.tool def ctx_get_history(branch: str = "", limit: int = 10) -> str: """Get commit history for a branch. - + Args: branch: Branch to get history for (default: current branch) limit: Maximum number of commits to show (default: 10) - + Returns: Commit history with messages and dates """ if not core.is_ctx_repo(): return "āŒ Not in a ctx repository" - + repo = core.get_ctx_repo() if not repo: return "āŒ No ctx repository found" - + try: if branch: # Get history for specific branch @@ -513,24 +592,24 @@ def ctx_get_history(branch: str = "", limit: int = 10) -> str: # Get history for current branch target_branch = repo.active_branch branch = target_branch.name - + commits = list(repo.iter_commits(target_branch, max_count=limit)) - + output = f"šŸ“œ History for branch '{branch}' (last {min(len(commits), limit)} commits):\n\n" - + for commit in commits: # Format commit date date_str = commit.committed_datetime.strftime("%Y-%m-%d %H:%M") - + # Get short hash short_hash = commit.hexsha[:7] - + # Get commit message (first line only) message = str(commit.message).strip().split('\n')[0] - + output += f" {short_hash} - {date_str}\n" output += f" {message}\n\n" - + return output except Exception as e: return f"āŒ Error getting history: {e}" @@ -538,34 +617,34 @@ def ctx_get_history(branch: str = "", limit: int = 10) -> str: @mcp.tool def ctx_search_content(query: str, file_pattern: str = "*") -> str: """Search for content within files in the ctx repository. - + Args: query: Text to search for file_pattern: File pattern to search in (default: "*" for all files) - + Returns: Search results with file paths and line numbers """ ctx_root = core.get_active_ctx_path() if not ctx_root: return "āŒ Not in a ctx repository" - + try: import fnmatch - + matches = [] - + # Walk through all files in the repository for file_path in ctx_root.rglob('*'): # Skip directories and hidden files if file_path.is_dir() or file_path.name.startswith('.'): continue - + # Check if file matches pattern rel_path = file_path.relative_to(ctx_root) if not fnmatch.fnmatch(str(rel_path), file_pattern): continue - + try: with open(file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): @@ -578,20 +657,20 @@ def ctx_search_content(query: str, file_pattern: str = "*") -> str: except (UnicodeDecodeError, PermissionError): # Skip binary files or files we can't read continue - + if not matches: return f"šŸ” No matches found for '{query}'" - + output = f"šŸ” Found {len(matches)} matches for '{query}':\n\n" - + current_file = None for match in matches: if match['file'] != current_file: current_file = match['file'] output += f"šŸ“„ {current_file}:\n" - + output += f" Line {match['line']}: {match['content']}\n" - + return output except Exception as e: return f"āŒ Error searching content: {e}" @@ -603,4 +682,4 @@ def run_server(): mcp.run() if __name__ == "__main__": - run_server() \ No newline at end of file + run_server() \ No newline at end of file From c0e883da0a9a07a67eed1e046f2f942f0904b205 Mon Sep 17 00:00:00 2001 From: Jeremy Pinto Date: Mon, 18 Aug 2025 23:05:32 -0400 Subject: [PATCH 3/3] WIP: Add sync feature --- src/ctx_core.py | 7 +++---- src/main.py | 12 +++--------- src/mcp_server.py | 6 +++--- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/ctx_core.py b/src/ctx_core.py index 80fd1c9..44d5175 100644 --- a/src/ctx_core.py +++ b/src/ctx_core.py @@ -1156,14 +1156,13 @@ def sync_remote(self, remote_name: str = "origin") -> OperationResult: data=sync_info ) else: - # Try to continue even if pull fails (might be first push) + # Try to continue even if pull fails (branch might not exist on remote) sync_info['pull_error'] = str(e) try: - # Push to remote - push_info = remote.push(current_branch) + # Push to remote with set-upstream + repo.git.push("--set-upstream", remote_name, repo.head.ref) sync_info['pushed'] = True - sync_info['push_info'] = str(push_info[0].flags) if push_info else "up-to-date" except GitCommandError as e: if "rejected" in str(e).lower(): diff --git a/src/main.py b/src/main.py index 04c4472..5cb8ad3 100644 --- a/src/main.py +++ b/src/main.py @@ -399,8 +399,8 @@ def remote(): pass @remote.command(name="add") -@click.argument('name') @click.argument('url') +@click.argument('name', required=False, default='origin') def remote_add(name, url): """Add a remote repository @@ -408,7 +408,7 @@ def remote_add(name, url): ctx remote add origin https://github.com/user/repo.git ctx remote add hf https://huggingface.co/datasets/user/ctx-repo """ - result = ctx_core.add_remote(name, url) + result = ctx_core.add_remote(url=url, name=name) if result.success: print_success_box(f"Remote '{name}' added successfully!", "šŸ”—") @@ -456,15 +456,9 @@ def sync(remote_name): if result.success: sync_data = result.data - print_success_box(f"Successfully synced with remote '{remote_name}'!", "šŸ”„") + print_success_box(f"Successfully synced branch {sync_data['branch']} with remote '{remote_name}'!", "šŸ”„") if sync_data: - click.echo(f" Branch: {sync_data.get('branch', 'unknown')}") - if sync_data.get('pulled'): - click.echo(f" āœ“ Pulled from remote") - if sync_data.get('pushed'): - click.echo(f" āœ“ Pushed to remote") - if sync_data.get('pull_error'): click.echo(f" āš ļø Pull had issues: {sync_data['pull_error']}") else: diff --git a/src/mcp_server.py b/src/mcp_server.py index cd99b4b..b68f31e 100644 --- a/src/mcp_server.py +++ b/src/mcp_server.py @@ -463,17 +463,17 @@ def ctx_remove(filepath: str, force: bool = False) -> str: # === Remote Repository Management Tools === @mcp.tool -def ctx_remote_add(name: str, url: str) -> str: +def ctx_remote_add(url: str, name: str = "origin") -> str: """Add a remote repository. Args: - name: Name for the remote (e.g., 'origin', 'hf', 'github') url: Git URL for the remote repository + name: Name for the remote (e.g., 'origin', 'hf', 'github') (default: 'origin') Returns: Success message or error """ - result = core.add_remote(name, url) + result = core.add_remote(url, name) if result.success: return f"šŸ”— {result.message}"