diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..edcc40f Binary files /dev/null and b/.DS_Store differ diff --git a/benchmarking/.gitignore b/benchmarking/.gitignore index d21e39e..36f5389 100644 --- a/benchmarking/.gitignore +++ b/benchmarking/.gitignore @@ -3,3 +3,5 @@ __pycache__/ .DS_store outputs/ *.sif +*agent_systems/ +agent_systems/ \ No newline at end of file diff --git a/benchmarking/InteractiveAgentTester.py b/benchmarking/InteractiveAgentTester.py deleted file mode 100644 index d3b06b2..0000000 --- a/benchmarking/InteractiveAgentTester.py +++ /dev/null @@ -1,524 +0,0 @@ -#!/usr/bin/env python3 -""" -Interactive Agent Tester – Docker, Singularity‑API, or **Singularity‑Exec (offline‑REPL)** -======================================================================================= -Run a natural‑language chat loop that generates runnable Python, executes it inside a -container, and streams the results back. Works even on clusters where **no networking** -is allowed for Singularity by using a long‑lived REPL inside the container. - -Back‑ends ---------- -1. **docker** – Docker daemon + container with FastAPI kernel. -2. **singularity** – Singularity *instance* with FastAPI kernel. -3. **singularity-exec** – Long‑lived `singularity exec` REPL that talks to - `/opt/offline_kernel.py --repl` (no TCP). 
-""" -from __future__ import annotations - -import base64 -import json -import os -import re -import shlex -import subprocess -import sys -import tempfile -import textwrap -import time -import uuid -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Tuple - -# ── 3rd‑party deps ────────────────────────────────────────────────────────── -try: - from dotenv import load_dotenv - from openai import OpenAI, APIError - import requests # only needed for networked back‑ends - from rich.console import Console - from rich.panel import Panel - from rich.prompt import Prompt - from rich.syntax import Syntax - from rich.table import Table -except ImportError as e: - print(f"Missing dependency: {e}. Install required packages.", file=sys.stderr) - sys.exit(1) - -console = Console() -SCRIPT_DIR = Path(__file__).resolve().parent -DATASETS_DIR = SCRIPT_DIR / "datasets" -OUTPUTS_DIR = SCRIPT_DIR / "outputs" -ENV_FILE = SCRIPT_DIR / ".env" - -# In‑container canonical paths -SANDBOX_DATA_PATH = "/workspace/dataset.h5ad" -SANDBOX_RESOURCES_DIR = "/workspace/resources" - -# ============================================================================== -# 1 · Choose back‑end BEFORE importing heavy managers -# ============================================================================== -backend = Prompt.ask( - "Choose sandbox backend", - choices=["docker", "singularity", "singularity-exec"], - default="docker", -) - -# Ask user whether to force‑update the sandbox image/SIF -force_refresh = ( - Prompt.ask( - "Force update sandbox environment?", choices=["y", "n"], default="n" - ).lower() - == "y" -) - -is_exec_mode = backend == "singularity-exec" - -# ----------------------------------------------------------------------------- -# 1a · Docker (FastAPI) back‑end -# ----------------------------------------------------------------------------- -if backend == "docker": - sandbox_dir = SCRIPT_DIR / "sandbox" - sys.path.insert(0, str(sandbox_dir)) 
- try: - from benchmarking_sandbox_management import ( - SandboxManager as _BackendManager, - CONTAINER_NAME as _SANDBOX_HANDLE, - IMAGE_NAME as _SANDBOX_IMAGE, # assume this constant exists - API_PORT_HOST as _API_PORT, - ) - finally: - sys.path.pop(0) - - # --- optional force‑refresh logic -------------------------------------- - if force_refresh: - console.print("[yellow]Forcing Docker sandbox refresh…[/yellow]") - # Stop & remove any running container gracefully - subprocess.run(["docker", "rm", "-f", _SANDBOX_HANDLE], check=False) - # Remove the sandbox image to ensure re‑pull/build - subprocess.run(["docker", "image", "rm", "-f", _SANDBOX_IMAGE], check=False) - console.print("[green]Docker image removed – it will be pulled/built on next start.[/green]") - - def COPY_CMD(src: str, dst: str): - subprocess.run(["docker", "cp", src, dst], check=True) - - EXECUTE_ENDPOINT = f"http://localhost:{_API_PORT}/execute" - STATUS_ENDPOINT = f"http://localhost:{_API_PORT}/status" - -# ----------------------------------------------------------------------------- -# 1b · Singularity instance (FastAPI) back‑end -# ----------------------------------------------------------------------------- -elif backend == "singularity": - sandbox_dir = SCRIPT_DIR / "sandbox" - sys.path.insert(0, str(sandbox_dir)) - try: - import benchmarking_sandbox_management_singularity as sing - finally: - sys.path.pop(0) - - # optional force‑refresh - if force_refresh: - console.print("[yellow]Forcing Singularity sandbox refresh…[/yellow]") - try: - sing.stop_instance() - except Exception: - pass # ignore if not running - if sing.SIF_PATH.exists(): - sing.SIF_PATH.unlink() - console.print( - f"[green]Deleted {sing.SIF_PATH.name} – it will be re‑downloaded on next start.[/green]" - ) - - class _SingInstanceWrapper: - def start_container(self): - return sing.start_instance() - - def stop_container(self): - return sing.stop_instance() - - _BackendManager = _SingInstanceWrapper - _SANDBOX_HANDLE = 
sing.INSTANCE_NAME - _API_PORT = sing.API_PORT_HOST - - def COPY_CMD(src: str, dst: str): - console.print( - f"[yellow]Singularity instance: ensure {src} is reachable at {dst} via bind mount.[/yellow]" - ) - - EXECUTE_ENDPOINT = f"http://localhost:{_API_PORT}/execute" - STATUS_ENDPOINT = f"http://localhost:{_API_PORT}/status" - -# ----------------------------------------------------------------------------- -# 1c · Singularity exec (offline‑REPL) back‑end -# ----------------------------------------------------------------------------- -elif backend == "singularity-exec": - sandbox_dir = SCRIPT_DIR / "sandbox" - sys.path.insert(0, str(sandbox_dir)) - try: - import benchmarking_sandbox_management_singularity as sing - finally: - sys.path.pop(0) - - # optional force‑refresh - if force_refresh: - console.print("[yellow]Forcing Singularity sandbox refresh…[/yellow]") - if sing.SIF_PATH.exists(): - sing.SIF_PATH.unlink() - console.print( - f"[green]Deleted {sing.SIF_PATH.name} – it will be re‑downloaded on next start.[/green]" - ) - - SIF_PATH = sing.SIF_PATH - SING_BIN = sing.SING_BIN - SENTINEL = "<<>>" - - class _SingExecBackend: - """Launch one long‑lived REPL inside the SIF and stream code to it.""" - - def __init__(self): - self._binds: List[str] = [] - self._proc: Optional[subprocess.Popen[str]] = None - - def set_data(self, dataset: Path, resources: List[Tuple[Path, str]]): - self._binds = [ - "--bind", - f"{dataset.resolve()}:{SANDBOX_DATA_PATH}", - ] - for host, cont in resources: - self._binds.extend(["--bind", f"{host.resolve()}:{cont}"]) - - # ------------------------------------------------------------------ - # Container lifecycle - # ------------------------------------------------------------------ - def start_container(self): - if self._proc: - return True # already running - if not sing.pull_sif_if_needed(): - return False - - cmd = [ - SING_BIN, - "exec", - "--containall", - "--cleanenv", - *self._binds, - str(SIF_PATH), - "python", - 
"/opt/offline_kernel.py", - "--repl", - ] - self._proc = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - bufsize=1, # line buffered - ) - # Wait for the REPL banner - ready_line = self._proc.stdout.readline().strip() - if ready_line != "__REPL_READY__": - console.print( - f"[red]REPL failed to start. Got: {ready_line}[/red]" - ) - self.stop_container() - return False - return True - - def stop_container(self): - if not self._proc: - return True - try: - if self._proc.stdin: - self._proc.stdin.close() - self._proc.terminate() - self._proc.wait(timeout=5) - except Exception: - self._proc.kill() - self._proc = None - return True - - # ------------------------------------------------------------------ - # Code execution - # ------------------------------------------------------------------ - def exec_code(self, code: str, timeout: int = 300) -> Dict: - if not self._proc: - raise RuntimeError("REPL not running") - assert self._proc.stdin and self._proc.stdout - - # Send code block + sentinel - self._proc.stdin.write(code) - if not code.endswith("\n"): - self._proc.stdin.write("\n") - self._proc.stdin.write(SENTINEL + "\n") - self._proc.stdin.flush() - - # Read exactly one JSON line - start_time = time.time() - while True: - if time.time() - start_time > timeout: - return { - "status": "timeout", - "stdout": "", - "stderr": "Execution timed out in REPL.", - "images": [], - } - line = self._proc.stdout.readline() - if not line: - continue - line = line.strip() - try: - return json.loads(line) - except json.JSONDecodeError: - # Non‑JSON noise; continue reading - continue - - _BackendManager = _SingExecBackend - - def COPY_CMD(src: str, dst: str): - console.print("[yellow]singularity-exec mode uses bind mounts instead of docker cp.[/yellow]") -else: - console.print("[red]Unknown backend.") - sys.exit(1) - -# ==================================================================================== -# 2 · Generic 
helpers (unchanged) -# ==================================================================================== - -def extract_python_code(txt: str) -> Optional[str]: - m = re.search(r"```python\s*([\s\S]+?)\s*```", txt) - return m.group(1).strip() if m else None - - -# Rich display wrappers - -def _panel(role: str, content: str): - titles = {"system": "SYSTEM", "user": "USER", "assistant": "ASSISTANT"} - styles = {"system": "dim blue", "user": "cyan", "assistant": "green"} - console.print(Panel(content, title=titles.get(role, role.upper()), border_style=styles.get(role, "white"))) - - -def display(role: str, content: str): - if role == "assistant": - code = extract_python_code(content) or "" - text_part = re.sub(r"```python[\s\S]+?```", "", content, count=1).strip() - if text_part: - _panel("assistant", text_part) - if code: - console.print( - Panel( - Syntax(code, "python", line_numbers=True), - title="ASSISTANT (code)", - border_style="green", - ) - ) - else: - _panel(role, content) - - -# ==================================================================================== -# 3 · Dataset / prompt helpers (unchanged) -# ==================================================================================== - -def get_initial_prompt() -> str: - console.print("[bold cyan]Enter the initial user prompt (Ctrl+D to finish):[/bold cyan]") - try: - txt = sys.stdin.read().strip() - except EOFError: - txt = "" - if not txt: - console.print("[red]Empty prompt – aborting.[/red]") - sys.exit(1) - return txt - - -def select_dataset() -> Tuple[Path, dict]: - if not DATASETS_DIR.exists(): - console.print(f"[red]Datasets dir not found: {DATASETS_DIR}[/red]") - sys.exit(1) - items = [ - (p, json.loads(p.with_suffix(".json").read_text())) - for p in DATASETS_DIR.glob("*.h5ad") - if p.with_suffix(".json").exists() - ] - if not items: - console.print("[red]No datasets found.[/red]") - sys.exit(1) - tbl = Table(title="Datasets") - tbl.add_column("Idx", justify="right") - 
tbl.add_column("Name") - tbl.add_column("Cells", justify="right") - for i, (p, meta) in enumerate(items, 1): - tbl.add_row(str(i), meta.get("dataset_title", p.stem), str(meta.get("cell_count", "?"))) - console.print(tbl) - idx = int(Prompt.ask("Choose index", choices=[str(i) for i in range(1, len(items) + 1)])) - 1 - return items[idx] - - -def collect_resources() -> List[Tuple[Path, str]]: - console.print("\n[bold cyan]Optional: paths to bind inside sandbox[/bold cyan] (blank line to finish)") - res: List[Tuple[Path, str]] = [] - while True: - p = Prompt.ask("Path", default="").strip() - if not p: - break - path = Path(p).expanduser().resolve() - if not path.exists(): - console.print(f"[yellow]Path does not exist: {path}[/yellow]") - continue - res.append((path, f"{SANDBOX_RESOURCES_DIR}/{path.name}")) - return res - - -# ==================================================================================== -# 4 · Networked FastAPI helpers (skipped for exec mode) -# ==================================================================================== - -def api_alive(max_retries: int = 10, delay: float = 1.5) -> bool: - if is_exec_mode: - return True # nothing to ping - for _ in range(max_retries): - try: - if requests.get(STATUS_ENDPOINT, timeout=2).json().get("status") == "ok": - return True - except Exception: - time.sleep(delay) - return False - - -def format_execute_response(resp: dict) -> str: - lines = ["Code execution result:"] - if resp.get("status") != "ok": - lines.append(f"[status: {resp.get('status')}]") - stdout, stderr = resp.get("stdout", ""), resp.get("stderr", "") - if stdout: - lines += ["--- STDOUT ---", stdout[:1500]] - if stderr: - lines += ["--- STDERR ---", stderr[:1500]] - img_paths = [] - for b64 in resp.get("images", []): - fname = OUTPUTS_DIR / f"{datetime.now():%Y%m%d_%H%M%S_%f}.png" - fname.parent.mkdir(exist_ok=True, parents=True) - with open(fname, "wb") as f: - f.write(base64.b64decode(b64)) - img_paths.append(str(fname)) - if 
img_paths: - lines.append("Saved images: " + ", ".join(img_paths)) - return "\n".join(lines) - - -# ==================================================================================== -# 5 · Main interactive loop (unchanged) -# ==================================================================================== - -def run_interactive(prompt: str, dataset: Path, metadata: dict, resources: List[Tuple[Path, str]]): - mgr = _BackendManager() - console.print(f"Starting sandbox ({backend}) …") - - # Tell exec back‑end where data/resources are (creates bind list) - if is_exec_mode and hasattr(mgr, "set_data"): - mgr.set_data(dataset, resources) - - if not mgr.start_container(): - console.print("[red]Failed to start sandbox.[/red]") - return - - if not api_alive(): - console.print("[red]Kernel API not responsive (networked back‑end).[/red]") - return - - # For docker / singularity‑instance we still *attempt* docker cp (no‑op or warning otherwise) - if not is_exec_mode: - COPY_CMD(str(dataset), f"{_SANDBOX_HANDLE}:{SANDBOX_DATA_PATH}") - for h, c in resources: - COPY_CMD(str(h), f"{_SANDBOX_HANDLE}:{c}") - - resource_lines = [f"- {c} (from {h})" for h, c in resources] or ["- (none)"] - sys_prompt = textwrap.dedent( - f""" - You are an AI assistant analysing a single‑cell dataset. - Dataset path inside container: **{SANDBOX_DATA_PATH}** - Additional resources:\n""" - + "\n".join(resource_lines) - + "\n\n" - + textwrap.dedent( - f"Dataset metadata:\n{json.dumps(metadata, indent=2)}\n\n" - "Wrap runnable Python in triple‑backtick ```python blocks. Imports & variables persist within the container session." 
- ) - ) - - history = [ - {"role": "system", "content": sys_prompt}, - {"role": "user", "content": prompt}, - ] - display("system", sys_prompt) - display("user", prompt) - - openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - turn = 0 - while True: - turn += 1 - console.print(f"\n[bold]OpenAI call (turn {turn})…[/bold]") - try: - rsp = openai.chat.completions.create( - model="gpt-4o", messages=history, temperature=0.7 - ) - except APIError as e: - console.print(f"[red]OpenAI error: {e}[/red]") - break - assistant_msg = rsp.choices[0].message.content - history.append({"role": "assistant", "content": assistant_msg}) - display("assistant", assistant_msg) - - code = extract_python_code(assistant_msg) - if code: - console.print("[cyan]Executing code…[/cyan]") - try: - if is_exec_mode: - exec_result = mgr.exec_code(code, timeout=300) - else: - exec_result = requests.post( - EXECUTE_ENDPOINT, json={"code": code, "timeout": 300}, timeout=310 - ).json() - feedback = format_execute_response(exec_result) - except Exception as exc: - feedback = f"Code execution result:\n[Execution error on host: {exc}]" - - history.append({"role": "user", "content": feedback}) - display("user", feedback) - - console.print("\n[bold]Next message (blank = continue, 'exit' to quit):[/bold]") - try: - user_in = input().strip() - except (EOFError, KeyboardInterrupt): - user_in = "exit" - if user_in.lower() in {"exit", "quit"}: - break - if user_in: - history.append({"role": "user", "content": user_in}) - display("user", user_in) - - console.print("Stopping sandbox…") - mgr.stop_container() - - -# ==================================================================================== -# 6 · Entry‑point -# ==================================================================================== - -def main(): - load_dotenv(Path(ENV_FILE)) - if not os.getenv("OPENAI_API_KEY"): - console.print(f"[red]OPENAI_API_KEY not set in {ENV_FILE}.[/red]") - sys.exit(1) - - prompt = get_initial_prompt() - data_p, meta 
class Command:
    """A delegation command that one agent can issue to a neighbouring agent.

    Holds three plain attributes: the command ``name``, the name of the
    ``target_agent`` the task is handed to, and a free-text ``description``.
    """

    # Maximum number of description characters shown by ``__repr__``.
    _REPR_DESC_LIMIT = 30

    def __init__(self, name: str, target_agent: str, description: str):
        self.name = name
        self.target_agent = target_agent
        self.description = description

    def __repr__(self) -> str:
        desc = self.description
        # Only add an ellipsis when the description really was shortened;
        # appending it unconditionally makes short descriptions look cut off.
        if len(desc) > self._REPR_DESC_LIMIT:
            desc = desc[:self._REPR_DESC_LIMIT] + "..."
        return (f"Command(name='{self.name}', target='{self.target_agent}', "
                f"desc='{desc}')")


class Agent:
    """A single agent: its name, base prompt, and outgoing commands.

    ``commands`` maps a command name to the :class:`Command` it triggers.
    """

    def __init__(self, name: str, prompt: str, commands: Dict[str, Command]):
        self.name = name
        self.prompt = prompt
        self.commands = commands

    def __repr__(self) -> str:
        return f"Agent(name='{self.name}', commands={list(self.commands.keys())})"

    def get_full_prompt(self) -> str:
        """Return the base prompt followed by a listing of available commands.

        When the agent has no commands the base prompt is returned unchanged.
        """
        if not self.commands:
            return self.prompt
        parts = [
            self.prompt,
            "\n\nYou can use the following commands to delegate tasks:",
        ]
        for name, command in self.commands.items():
            parts.append(f"\n- Command: `{name}`")
            parts.append(f"\n - Description: {command.description}")
            parts.append(f"\n - Target Agent: {command.target_agent}")
        return "".join(parts)


class AgentSystem:
    """
    Loads and holds the entire agent system configuration from a JSON file,
    representing the network of agents and their communication channels.
    """

    def __init__(self, agents: Dict[str, Agent]):
        self.agents = agents

    @classmethod
    def load_from_json(cls, file_path: str) -> 'AgentSystem':
        """Parse the JSON blueprint at *file_path* and build an AgentSystem.

        The blueprint is expected to look like
        ``{"agents": {name: {"prompt": str, "neighbors": {cmd: {...}}}}}``
        where every neighbor entry has ``target_agent`` and ``description``.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if an agent lacks ``prompt`` or a neighbor entry lacks
                ``target_agent`` / ``description``.
        """
        print(f"Loading agent system from: {file_path}")
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(file_path, 'r', encoding='utf-8') as f:
            config = json.load(f)

        agents: Dict[str, Agent] = {}
        for agent_name, agent_data in config.get('agents', {}).items():
            commands: Dict[str, Command] = {
                cmd_name: Command(
                    name=cmd_name,
                    target_agent=cmd_data['target_agent'],
                    description=cmd_data['description'],
                )
                for cmd_name, cmd_data in agent_data.get('neighbors', {}).items()
            }
            agents[agent_name] = Agent(
                name=agent_name,
                prompt=agent_data['prompt'],
                commands=commands,
            )

        print("Agent system loaded successfully.")
        return cls(agents)

    def get_agent(self, name: str) -> Optional[Agent]:
        """Return the agent called *name*, or ``None`` if there is none."""
        return self.agents.get(name)

    def get_all_agents(self) -> Dict[str, Agent]:
        """Return the mapping of agent name to agent (the live dict, not a copy)."""
        return self.agents

    def get_instructions(self) -> str:
        """Return a textual summary of every agent and its commands."""
        lines = ["You are part of a multi-agent system with the following agents:\n"]
        for agent in self.agents.values():
            lines.append(f"\n- Agent: {agent.name}\n Prompt: {agent.prompt}\n")
            if agent.commands:
                lines.append(" Commands:\n")
                lines.extend(
                    f" - {cmd.name}: {cmd.description} (target: {cmd.target_agent})\n"
                    for cmd in agent.commands.values()
                )
        return "".join(lines)

    # Backward-compatible alias: the method was originally published under
    # this misspelled name, so existing callers must keep working.
    get_insturctions = get_instructions

    def __repr__(self) -> str:
        return f"AgentSystem(agents={list(self.agents.keys())})"
Your primary role is to analyze incoming user requests and delegate them to the appropriate specialist agent. You do not perform tasks yourself.", + "neighbors": { + "delegate_to_coder": { + "target_agent": "coder_agent", + "description": "Use this command for any request that involves writing, debugging, or explaining code." + }, + "delegate_to_researcher": { + "target_agent": "research_agent", + "description": "Use this command for any request that requires searching for information, summarizing articles, or answering general knowledge questions." + } + } + }, + "coder_agent": { + "prompt": "You are a specialist coder agent. Your job is to write high-quality, executable code based on the user's request. You do not delegate tasks.", + "neighbors": {} + }, + "research_agent": { + "prompt": "You are a specialist research agent. You fulfill user requests by finding and synthesizing information from reliable sources. You do not write code or delegate tasks.", + "neighbors": {} + } + } + } + + # 2. Write the blueprint to a file + file_path = 'system_blueprint.json' + with open(file_path, 'w') as f: + json.dump(SYSTEM_BLUEPRINT, f, indent=2) + + # 3. Load the blueprint into the AgentSystem data structure + agent_system = AgentSystem.load_from_json(file_path) + print("\n--- Loaded Agent System ---") + print(agent_system) + + # 4. 
class Colors:
    """ANSI escape sequences used to colour terminal output."""
    HEADER = '\033[95m'  # Magenta
    OKBLUE = '\033[94m'  # Blue
    OKCYAN = '\033[96m'  # Cyan
    OKGREEN = '\033[92m'  # Green
    WARNING = '\033[93m'  # Yellow
    FAIL = '\033[91m'  # Red
    ENDC = '\033[0m'  # Reset to default
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


# File name used when the user presses Enter at the filename prompt.
_DEFAULT_CONFIG_NAME = "agent_system.json"


def get_output_directory() -> str:
    """Ask the user for an output directory and return it.

    An empty answer selects the default ``benchmarking/agent_systems``.
    """
    default_dir = "benchmarking/agent_systems"
    dir_prompt = f"{Colors.WARNING}Enter the output directory (press Enter to use '{default_dir}'): {Colors.ENDC}"
    user_input = input(dir_prompt).strip()
    return user_input or default_dir


def define_agents() -> Dict[str, Dict[str, Any]]:
    """Interactively collect agent names and system prompts.

    Returns:
        A mapping ``{agent_name: {"prompt": str, "neighbors": {}}}``; an empty
        dict if the user types 'done' before defining any agent.
    """
    agents = {}
    print(f"\n{Colors.OKBLUE}--- Agent Definition ---{Colors.ENDC}")
    print("Let's define your agents. Type 'done' when you have no more agents to add.")

    while True:
        prompt_text = f"\n{Colors.WARNING}Enter a unique name for the agent (e.g., 'master_agent') or 'done': {Colors.ENDC}"
        agent_name = input(prompt_text).strip()

        if agent_name.lower() == 'done':
            if not agents:
                print(f"{Colors.FAIL}No agents defined. Exiting.{Colors.ENDC}")
                return {}
            break

        if not agent_name:
            print(f"{Colors.FAIL}Agent name cannot be empty. Please try again.{Colors.ENDC}")
            continue

        if agent_name in agents:
            print(f"{Colors.FAIL}Agent '{agent_name}' already exists. Please use a unique name.{Colors.ENDC}")
            continue

        prompt = input(f"{Colors.WARNING}Enter the system prompt for '{Colors.OKCYAN}{agent_name}{Colors.WARNING}': {Colors.ENDC}").strip()
        agents[agent_name] = {"prompt": prompt, "neighbors": {}}
        print(f"{Colors.OKGREEN}Agent '{Colors.OKCYAN}{agent_name}{Colors.OKGREEN}' added successfully.{Colors.ENDC}")

    print(f"\n{Colors.OKBLUE}--- All Agents Defined ---{Colors.ENDC}")
    for name in agents:
        print(f"- {Colors.OKCYAN}{name}{Colors.ENDC}")
    return agents


def _parse_menu_choice(raw: str, option_count: int) -> int:
    """Convert 1-based menu input to a 0-based index; ValueError if out of range."""
    index = int(raw) - 1  # int() itself raises ValueError on non-numeric text
    if not 0 <= index < option_count:
        raise ValueError(f"choice {raw!r} is outside 1..{option_count}")
    return index


def connect_agents(agents: Dict[str, Dict[str, Any]]) -> None:
    """Interactively add delegation links between the agents in *agents*.

    Each link is stored in place as
    ``agents[source]["neighbors"][command] = {"target_agent", "description"}``.
    The loop ends when the user types 'done' at the source or target prompt.
    Fewer than two agents means nothing can be connected, so the step is
    skipped.
    """
    print(f"\n{Colors.OKBLUE}--- Agent Connection ---{Colors.ENDC}")
    print("Now, let's define the connections (neighbors) between agents.")
    print("Type 'done' at any point to finish connecting agents.")

    agent_names = list(agents.keys())
    if len(agent_names) < 2:
        print("You need at least two agents to create a connection. Skipping this step.")
        return

    while True:
        print(f"\n{Colors.BOLD}Select the agent that will delegate the task (source agent).{Colors.ENDC}")
        for i, name in enumerate(agent_names):
            print(f" {i + 1}: {Colors.OKCYAN}{name}{Colors.ENDC}")

        source_choice_input = input(f"{Colors.WARNING}Enter the number of the source agent (or 'done'): {Colors.ENDC}").strip()
        if source_choice_input.lower() == 'done':
            break

        try:
            source_agent_name = agent_names[_parse_menu_choice(source_choice_input, len(agent_names))]
        except ValueError:
            print(f"{Colors.FAIL}Invalid selection. Please enter a number from the list.{Colors.ENDC}")
            continue

        print(f"\nSelected source agent: '{Colors.OKCYAN}{source_agent_name}{Colors.ENDC}'")
        print(f"{Colors.BOLD}Select the agent to delegate to (target agent).{Colors.ENDC}")

        # Targets keep their numbering from the full list; the source agent
        # is simply not offered.
        for i, name in enumerate(agent_names):
            if name != source_agent_name:
                print(f" {i + 1}: {Colors.OKCYAN}{name}{Colors.ENDC}")

        target_choice_input = input(f"{Colors.WARNING}Enter the number of the target agent: {Colors.ENDC}").strip()
        if target_choice_input.lower() == 'done':
            # The banner promises 'done' works at any point.
            break
        try:
            # Range-check before indexing: without it "0" becomes index -1
            # and silently selects the last agent in the list.
            target_agent_name = agent_names[_parse_menu_choice(target_choice_input, len(agent_names))]
            if target_agent_name == source_agent_name:
                raise ValueError("an agent cannot delegate to itself")
        except ValueError:
            print(f"{Colors.FAIL}Invalid selection. Please enter a valid number for a different agent.{Colors.ENDC}")
            continue

        delegation_command = input(f"{Colors.WARNING}Enter the delegation command name (e.g., 'delegate_to_coder'): {Colors.ENDC}").strip()
        if not delegation_command:
            # An empty name would be stored under the key "" and be unusable.
            print(f"{Colors.FAIL}Command name cannot be empty. Please try again.{Colors.ENDC}")
            continue
        description = input(f"{Colors.WARNING}Enter the description for this delegation to '{Colors.OKCYAN}{target_agent_name}{Colors.WARNING}': {Colors.ENDC}").strip()

        # Add the neighbor connection to the source agent
        agents[source_agent_name]["neighbors"][delegation_command] = {
            "target_agent": target_agent_name,
            "description": description,
        }
        print(f"{Colors.OKGREEN}Successfully connected '{Colors.OKCYAN}{source_agent_name}{Colors.OKGREEN}' to '{Colors.OKCYAN}{target_agent_name}{Colors.OKGREEN}' via '{delegation_command}'.{Colors.ENDC}")


def save_configuration(agents_config: Dict[str, Any], output_dir: str) -> None:
    """Prompt for a filename and write ``{"agents": agents_config}`` as JSON.

    Does nothing when *agents_config* is empty. A missing ``.json`` suffix is
    appended, and an empty answer falls back to ``agent_system.json`` rather
    than producing a hidden ``.json`` file. Filesystem errors (creating the
    directory or writing the file) are reported on stdout, not raised.
    """
    if not agents_config:
        return

    final_structure = {"agents": agents_config}

    filename_prompt = f"\n{Colors.WARNING}Enter a filename for your agent system (e.g., 'my_research_team.json'): {Colors.ENDC}"
    filename = input(filename_prompt).strip() or _DEFAULT_CONFIG_NAME
    if not filename.endswith('.json'):
        filename += '.json'

    file_path = os.path.join(output_dir, filename)

    try:
        os.makedirs(output_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(final_structure, f, indent=2)
    except OSError as e:
        print(f"\n{Colors.FAIL}Error: Could not save the file. {e}{Colors.ENDC}")
    else:
        print(f"\n{Colors.OKGREEN}{Colors.BOLD}Success! Agent configuration saved to: {file_path}{Colors.ENDC}")


def main():
    """Run the interactive builder: output dir, agents, connections, save."""
    print(f"{Colors.HEADER}{Colors.BOLD}--- Welcome to the Interactive Agent Configuration Builder ---{Colors.ENDC}")
    output_directory = get_output_directory()

    agents_data = define_agents()

    if agents_data:
        connect_agents(agents_data)
        save_configuration(agents_data, output_directory)


if __name__ == "__main__":
    main()
You do not write code or delegate tasks.", + "neighbors": {} + } + } +} \ No newline at end of file diff --git a/benchmarking/core/__init__.py b/benchmarking/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarking/core/io_helpers.py b/benchmarking/core/io_helpers.py new file mode 100644 index 0000000..6e4b549 --- /dev/null +++ b/benchmarking/core/io_helpers.py @@ -0,0 +1,144 @@ +from rich.console import Console +from rich.panel import Panel +from rich.prompt import Prompt +from rich.syntax import Syntax +from rich.table import Table +from typing import Optional +import re +import json +import sys +from pathlib import Path +from typing import Tuple, List +import textwrap +import base64 +from datetime import datetime + + + +def extract_python_code(txt: str) -> Optional[str]: + """Return the *first* fenced code block, or None if absent. + + Handles: + * ```python ... ``` + * ``` ... ``` (no language tag) + * Leading indentation before fences (common in Markdown transcripts) + """ + _FENCE_RE = re.compile( + r'^[ \t]*```(?:python)?[ \t]*\n' # opening fence, with optional "python" + r'([\s\S]*?)' # capture all lines (including blank ones) + r'^[ \t]*```[ \t]*$', # closing fence + re.MULTILINE + ) + match = _FENCE_RE.search(txt) + if not match: + return None + # Dedent to normalise indentation inside the block + code = textwrap.dedent(match.group(1)) + return code.strip() or None + +# Rich display wrappers + +def _panel(console, role: str, content: str): + titles = {"system": "SYSTEM", "user": "USER", "assistant": "ASSISTANT"} + styles = {"system": "dim blue", "user": "cyan", "assistant": "green"} + console.print(Panel(content, title=titles.get(role, role.upper()), border_style=styles.get(role, "white"))) + +def display(console, role: str, content: str): + if "assistant" in role.lower(): + code = extract_python_code(content) or "" + text_part = re.sub(r"```python[\s\S]+?```", "", content, count=1).strip() + if text_part: + _panel(console, 
def format_execute_response(resp: dict, output_dir, max_chars: int = 1500) -> str:
    """Render a sandbox execution response as plain-text feedback.

    Parameters
    ----------
    resp:
        Response dictionary returned by the execution back-end.  The keys read
        here are ``final_status``, ``status``, ``outputs``, ``stdout``,
        ``stderr``, ``text``, ``type``, ``evalue``, ``traceback`` and
        ``images`` (base64-encoded image payloads).
    output_dir:
        Directory (``str`` or ``Path``) in which decoded images are stored as
        ``.png`` files.  It is created on demand, and only when there is at
        least one image to write.
    max_chars:
        Maximum number of characters kept from each text section.

    Returns
    -------
    str
        Newline-joined report that starts with ``"Code execution result:"``.
    """
    lines = ["Code execution result:"]

    final_status = resp.get("final_status")
    if final_status != "ok":
        # Prefer the explicit "status" field; fall back to "final_status" so
        # the line does not read "[status: None]" when only the latter is set.
        lines.append(f"[status: {resp.get('status', final_status)}]")

    # When the response carries an "outputs" list, the payload of interest is
    # its second entry.  Guard against shorter lists instead of raising
    # IndexError.
    # NOTE(review): the outputs schema is not visible here; falling back to
    # the only entry when there is just one is an assumption - confirm.
    if "outputs" in resp:
        outputs = resp["outputs"] or []
        if len(outputs) > 1:
            resp = outputs[1]
        elif outputs:
            resp = outputs[0]
        else:
            resp = {}

    stdout = resp.get("stdout", "")
    stderr = resp.get("stderr", "")
    text = resp.get("text", "")

    error = ""
    if resp.get("type") == "error":
        error = str(resp.get("evalue") or "")
        traceback = resp.get("traceback") or ""
        # The traceback may arrive as a list of lines rather than one string.
        if not isinstance(traceback, str):
            traceback = "\n".join(str(frame) for frame in traceback)
        if traceback:
            error += "\n" + traceback

    # Plain text output is suppressed when an error is being reported.
    if text and not error:
        lines += ["--- TEXT ---", text[:max_chars]]
    if stdout:
        lines += ["--- STDOUT ---", stdout[:max_chars]]
    if stderr:
        lines += ["--- STDERR ---", stderr[:max_chars]]
    if error:
        lines += ["--- ERROR ---", error[:max_chars]]

    images = resp.get("images") or []
    img_paths = []
    if images:
        out_dir = Path(output_dir)
        out_dir.mkdir(exist_ok=True, parents=True)
        for b64 in images:
            # Microsecond timestamp keeps the file names unique per image.
            fname = out_dir / f"{datetime.now():%Y%m%d_%H%M%S_%f}.png"
            fname.write_bytes(base64.b64decode(b64))
            img_paths.append(str(fname))
    if img_paths:
        lines.append("Saved images: " + ", ".join(img_paths))

    return "\n".join(lines)
def init_singularity_exec(script_dir: Path, sanbox_data_path, subprocess, console, force_refresh: bool = False):
    """Prepare the offline ``singularity exec`` REPL back-end.

    Parameters
    ----------
    script_dir:
        Accepted for signature parity with the other ``init_*`` helpers; it is
        not used by this back-end.
    sanbox_data_path:
        In-container path the dataset is bind-mounted to.
    subprocess:
        The ``subprocess`` module (injected by the caller); used for ``Popen``
        and the ``PIPE`` constant.
    console:
        Object with a ``print`` method used for user-facing messages.
    force_refresh:
        When true, delete the cached SIF file if it exists.

    Returns
    -------
    tuple
        ``(backend_class, None, copy_cmd, None, None)``.  The ``None`` slots
        stand in for the sandbox handle and the HTTP endpoints returned by the
        networked back-ends.
    """
    import benchmarking.sandbox.benchmarking_sandbox_management_singularity as sing

    # optional force-refresh
    if force_refresh:
        console.print("[yellow]Forcing Singularity sandbox refresh…[/yellow]")
        if sing.SIF_PATH.exists():
            sing.SIF_PATH.unlink()
            console.print(
                f"[green]Deleted {sing.SIF_PATH.name} – it will be re‑downloaded on next start.[/green]"
            )

    SIF_PATH = sing.SIF_PATH
    SING_BIN = sing.SING_BIN
    # NOTE(review): presumably this must equal the end-of-code marker expected
    # by /opt/offline_kernel.py - confirm the literal against that script.
    SENTINEL = "<<>>"

    class _SingExecBackend:
        """Launch one long-lived REPL inside the SIF and stream code to it."""

        def __init__(self):
            self._binds: List[str] = []
            self._proc = None

        def set_data(self, dataset: Path, resources: List[Tuple[Path, str]]):
            """Build the ``--bind`` arguments for the dataset and resources."""
            binds = ["--bind", f"{dataset.resolve()}:{sanbox_data_path}"]
            for host, cont in resources:
                binds.extend(["--bind", f"{host.resolve()}:{cont}"])
            self._binds = binds

        # ------------------------------------------------------------------
        # Container lifecycle
        # ------------------------------------------------------------------
        def start_container(self):
            """Start the REPL process; return ``True`` once it reports ready."""
            if self._proc:
                return True  # already running
            if not sing.pull_sif_if_needed():
                return False

            cmd = [
                SING_BIN,
                "exec",
                "--containall",
                "--cleanenv",
                *self._binds,
                str(SIF_PATH),
                "python",
                "/opt/offline_kernel.py",
                "--repl",
            ]
            # NOTE(review): stderr is piped but never read in this class; a
            # very chatty REPL could fill that pipe - confirm this is fine.
            self._proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # line buffered
            )
            # Wait for the REPL banner
            ready_line = self._proc.stdout.readline().strip()
            if ready_line != "__REPL_READY__":
                console.print(
                    f"[red]REPL failed to start. Got: {ready_line}[/red]"
                )
                self.stop_container()
                return False
            return True

        def stop_container(self):
            """Terminate the REPL process if one is running; always ``True``."""
            proc = self._proc
            if not proc:
                return True
            # Drop the handle first so the backend is never left pointing at a
            # half-stopped process, even if the cleanup below raises.
            self._proc = None
            try:
                if proc.stdin:
                    proc.stdin.close()
                proc.terminate()
                proc.wait(timeout=5)
            except Exception:
                # Best-effort shutdown: fall back to a hard kill.
                proc.kill()
            return True

        # ------------------------------------------------------------------
        # Code execution
        # ------------------------------------------------------------------
        def exec_code(self, code: str, timeout: int = 300) -> Dict:
            """Send *code* to the REPL and return its first JSON reply.

            Raises ``RuntimeError`` when the REPL is not running.  Returns a
            ``{"status": "timeout", ...}`` dict when no JSON line arrived
            within *timeout* seconds, and a ``{"status": "error", ...}`` dict
            when the REPL's stdout reached end-of-file.

            NOTE: ``readline()`` blocks, so the timeout is only evaluated
            between lines received from the REPL.
            """
            proc = self._proc
            if not proc:
                raise RuntimeError("REPL not running")
            if not (proc.stdin and proc.stdout):
                raise RuntimeError("REPL pipes are not available")

            # Send code block + sentinel
            payload = code if code.endswith("\n") else code + "\n"
            proc.stdin.write(payload + SENTINEL + "\n")
            proc.stdin.flush()

            # Read exactly one JSON line
            start_time = time.time()
            while True:
                if time.time() - start_time > timeout:
                    return {
                        "status": "timeout",
                        "stdout": "",
                        "stderr": "Execution timed out in REPL.",
                        "images": [],
                    }
                line = proc.stdout.readline()
                if not line:
                    # An empty read means end-of-file: no reply can arrive any
                    # more, so report it instead of spinning until timeout.
                    exit_code = proc.poll()
                    self.stop_container()
                    return {
                        "status": "error",
                        "stdout": "",
                        "stderr": f"REPL process ended unexpectedly (exit code {exit_code}).",
                        "images": [],
                    }
                try:
                    return json.loads(line.strip())
                except json.JSONDecodeError:
                    # Non-JSON noise; continue reading
                    continue

    _BackendManager = _SingExecBackend

    def COPY_CMD(src: str, dst: str):
        """Only inform the user: this mode relies on bind mounts, not copies."""
        console.print("[yellow]singularity-exec mode uses bind mounts instead of docker cp.[/yellow]")

    return _BackendManager, None, COPY_CMD, None, None
+python -m benchmarking.agents.create_agent_system "$@" \ No newline at end of file diff --git a/benchmarking/Evaluator.py b/benchmarking/prompt_testing/Evaluator.py similarity index 100% rename from benchmarking/Evaluator.py rename to benchmarking/prompt_testing/Evaluator.py diff --git a/benchmarking/prompt_testing/InteractiveAgentTester.py b/benchmarking/prompt_testing/InteractiveAgentTester.py new file mode 100644 index 0000000..68edfcb --- /dev/null +++ b/benchmarking/prompt_testing/InteractiveAgentTester.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +""" +Interactive Agent Tester – Docker, Singularity‑API, or **Singularity‑Exec (offline‑REPL)** +======================================================================================= +Run a natural‑language chat loop that generates runnable Python, executes it inside a +container, and streams the results back. Works even on clusters where **no networking** +is allowed for Singularity by using a long‑lived REPL inside the container. + +Back‑ends +--------- +1. **docker** – Docker daemon + container with FastAPI kernel. +2. **singularity** – Singularity *instance* with FastAPI kernel. +3. **singularity-exec** – Long‑lived `singularity exec` REPL that talks to + `/opt/offline_kernel.py --repl` (no TCP). 
+""" +from __future__ import annotations + +import base64 +import json +import os +import re +import shlex +import subprocess +import sys +import tempfile +import textwrap +import time +import uuid +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +# ── 3rd‑party deps ────────────────────────────────────────────────────────── +try: + from dotenv import load_dotenv + from openai import OpenAI, APIError + import requests # only needed for networked back‑ends + from rich.console import Console + from rich.panel import Panel + from rich.prompt import Prompt + from rich.syntax import Syntax + from rich.table import Table +except ImportError as e: + print(f"Missing dependency: {e}. Install required packages.", file=sys.stderr) + sys.exit(1) + +# -- Local imports --------------------------------------------------------------- +from benchmarking.core.io_helpers import extract_python_code, display, select_dataset, collect_resources, get_initial_prompt, format_execute_response +from benchmarking.core.sandbox_management import init_docker, init_singularity, init_singularity_exec + + +console = Console() +SCRIPT_DIR = Path(__file__).resolve().parent +PARENT_DIR = SCRIPT_DIR.parent +DATASETS_DIR = PARENT_DIR / "datasets" +OUTPUTS_DIR = PARENT_DIR / "outputs" +ENV_FILE = PARENT_DIR / ".env" + +# In‑container canonical paths +SANDBOX_DATA_PATH = "/workspace/dataset.h5ad" +SANDBOX_RESOURCES_DIR = "/workspace/resources" + +# ============================================================================== +# 1 · Choose back‑end BEFORE importing heavy managers +# ============================================================================== +backend = Prompt.ask( + "Choose sandbox backend", + choices=["docker", "singularity", "singularity-exec"], + default="docker", +) + +# Ask user whether to force‑update the sandbox image/SIF +force_refresh = ( + Prompt.ask( + "Force update sandbox environment?", choices=["y", "n"], 
def api_alive(max_retries: int = 10, delay: float = 1.5) -> bool:
    """Poll the kernel status endpoint until it reports ``"ok"``.

    Parameters
    ----------
    max_retries:
        Maximum number of GET requests sent to ``STATUS_ENDPOINT``.
    delay:
        Seconds to wait between two consecutive attempts.

    Returns
    -------
    bool
        ``True`` immediately in exec mode (there is nothing to ping) or as
        soon as the endpoint answers with ``{"status": "ok"}``; ``False`` when
        every attempt failed.
    """
    if is_exec_mode:
        return True  # nothing to ping
    for attempt in range(max_retries):
        # Pause before every retry, not only after an exception: an API that
        # is reachable but not yet "ok" must not burn all retries at once.
        if attempt:
            time.sleep(delay)
        try:
            payload = requests.get(STATUS_ENDPOINT, timeout=2).json()
        except (requests.RequestException, ValueError):
            # Connection problems or a non-JSON body: not alive yet.
            continue
        if isinstance(payload, dict) and payload.get("status") == "ok":
            return True
    return False
def run_interactive(prompt: str, dataset: Path, metadata: dict, resources: List[Tuple[Path, str]]):
    """Run the chat loop: ask the model, execute its code, feed results back.

    Parameters
    ----------
    prompt:
        Initial user message sent to the model.
    dataset:
        Host path of the dataset made available inside the sandbox.
    metadata:
        Dataset metadata; embedded as JSON in the system prompt.
    resources:
        ``(host_path, container_path)`` pairs of extra files to expose.

    The sandbox is stopped on every exit path once it has been started,
    including an unresponsive API and unexpected exceptions.
    """
    mgr = _BackendManager()
    console.print(f"Starting sandbox ({backend}) …")

    # Tell exec back-end where data/resources are (creates bind list)
    if is_exec_mode and hasattr(mgr, "set_data"):
        mgr.set_data(dataset, resources)

    if not mgr.start_container():
        console.print("[red]Failed to start sandbox.[/red]")
        return

    try:
        if not api_alive():
            console.print("[red]Kernel API not responsive (networked back‑end).[/red]")
            return

        # Networked back-ends receive the files through COPY_CMD; the exec
        # back-end already got them via set_data() above.
        if not is_exec_mode:
            COPY_CMD(str(dataset), f"{_SANDBOX_HANDLE}:{SANDBOX_DATA_PATH}")
            for host_path, container_path in resources:
                COPY_CMD(str(host_path), f"{_SANDBOX_HANDLE}:{container_path}")

        resource_lines = [f"- {c} (from {h})" for h, c in resources] or ["- (none)"]
        # Assemble the prompt line by line so that no source-code indentation
        # leaks into the text sent to the model.
        sys_prompt = "\n".join(
            [
                "You are an AI assistant analysing a single‑cell dataset.",
                f"Dataset path inside container: **{SANDBOX_DATA_PATH}**",
                "Additional resources:",
                *resource_lines,
                "",
                "Dataset metadata:",
                json.dumps(metadata, indent=2),
                "",
                "Wrap runnable Python in triple‑backtick ```python blocks. "
                "Imports & variables persist within the container session.",
            ]
        )

        history = [
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": prompt},
        ]
        display(console, "system", sys_prompt)
        display(console, "user", prompt)

        openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        turn = 0
        while True:
            turn += 1
            console.print(f"\n[bold]OpenAI call (turn {turn})…[/bold]")
            try:
                rsp = openai.chat.completions.create(
                    model="gpt-4o", messages=history, temperature=0.7
                )
            except APIError as e:
                console.print(f"[red]OpenAI error: {e}[/red]")
                break
            # The content can be None; normalise it so the helpers below
            # always receive a string.
            assistant_msg = rsp.choices[0].message.content or ""
            history.append({"role": "assistant", "content": assistant_msg})
            display(console, "assistant", assistant_msg)

            code = extract_python_code(assistant_msg)
            if code:
                console.print("[cyan]Executing code…[/cyan]")
                try:
                    if is_exec_mode:
                        exec_result = mgr.exec_code(code, timeout=300)
                    else:
                        exec_result = requests.post(
                            EXECUTE_ENDPOINT, json={"code": code, "timeout": 300}, timeout=310
                        ).json()

                    feedback = format_execute_response(exec_result, OUTPUTS_DIR)
                except Exception as exc:
                    # Report host-side failures to the model instead of
                    # aborting the session.
                    feedback = f"Code execution result:\n[Execution error on host: {exc}]"

                history.append({"role": "user", "content": feedback})
                display(console, "user", feedback)

            console.print("\n[bold]Next message (blank = continue, 'exit' to quit):[/bold]")
            try:
                user_in = input().strip()
            except (EOFError, KeyboardInterrupt):
                user_in = "exit"
            if user_in.lower() in {"exit", "quit"}:
                break
            if user_in:
                history.append({"role": "user", "content": user_in})
                display(console, "user", user_in)
    finally:
        # Always release the sandbox, whatever ended the session.
        console.print("Stopping sandbox…")
        mgr.stop_container()
console.print(f"[red]OPENAI_API_KEY not set in {ENV_FILE}.[/red]") + sys.exit(1) + + prompt = get_initial_prompt(console) + data_p, meta = select_dataset(console, DATASETS_DIR) + resources = collect_resources(console, SANDBOX_RESOURCES_DIR) + run_interactive(prompt, data_p, meta, resources) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + console.print("\nInterrupted.") diff --git a/benchmarking/prompt_testing/MultiAgentTester.py b/benchmarking/prompt_testing/MultiAgentTester.py new file mode 100644 index 0000000..383ba53 --- /dev/null +++ b/benchmarking/prompt_testing/MultiAgentTester.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python3 +""" +Interactive Agent System Tester (v1.1) +====================================== +• **New in v1.1** – Smarter delegation detection. + The router now recognises any of the following patterns in an assistant reply + when deciding to switch agents: + + ```text + //delegate_to_coder + delegate_to_coder + `delegate_to_coder` + Executing command: `delegate_to_coder` + ``` + + No need to rigidly start the reply with the token – the regex scans the whole + message. Once detected, we alert the user ("🔄 Routing to …") and prepend the + new agent’s system prompt. 
+""" +from __future__ import annotations + +import base64 +import json +import os +import re +import subprocess +import sys +import textwrap +import time +from datetime import datetime +from pathlib import Path +from typing import List, Tuple, Optional, Dict + +# ── Dependencies ------------------------------------------------------------ +try: + from dotenv import load_dotenv + from openai import OpenAI, APIError + import requests + from rich.console import Console + from rich.prompt import Prompt +except ImportError as e: + print(f"Missing dependency: {e}", file=sys.stderr) + sys.exit(1) + +# ── Agent framework --------------------------------------------------------- +try: + from benchmarking.agents.AgentSystem import AgentSystem, Agent +except ImportError: + print("[ERROR] Could not import backend.agents.agent_system", file=sys.stderr) + raise + +# ── Local helpers ----------------------------------------------------------- +from benchmarking.core.io_helpers import ( + extract_python_code, + display, + select_dataset, + collect_resources, + get_initial_prompt, + format_execute_response +) +from benchmarking.core.sandbox_management import ( + init_docker, + init_singularity, + init_singularity_exec, +) + +console = Console() +SCRIPT_DIR = Path(__file__).resolve().parent +PARENT_DIR = SCRIPT_DIR.parent +DATASETS_DIR = PARENT_DIR / "datasets" +OUTPUTS_DIR = PARENT_DIR / "outputs" +ENV_FILE = PARENT_DIR / ".env" + +SANDBOX_DATA_PATH = "/workspace/dataset.h5ad" +SANDBOX_RESOURCES_DIR = "/workspace/resources" + +# =========================================================================== +# 1 · Backend selection +# =========================================================================== +backend = Prompt.ask("Choose backend", choices=["docker", "singularity", "singularity-exec"], default="docker") +force_refresh = Prompt.ask("Force refresh environment?", choices=["y", "n"], default="n").lower() == "y" +is_exec_mode = backend == "singularity-exec" + +if backend == 
def load_agent_system() -> Tuple[AgentSystem, Agent, str]:
    """Prompt for a blueprint file and a driver agent, then load both.

    Exits the process with status 1 when the blueprint path does not exist.

    Returns
    -------
    tuple
        ``(system, driver, instructions)``: the loaded ``AgentSystem``, the
        agent returned by ``system.get_agent`` for the chosen name, and the
        value returned by ``system.get_insturctions()``.
    """
    answer = Prompt.ask("Blueprint JSON", default="system_blueprint.json")
    blueprint_path = Path(answer).expanduser()
    if not blueprint_path.exists():
        console.print(f"[red]Blueprint {blueprint_path} not found.")
        sys.exit(1)

    system = AgentSystem.load_from_json(str(blueprint_path))

    # Offer every agent as a choice; the first one is the default driver.
    agent_names = list(system.agents.keys())
    default_name = list(system.agents)[0]
    chosen_name = Prompt.ask("Driver agent", choices=agent_names, default=default_name)

    driver = system.get_agent(chosen_name)
    instructions = system.get_insturctions()
    return system, driver, instructions
def api_alive(url: str, tries: int = 10, delay: float = 1.5) -> bool:
    """Poll *url* until the kernel API reports ``"ok"``.

    Parameters
    ----------
    url:
        Status endpoint to query with GET.
    tries:
        Maximum number of requests.
    delay:
        Seconds to wait between two consecutive attempts.

    Returns
    -------
    bool
        ``True`` immediately in exec mode (nothing to ping) or as soon as the
        endpoint answers with ``{"status": "ok"}``; ``False`` otherwise.
    """
    if is_exec_mode:
        return True
    for attempt in range(tries):
        # Pause before every retry, not only after an exception: an API that
        # is reachable but not yet "ok" must not burn all retries at once.
        if attempt:
            time.sleep(delay)
        try:
            payload = requests.get(url, timeout=2).json()
        except (requests.RequestException, ValueError):
            # Connection problems or a non-JSON body: not alive yet.
            continue
        if isinstance(payload, dict) and payload.get("status") == "ok":
            return True
    return False
+ history.append({"role": "user", "content": first_user}) + display(console, "system", history[0]["content"]) + display(console, "user", first_user) + + openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + current_agent = agent + turn = 0 + + while True: + turn += 1 + console.print(f"\n[bold]OpenAI call (turn {turn})…") + try: + resp = openai.chat.completions.create(model="gpt-4o", messages=history, temperature=0.7) + except APIError as e: + console.print(f"[red]OpenAI error: {e}") + break + msg = resp.choices[0].message.content + history.append({"role": "assistant", "content": msg}) + display(console, f"assistant ({current_agent.name})", msg) + + cmd = detect_delegation(msg) + if cmd and cmd in current_agent.commands: + tgt = current_agent.commands[cmd].target_agent + new_agent = agent_system.get_agent(tgt) + if new_agent: + console.print(f"[yellow]🔄 Routing to '{tgt}' via {cmd}") + history.append({"role": "assistant", "content": f"🔄 Routing to **{tgt}** (command `{cmd}`)"}) + current_agent = new_agent + history.insert(0, {"role": "system", "content": build_system(new_agent)}) + continue + + code = extract_python_code(msg) + if code: + console.print("[cyan]Executing code…[/cyan]") + try: + if is_exec_mode: + exec_result = mgr.exec_code(code, timeout=300) + else: + exec_result = requests.post( + EXECUTE_ENDPOINT, json={"code": code, "timeout": 300}, timeout=310 + ).json() + feedback = format_execute_response(exec_result, OUTPUTS_DIR) + except Exception as exc: + feedback = f"Code execution result:\n[Execution error on host: {exc}]" + + history.append({"role": "user", "content": feedback}) + display(console, "user", feedback) + + console.print("\n[bold]Next message (blank = continue, 'exit' to quit):") + try: + user_in = input().strip() + except (EOFError, KeyboardInterrupt): + user_in = "exit" + if user_in.lower() in {"exit", "quit"}: + break + if user_in: + history.append({"role": "user", "content": user_in}) + display(console, "user", user_in) + + 
def main():
    """Entry point: load the environment, gather inputs, start the session.

    Loads ``ENV_FILE`` with ``load_dotenv`` and exits with status 1 when
    ``OPENAI_API_KEY`` is not set.  Otherwise it obtains the agent system, the
    dataset and the optional resources from the interactive helpers and hands
    them to ``run``.
    """
    load_dotenv(ENV_FILE)
    if not os.getenv("OPENAI_API_KEY"):
        console.print("[red]OPENAI_API_KEY not set in .env")
        sys.exit(1)

    # Do not call this local "sys": assigning to that name anywhere in the
    # function makes it local for the whole body, so the sys.exit(1) above
    # would raise UnboundLocalError instead of exiting.
    agent_system, driver, roster = load_agent_system()
    dataset_path, meta = select_dataset(console, DATASETS_DIR)
    resources = collect_resources(console, SANDBOX_RESOURCES_DIR)
    run(agent_system, driver, roster, dataset_path, meta, resources)