From 40f816008efd4c29246cecaacc638bfc99859003 Mon Sep 17 00:00:00 2001 From: djriffle Date: Thu, 22 May 2025 15:15:49 -0400 Subject: [PATCH 1/2] Created an Interactive Agent Script --- benchmarking/InteractiveAgentTester.py | 341 +++++++++++++++++++++++++ 1 file changed, 341 insertions(+) create mode 100644 benchmarking/InteractiveAgentTester.py diff --git a/benchmarking/InteractiveAgentTester.py b/benchmarking/InteractiveAgentTester.py new file mode 100644 index 0000000..61c995c --- /dev/null +++ b/benchmarking/InteractiveAgentTester.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +""" +Interactive Agent Tester (API Mode) +================================== +A drop‑in replacement for the previous *one‑shot* tester. This version keeps the +same execution model (GPT‑powered assistant that sends code which is executed in +an isolated Docker sandbox exposing a FastAPI kernel service) but removes the +hard limit on message turns and lets **you** steer the dialogue interactively. + +Key additions +------------- +* **Unlimited conversation** – after every turn you can type a follow‑up message + that is appended to the chat history before the next assistant call. +* **Resource upload** – copy any number of files or whole folders from the host + into the running sandbox (stored under */home/sandboxuser/resources/*). A + summary of the uploaded paths is automatically prepended to the system prompt + so the assistant knows what is available. + +Usage +----- +$ python interactive_agent_tester.py # guided TUI (Rich) + +Exit the interactive loop at any time by typing `exit`, `quit` or pressing +`Ctrl‑C`. 
+""" + +from __future__ import annotations + +import argparse +import base64 +import json +import os +import re +import shlex +import subprocess +import sys +import textwrap +import time +from datetime import datetime +from pathlib import Path +from typing import List, Tuple + +# --- Third‑party deps --------------------------------------------------------- +try: + from dotenv import load_dotenv + from openai import OpenAI, APIError + import requests + from rich.console import Console + from rich.markdown import Markdown + from rich.panel import Panel + from rich.prompt import Prompt, Confirm + from rich.syntax import Syntax + from rich.table import Table +except ImportError as e: # graceful fallback if Rich not installed + print(f"Missing dependency: {e}. Please install required packages.", file=sys.stderr) + sys.exit(1) + +# --- Local sandbox manager ---------------------------------------------------- +SCRIPT_DIR = Path(__file__).resolve().parent +sandbox_dir = SCRIPT_DIR / "sandbox" +sys.path.insert(0, str(sandbox_dir)) +try: + from benchmarking_sandbox_management import ( + SandboxManager, + CONTAINER_NAME as SANDBOX_CONTAINER_NAME, + API_PORT_HOST, + ) +finally: + sys.path.pop(0) + +# --- Constants ---------------------------------------------------------------- +DATASETS_DIR = SCRIPT_DIR / "datasets" +OUTPUTS_DIR = SCRIPT_DIR / "outputs" +ENV_FILE = SCRIPT_DIR / ".env" +SANDBOX_DATA_PATH = "/home/sandboxuser/data.h5ad" +SANDBOX_RESOURCES_DIR = "/home/sandboxuser/resources" +API_BASE_URL = f"http://localhost:{API_PORT_HOST}" +EXECUTE_ENDPOINT = f"{API_BASE_URL}/execute" +STATUS_ENDPOINT = f"{API_BASE_URL}/status" +console = Console() + +# ----------------------------------------------------------------------------- +# Utility helpers +# ----------------------------------------------------------------------------- + +def extract_python_code(text: str) -> str | None: + """Return the first ```python``` code block found in *text*, or *None*.""" + m = 
re.search(r"```python\s*([\s\S]+?)\s*```", text) + return m.group(1).strip() if m else None + + +def display(role: str, content: str) -> None: + """Pretty print a chat turn to the terminal using Rich formatting.""" + title_map = { + "system": "SYSTEM", + "user": "USER", + "assistant": "ASSISTANT", + } + style_map = { + "system": "dim blue", + "user": "cyan", + "assistant": "green", + } + title = title_map.get(role, role.upper()) + style = style_map.get(role, "white") + + if role == "assistant": + code = extract_python_code(content) + text_part = re.sub(r"```python[\s\S]+?```", "", content, count=1).strip() + if text_part: + console.print(Panel(text_part, title=f"{title} (text)", border_style=style)) + if code: + console.print(Panel(Syntax(code, "python", line_numbers=True), title=f"{title} (code)", border_style=style)) + else: + console.print(Panel(content, title=title, border_style=style)) + + +# ----------------------------------------------------------------------------- +# Prompts & selection helpers (unchanged except for small tweaks) +# ----------------------------------------------------------------------------- + +def get_initial_prompt() -> str: + console.print("[bold cyan]Enter the initial user prompt for the agent.[/bold cyan]") + console.print("Finish with Ctrl+D (Unix) / Ctrl+Z (Windows).") + try: + text = sys.stdin.read().strip() + except EOFError: + text = "" + if not text: + console.print("[red]Empty prompt. 
Aborting.[/red]") + sys.exit(1) + return text + + +def select_dataset() -> Tuple[Path, dict]: + if not DATASETS_DIR.is_dir(): + console.print(f"[red]Datasets directory not found: {DATASETS_DIR}[/red]") + sys.exit(1) + datasets = [] + for p in DATASETS_DIR.glob("*.h5ad"): + meta_path = p.with_suffix(".json") + if meta_path.exists(): + datasets.append((p, json.loads(meta_path.read_text()))) + if not datasets: + console.print("[red]No datasets found.[/red]") + sys.exit(1) + table = Table(title="Available datasets") + table.add_column("Idx", justify="right") + table.add_column("Name") + table.add_column("Cells", justify="right") + for i, (p, meta) in enumerate(datasets, 1): + table.add_row(str(i), meta.get("dataset_title", p.stem), str(meta.get("cell_count", "?"))) + console.print(table) + idx = int(Prompt.ask("Choose dataset index", choices=[str(i) for i in range(1, len(datasets) + 1)])) - 1 + return datasets[idx] + + +def collect_resources() -> List[Tuple[Path, str]]: + """Prompt user for files/folders to add to sandbox. Returns list of tuples + (host_path, container_path).""" + resources: List[Tuple[Path, str]] = [] + console.print("\n[bold cyan]Add extra resources to the sandbox (optional).[/bold cyan]") + console.print("Enter absolute or relative paths one per line. 
Leave empty line to finish.") + while True: + path_str = Prompt.ask("Path", default="").strip() + if not path_str: + break + path = Path(path_str).expanduser().resolve() + if not path.exists(): + console.print(f"[yellow]Path does not exist: {path}[/yellow]") + continue + container_dst = f"{SANDBOX_RESOURCES_DIR}/{path.name}" + resources.append((path, container_dst)) + return resources + + +# ----------------------------------------------------------------------------- +# API helpers +# ----------------------------------------------------------------------------- + +def api_alive(max_retries: int = 10, delay: float = 1.5) -> bool: + for _ in range(max_retries): + try: + if requests.get(STATUS_ENDPOINT, timeout=2).json().get("status") == "ok": + return True + except requests.RequestException: + time.sleep(delay) + return False + + +def format_execute_response(resp: dict) -> str: + out_lines = ["Code execution result:"] + std_out, std_err = [], [] + images = [] + for item in resp.get("outputs", []): + if item["type"] == "stream": + (std_out if item.get("name") == "stdout" else std_err).append(item.get("text", "")) + elif item["type"] == "error": + std_err.append("Error: " + item.get("evalue", "")) + std_err.extend(item.get("traceback", [])) + elif item["type"] == "display_data": + for mime, b64 in item.get("data", {}).items(): + if mime.startswith("image/"): + fname = OUTPUTS_DIR / f"{datetime.now():%Y%m%d_%H%M%S_%f}.{mime.split('/')[1].split('+')[0]}" + fname.parent.mkdir(exist_ok=True) + with open(fname, "wb") as fh: + fh.write(base64.b64decode(b64)) + images.append(str(fname)) + if std_out: + out_lines.append("--- STDOUT ---") + out_lines.append("".join(std_out)[:1500]) + if std_err: + out_lines.append("--- STDERR ---") + out_lines.append("".join(std_err)[:1500]) + if images: + out_lines.append("Saved images: " + ", ".join(images)) + out_lines.append(f"Final Status: {resp.get('final_status')}") + return "\n".join(out_lines) + + +# 
----------------------------------------------------------------------------- +# Main interactive runner +# ----------------------------------------------------------------------------- + +def run_interactive(prompt: str, dataset: Path, metadata: dict, resources: List[Tuple[Path, str]]) -> None: + # 1. Start sandbox container + mgr = SandboxManager() + console.print("Starting sandbox container …") + if not mgr.start_container(): + console.print("[red]Failed to start container.[/red]") + return + + try: + # 2. Wait for kernel API + if not api_alive(): + console.print("[red]Kernel API did not become responsive.[/red]") + return + + # 3. Copy dataset + subprocess.run(["docker", "cp", str(dataset), f"{SANDBOX_CONTAINER_NAME}:{SANDBOX_DATA_PATH}"], check=True) + + # 4. Copy extra resources + for host_path, cont_path in resources: + subprocess.run(["docker", "cp", str(host_path), f"{SANDBOX_CONTAINER_NAME}:{cont_path}"], check=True) + + # 5. Build system prompt + resource_lines = [f"- {cpath} (from {hpath})" for hpath, cpath in resources] or ["- (none)"] + system_prompt = textwrap.dedent( + f""" + You are an AI assistant tasked with analysing a single‑cell transcriptomics dataset. + The dataset is available at **{SANDBOX_DATA_PATH}** inside the execution environment. + + Additional resources copied for this session:\n""" + "\n".join(resource_lines) + "\n\n" + textwrap.dedent( + f""" + Dataset metadata:\n{json.dumps(metadata, indent=2)} + + Always wrap executable Python in a single triple‑backtick block with the language spec *python*. + Variables and imports persist between executions. + """ + ) + ) + + # 6. 
Chat loop + history = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ] + + display("system", system_prompt) + display("user", prompt) + + openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) + attempt = 0 + while True: + attempt += 1 + console.print(f"\n[bold]OpenAI call (turn {attempt})…[/bold]") + try: + resp = openai.chat.completions.create( + model="gpt-4o", messages=history, temperature=0.7 + ) + except APIError as e: + console.print(f"[red]OpenAI error: {e}[/red]") + break + + assistant_msg = resp.choices[0].message.content + history.append({"role": "assistant", "content": assistant_msg}) + display("assistant", assistant_msg) + + # Execute any code + code = extract_python_code(assistant_msg) + if code: + console.print("[cyan]Executing code inside sandbox…[/cyan]") + try: + api_resp = requests.post(EXECUTE_ENDPOINT, json={"code": code, "timeout": 120}, timeout=130).json() + user_feedback = format_execute_response(api_resp) + except Exception as e: + user_feedback = f"Code execution result:\n[Execution error: {e}]" + history.append({"role": "user", "content": user_feedback}) + display("user", user_feedback) + + # Ask user for next input + console.print("\n[bold]Enter next message (blank to continue, 'exit' to quit):[/bold]") + try: + user_input = input().strip() + except (EOFError, KeyboardInterrupt): + user_input = "exit" + if user_input.lower() in {"exit", "quit"}: + console.print("[green]Ending session.[/green]") + break + if user_input: + history.append({"role": "user", "content": user_input}) + display("user", user_input) + # else: blank → assistant continues next loop + + finally: + console.print("Stopping sandbox…") + mgr.stop_container(remove=True) + + +# ----------------------------------------------------------------------------- +# CLI +# ----------------------------------------------------------------------------- + +def main() -> None: + load_dotenv(dotenv_path=ENV_FILE) + if not 
os.getenv("OPENAI_API_KEY"): + console.print(f"[red]OPENAI_API_KEY not found in {ENV_FILE}.[/red]") + sys.exit(1) + + console.print("[bold blue]Interactive Agent Tester (API Mode)[/bold blue]") + prompt = get_initial_prompt() + dataset_path, metadata = select_dataset() + extra_resources = collect_resources() + run_interactive(prompt, dataset_path, metadata, extra_resources) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + console.print("\nInterrupted. Goodbye.") From 3a7f9f96302865e1d47a11d5dba95cff0c29dc5f Mon Sep 17 00:00:00 2001 From: djriffle Date: Thu, 22 May 2025 16:04:00 -0400 Subject: [PATCH 2/2] Adding Singularity Support --- benchmarking/InteractiveAgentTester.py | 373 ++++++++---------- benchmarking/sandbox/Singularity | 123 ++++++ ...hmarking_sandbox_management_singularity.py | 211 ++++++++++ 3 files changed, 504 insertions(+), 203 deletions(-) create mode 100644 benchmarking/sandbox/Singularity create mode 100644 benchmarking/sandbox/benchmarking_sandbox_management_singularity.py diff --git a/benchmarking/InteractiveAgentTester.py b/benchmarking/InteractiveAgentTester.py index 61c995c..a4e81fa 100644 --- a/benchmarking/InteractiveAgentTester.py +++ b/benchmarking/InteractiveAgentTester.py @@ -1,29 +1,17 @@ #!/usr/bin/env python3 """ -Interactive Agent Tester (API Mode) -================================== -A drop‑in replacement for the previous *one‑shot* tester. This version keeps the -same execution model (GPT‑powered assistant that sends code which is executed in -an isolated Docker sandbox exposing a FastAPI kernel service) but removes the -hard limit on message turns and lets **you** steer the dialogue interactively. - -Key additions -------------- -* **Unlimited conversation** – after every turn you can type a follow‑up message - that is appended to the chat history before the next assistant call. 
-* **Resource upload** – copy any number of files or whole folders from the host - into the running sandbox (stored under */home/sandboxuser/resources/*). A - summary of the uploaded paths is automatically prepended to the system prompt - so the assistant knows what is available. - -Usage ------ -$ python interactive_agent_tester.py # guided TUI (Rich) - -Exit the interactive loop at any time by typing `exit`, `quit` or pressing -`Ctrl‑C`. -""" +Interactive Agent Tester (Docker **or** Singularity backend) +========================================================== +A unified interactive tester that can drive either the **Docker sandbox** (`benchmarking_sandbox_management.py`) +or the **Apptainer/Singularity sandbox** (`benchmarking_sandbox_management_singularity.py`). + +At launch you choose a backend: + • *docker* – requires Docker daemon on this machine. + • *singularity* – requires `apptainer`/`singularity`; no Docker needed. +The rest of the behaviour (multi‑turn GPT orchestration, FastAPI kernel execution, +resource upload, unlimited chat loop) is unchanged. +""" from __future__ import annotations import argparse @@ -40,143 +28,153 @@ from pathlib import Path from typing import List, Tuple -# --- Third‑party deps --------------------------------------------------------- +# ── Third‑party deps ───────────────────────────────────────────────────────── try: from dotenv import load_dotenv from openai import OpenAI, APIError import requests from rich.console import Console - from rich.markdown import Markdown from rich.panel import Panel - from rich.prompt import Prompt, Confirm + from rich.prompt import Prompt from rich.syntax import Syntax from rich.table import Table -except ImportError as e: # graceful fallback if Rich not installed - print(f"Missing dependency: {e}. Please install required packages.", file=sys.stderr) +except ImportError as e: + print(f"Missing dependency: {e}. 
Install required packages.", file=sys.stderr) sys.exit(1) -# --- Local sandbox manager ---------------------------------------------------- +console = Console() + +# ── Runtime‑backend selection (ask the user **before** importing managers) ── +backend = Prompt.ask("Choose sandbox backend", choices=["docker", "singularity"], default="docker") + SCRIPT_DIR = Path(__file__).resolve().parent -sandbox_dir = SCRIPT_DIR / "sandbox" -sys.path.insert(0, str(sandbox_dir)) -try: - from benchmarking_sandbox_management import ( - SandboxManager, - CONTAINER_NAME as SANDBOX_CONTAINER_NAME, - API_PORT_HOST, - ) -finally: - sys.path.pop(0) - -# --- Constants ---------------------------------------------------------------- + +if backend == "docker": + sandbox_dir = SCRIPT_DIR / "sandbox" + sys.path.insert(0, str(sandbox_dir)) + try: + from benchmarking_sandbox_management import ( + SandboxManager as _BackendManager, + CONTAINER_NAME as _SANDBOX_HANDLE, + API_PORT_HOST as _API_PORT, + ) + finally: + sys.path.pop(0) + COPY_CMD = lambda src, dst: subprocess.run(["docker", "cp", src, dst], check=True) + +elif backend == "singularity": + sandbox_dir = SCRIPT_DIR / "sandbox" + sys.path.insert(0, str(sandbox_dir)) + try: + import benchmarking_sandbox_management_singularity as sing + except ImportError as e: + console.print(f"[red]Failed to import Singularity manager: {e}[/red]") + sys.exit(1) + + class _SingWrapper: # thin adapter to mimic Docker SandboxManager API + def __init__(self): + pass + def start_container(self): + return sing.start_instance() + def stop_container(self, remove: bool = True, container_obj=None): + return sing.stop_instance() + _BackendManager = _SingWrapper + _SANDBOX_HANDLE = sing.INSTANCE_NAME + _API_PORT = sing.API_PORT_HOST + + # Apptainer/ Singularity lacks a simple cp, so we issue a warning and rely on bind‑mounts + def COPY_CMD(src, dst): # noqa: N802 + console.print(f"[yellow]File copy inside Singularity instance not automated.\n" + f"Ensure the file {src} 
is reachable at {dst} via bind mount or in the definition file.[/yellow]") + +else: + console.print("[red]Unknown backend choice.[/red]") + sys.exit(1) + +# ── Constants (after backend choice) ───────────────────────────────────────── DATASETS_DIR = SCRIPT_DIR / "datasets" OUTPUTS_DIR = SCRIPT_DIR / "outputs" ENV_FILE = SCRIPT_DIR / ".env" SANDBOX_DATA_PATH = "/home/sandboxuser/data.h5ad" SANDBOX_RESOURCES_DIR = "/home/sandboxuser/resources" -API_BASE_URL = f"http://localhost:{API_PORT_HOST}" +API_BASE_URL = f"http://localhost:{_API_PORT}" EXECUTE_ENDPOINT = f"{API_BASE_URL}/execute" STATUS_ENDPOINT = f"{API_BASE_URL}/status" -console = Console() -# ----------------------------------------------------------------------------- -# Utility helpers -# ----------------------------------------------------------------------------- -def extract_python_code(text: str) -> str | None: - """Return the first ```python``` code block found in *text*, or *None*.""" - m = re.search(r"```python\s*([\s\S]+?)\s*```", text) +# ── Helper utilities ──────────────────────────────────────────────────────── + +def extract_python_code(txt: str) -> str | None: + m = re.search(r"```python\s*([\s\S]+?)\s*```", txt) return m.group(1).strip() if m else None def display(role: str, content: str) -> None: - """Pretty print a chat turn to the terminal using Rich formatting.""" - title_map = { - "system": "SYSTEM", - "user": "USER", - "assistant": "ASSISTANT", - } - style_map = { - "system": "dim blue", - "user": "cyan", - "assistant": "green", - } - title = title_map.get(role, role.upper()) - style = style_map.get(role, "white") + titles = {"system": "SYSTEM", "user": "USER", "assistant": "ASSISTANT"} + styles = {"system": "dim blue", "user": "cyan", "assistant": "green"} + title = titles.get(role, role.upper()) + style = styles.get(role, "white") if role == "assistant": code = extract_python_code(content) - text_part = re.sub(r"```python[\s\S]+?```", "", content, count=1).strip() - if text_part: - 
console.print(Panel(text_part, title=f"{title} (text)", border_style=style)) + txt = re.sub(r"```python[\s\S]+?```", "", content, count=1).strip() + if txt: + console.print(Panel(txt, title=f"{title} (text)", border_style=style)) if code: console.print(Panel(Syntax(code, "python", line_numbers=True), title=f"{title} (code)", border_style=style)) else: console.print(Panel(content, title=title, border_style=style)) -# ----------------------------------------------------------------------------- -# Prompts & selection helpers (unchanged except for small tweaks) -# ----------------------------------------------------------------------------- +# ── Dataset & prompt helpers ──────────────────────────────────────────────── def get_initial_prompt() -> str: - console.print("[bold cyan]Enter the initial user prompt for the agent.[/bold cyan]") - console.print("Finish with Ctrl+D (Unix) / Ctrl+Z (Windows).") + console.print("[bold cyan]Enter the initial user prompt (Ctrl+D to finish):[/bold cyan]") try: - text = sys.stdin.read().strip() + txt = sys.stdin.read().strip() except EOFError: - text = "" - if not text: + txt = "" + if not txt: console.print("[red]Empty prompt. 
Aborting.[/red]") sys.exit(1) - return text + return txt def select_dataset() -> Tuple[Path, dict]: - if not DATASETS_DIR.is_dir(): - console.print(f"[red]Datasets directory not found: {DATASETS_DIR}[/red]") + if not DATASETS_DIR.exists(): + console.print(f"[red]Datasets dir not found: {DATASETS_DIR}[/red]") sys.exit(1) - datasets = [] - for p in DATASETS_DIR.glob("*.h5ad"): - meta_path = p.with_suffix(".json") - if meta_path.exists(): - datasets.append((p, json.loads(meta_path.read_text()))) - if not datasets: + items = [(p, json.loads(p.with_suffix(".json").read_text())) for p in DATASETS_DIR.glob("*.h5ad") if p.with_suffix(".json").exists()] + if not items: console.print("[red]No datasets found.[/red]") sys.exit(1) - table = Table(title="Available datasets") - table.add_column("Idx", justify="right") - table.add_column("Name") - table.add_column("Cells", justify="right") - for i, (p, meta) in enumerate(datasets, 1): - table.add_row(str(i), meta.get("dataset_title", p.stem), str(meta.get("cell_count", "?"))) - console.print(table) - idx = int(Prompt.ask("Choose dataset index", choices=[str(i) for i in range(1, len(datasets) + 1)])) - 1 - return datasets[idx] + tbl = Table(title="Datasets") + tbl.add_column("Idx", justify="right") + tbl.add_column("Name") + tbl.add_column("Cells", justify="right") + for i, (p, meta) in enumerate(items, 1): + tbl.add_row(str(i), meta.get("dataset_title", p.stem), str(meta.get("cell_count", "?"))) + console.print(tbl) + idx = int(Prompt.ask("Choose index", choices=[str(i) for i in range(1, len(items)+1)])) - 1 + return items[idx] def collect_resources() -> List[Tuple[Path, str]]: - """Prompt user for files/folders to add to sandbox. Returns list of tuples - (host_path, container_path).""" - resources: List[Tuple[Path, str]] = [] - console.print("\n[bold cyan]Add extra resources to the sandbox (optional).[/bold cyan]") - console.print("Enter absolute or relative paths one per line. 
Leave empty line to finish.") + console.print("\n[bold cyan]Optional: list files/folders to copy into sandbox[/bold cyan] (blank line to finish)") + lst: List[Tuple[Path, str]] = [] while True: - path_str = Prompt.ask("Path", default="").strip() - if not path_str: + p = Prompt.ask("Path", default="").strip() + if not p: break - path = Path(path_str).expanduser().resolve() + path = Path(p).expanduser().resolve() if not path.exists(): console.print(f"[yellow]Path does not exist: {path}[/yellow]") continue - container_dst = f"{SANDBOX_RESOURCES_DIR}/{path.name}" - resources.append((path, container_dst)) - return resources + lst.append((path, f"{SANDBOX_RESOURCES_DIR}/{path.name}")) + return lst -# ----------------------------------------------------------------------------- -# API helpers -# ----------------------------------------------------------------------------- +# ── FastAPI kernel helpers ────────────────────────────────────────────────── def api_alive(max_retries: int = 10, delay: float = 1.5) -> bool: for _ in range(max_retries): @@ -189,153 +187,122 @@ def api_alive(max_retries: int = 10, delay: float = 1.5) -> bool: def format_execute_response(resp: dict) -> str: - out_lines = ["Code execution result:"] - std_out, std_err = [], [] - images = [] - for item in resp.get("outputs", []): - if item["type"] == "stream": - (std_out if item.get("name") == "stdout" else std_err).append(item.get("text", "")) - elif item["type"] == "error": - std_err.append("Error: " + item.get("evalue", "")) - std_err.extend(item.get("traceback", [])) - elif item["type"] == "display_data": - for mime, b64 in item.get("data", {}).items(): + lines = ["Code execution result:"] + stdout, stderr, imgs = [], [], [] + for itm in resp.get("outputs", []): + if itm["type"] == "stream": + (stdout if itm.get("name") == "stdout" else stderr).append(itm.get("text", "")) + elif itm["type"] == "error": + stderr.append("Error: " + itm.get("evalue", "")) + stderr.extend(itm.get("traceback", [])) + 
elif itm["type"] == "display_data": + for mime, b64 in itm.get("data", {}).items(): if mime.startswith("image/"): fname = OUTPUTS_DIR / f"{datetime.now():%Y%m%d_%H%M%S_%f}.{mime.split('/')[1].split('+')[0]}" fname.parent.mkdir(exist_ok=True) - with open(fname, "wb") as fh: - fh.write(base64.b64decode(b64)) - images.append(str(fname)) - if std_out: - out_lines.append("--- STDOUT ---") - out_lines.append("".join(std_out)[:1500]) - if std_err: - out_lines.append("--- STDERR ---") - out_lines.append("".join(std_err)[:1500]) - if images: - out_lines.append("Saved images: " + ", ".join(images)) - out_lines.append(f"Final Status: {resp.get('final_status')}") - return "\n".join(out_lines) - - -# ----------------------------------------------------------------------------- -# Main interactive runner -# ----------------------------------------------------------------------------- + with open(fname, "wb") as f: + f.write(base64.b64decode(b64)) + imgs.append(str(fname)) + if stdout: + lines += ["--- STDOUT ---", "".join(stdout)[:1500]] + if stderr: + lines += ["--- STDERR ---", "".join(stderr)[:1500]] + if imgs: + lines.append("Saved images: " + ", ".join(imgs)) + lines.append(f"Final Status: {resp.get('final_status')}") + return "\n".join(lines) + + +# ── Chat‑runner ───────────────────────────────────────────────────────────── def run_interactive(prompt: str, dataset: Path, metadata: dict, resources: List[Tuple[Path, str]]) -> None: - # 1. Start sandbox container - mgr = SandboxManager() - console.print("Starting sandbox container …") + mgr = _BackendManager() + console.print(f"Starting sandbox ({backend}) …") if not mgr.start_container(): - console.print("[red]Failed to start container.[/red]") + console.print("[red]Failed to start sandbox.[/red]") return try: - # 2. 
Wait for kernel API if not api_alive(): - console.print("[red]Kernel API did not become responsive.[/red]") + console.print("[red]Kernel API not responsive.[/red]") return + # dataset copy (Docker only, Singularity warns via COPY_CMD) + COPY_CMD(str(dataset), f"{_SANDBOX_HANDLE}:{SANDBOX_DATA_PATH}") + for h, c in resources: + COPY_CMD(str(h), f"{_SANDBOX_HANDLE}:{c}") - # 3. Copy dataset - subprocess.run(["docker", "cp", str(dataset), f"{SANDBOX_CONTAINER_NAME}:{SANDBOX_DATA_PATH}"], check=True) - - # 4. Copy extra resources - for host_path, cont_path in resources: - subprocess.run(["docker", "cp", str(host_path), f"{SANDBOX_CONTAINER_NAME}:{cont_path}"], check=True) - - # 5. Build system prompt - resource_lines = [f"- {cpath} (from {hpath})" for hpath, cpath in resources] or ["- (none)"] - system_prompt = textwrap.dedent( + resource_lines = [f"- {c} (from {h})" for h, c in resources] or ["- (none)"] + sys_prompt = textwrap.dedent( f""" - You are an AI assistant tasked with analysing a single‑cell transcriptomics dataset. - The dataset is available at **{SANDBOX_DATA_PATH}** inside the execution environment. - - Additional resources copied for this session:\n""" + "\n".join(resource_lines) + "\n\n" + textwrap.dedent( - f""" - Dataset metadata:\n{json.dumps(metadata, indent=2)} - - Always wrap executable Python in a single triple‑backtick block with the language spec *python*. - Variables and imports persist between executions. - """ + You are an AI assistant analysing a single‑cell dataset. The file lives inside the sandbox at **{SANDBOX_DATA_PATH}**. + Additional resources:\n""" + "\n".join(resource_lines) + "\n\n" + textwrap.dedent( + f"Dataset metadata:\n{json.dumps(metadata, indent=2)}\n\nWrap runnable Python in triple‑backtick ```python blocks. Imports & vars persist.""" ) ) - # 6. 
Chat loop history = [ - {"role": "system", "content": system_prompt}, + {"role": "system", "content": sys_prompt}, {"role": "user", "content": prompt}, ] - - display("system", system_prompt) + display("system", sys_prompt) display("user", prompt) openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - attempt = 0 + turn = 0 while True: - attempt += 1 - console.print(f"\n[bold]OpenAI call (turn {attempt})…[/bold]") + turn += 1 + console.print(f"\n[bold]OpenAI call (turn {turn})…[/bold]") try: - resp = openai.chat.completions.create( - model="gpt-4o", messages=history, temperature=0.7 - ) + rsp = openai.chat.completions.create(model="gpt-4o", messages=history, temperature=0.7) except APIError as e: console.print(f"[red]OpenAI error: {e}[/red]") break - - assistant_msg = resp.choices[0].message.content + assistant_msg = rsp.choices[0].message.content history.append({"role": "assistant", "content": assistant_msg}) display("assistant", assistant_msg) - # Execute any code code = extract_python_code(assistant_msg) if code: - console.print("[cyan]Executing code inside sandbox…[/cyan]") + console.print("[cyan]Executing code…[/cyan]") try: - api_resp = requests.post(EXECUTE_ENDPOINT, json={"code": code, "timeout": 120}, timeout=130).json() - user_feedback = format_execute_response(api_resp) - except Exception as e: - user_feedback = f"Code execution result:\n[Execution error: {e}]" - history.append({"role": "user", "content": user_feedback}) - display("user", user_feedback) - - # Ask user for next input - console.print("\n[bold]Enter next message (blank to continue, 'exit' to quit):[/bold]") + api_r = requests.post(EXECUTE_ENDPOINT, json={"code": code, "timeout": 120}, timeout=130).json() + feedback = format_execute_response(api_r) + except Exception as exc: + feedback = f"Code execution result:\n[Execution error: {exc}]" + history.append({"role": "user", "content": feedback}) + display("user", feedback) + + console.print("\n[bold]Next message (blank = continue, 'exit' to 
quit):[/bold]") try: - user_input = input().strip() + user_in = input().strip() except (EOFError, KeyboardInterrupt): - user_input = "exit" - if user_input.lower() in {"exit", "quit"}: - console.print("[green]Ending session.[/green]") + user_in = "exit" + if user_in.lower() in {"exit", "quit"}: break - if user_input: - history.append({"role": "user", "content": user_input}) - display("user", user_input) - # else: blank → assistant continues next loop - + if user_in: + history.append({"role": "user", "content": user_in}) + display("user", user_in) finally: console.print("Stopping sandbox…") mgr.stop_container(remove=True) -# ----------------------------------------------------------------------------- -# CLI -# ----------------------------------------------------------------------------- +# ── CLI entry ─────────────────────────────────────────────────────────────── -def main() -> None: - load_dotenv(dotenv_path=ENV_FILE) +def main(): + load_dotenv(Path(ENV_FILE)) if not os.getenv("OPENAI_API_KEY"): - console.print(f"[red]OPENAI_API_KEY not found in {ENV_FILE}.[/red]") + console.print(f"[red]OPENAI_API_KEY not set in {ENV_FILE}.[/red]") sys.exit(1) - console.print("[bold blue]Interactive Agent Tester (API Mode)[/bold blue]") prompt = get_initial_prompt() - dataset_path, metadata = select_dataset() - extra_resources = collect_resources() - run_interactive(prompt, dataset_path, metadata, extra_resources) + data_p, meta = select_dataset() + res = collect_resources() + run_interactive(prompt, data_p, meta, res) if __name__ == "__main__": try: main() except KeyboardInterrupt: - console.print("\nInterrupted. 
Goodbye.") + console.print("\nInterrupted.") diff --git a/benchmarking/sandbox/Singularity b/benchmarking/sandbox/Singularity new file mode 100644 index 0000000..a4872e0 --- /dev/null +++ b/benchmarking/sandbox/Singularity @@ -0,0 +1,123 @@ +Bootstrap: docker +From: python:3.11-slim +Stage: spython-base + +%files +./requirements.txt /tmp/requirements.txt +--chown=${NB_USER}:${NB_GID} ${HOME}/kernel_api.py +./kernel_api.py ${HOME}/kernel_api.py +--chown=${NB_USER}:${NB_GID} ${HOME}/start_kernel.py +./start_kernel.py ${HOME}/start_kernel.py +--chown=${NB_USER}:${NB_GID} ${HOME}/start.sh +./start.sh ${HOME}/start.sh +%post +# Use official Python slim image based on Debian (adjust version if needed) + +# Set DEBIAN_FRONTEND to noninteractive to prevent interactive prompts +DEBIAN_FRONTEND=noninteractive + +# --- Install System Dependencies --- +# Combine apt-get operations into a single layer to leverage caching. +# This layer rarely changes unless system dependencies are added/removed. +# Install tini, tzdata, build tools, C libraries, and utilities. +apt-get update && \ +apt-get install -y --no-install-recommends \ +tini \ +tzdata \ +build-essential \ +pkg-config \ +libhdf5-dev \ +libsodium-dev \ +libzmq3-dev \ +gcc \ +g++ \ +sudo \ +curl \ +wget \ +git \ +vim \ +nano \ +unzip \ +zip \ +# Configure timezone +&& ln -fs /usr/share/zoneinfo/Etc/UTC /etc/localtime \ +&& dpkg-reconfigure --frontend noninteractive tzdata \ +# Clean up apt cache +&& apt-get clean \ +&& rm -rf /var/lib/apt/lists/* + +# --- Create Non-Root User & Group --- +# These arguments and user setup steps rarely change. 
NB_USER="sandboxuser"
NB_UID=1001
NB_GID=1001
# NOTE: the build script in %post keeps running as the same user from top to
# bottom. A Dockerfile-style `USER` switch has no equivalent here, and a bare
# `su - <user>` line only spawns a separate login shell without changing who
# runs the following commands. Everything below therefore runs as the build
# user with HOME pointed at the sandbox user's home; ownership is handed over
# with `chown` once the install is done.
HOME=/home/${NB_USER}
# Add user's local bin to PATH early
PATH=${HOME}/.local/bin:${PATH}
export HOME PATH

# Create group, user, add to sudoers
groupadd -g ${NB_GID} ${NB_USER} && \
useradd -m -s /bin/bash -u ${NB_UID} -g ${NB_GID} ${NB_USER} && \
adduser ${NB_USER} sudo && \
echo "${NB_USER} ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

# --- Install Python Dependencies ---

# `pip install --user` resolves its target from $HOME, so the packages land in
# ${HOME}/.local even though the sandbox user is not the one running pip.
mkdir -p ${HOME}
cd ${HOME}

# Package groups, in order:
#   * core Jupyter components (versions pinned for stability)
#   * FastAPI dependencies
#   * user requirements copied to /tmp by %files
# The comments live up here on purpose: in a shell script a `#` line between
# backslash-continued lines terminates the command, so the package list below
# must stay free of interleaved comments.
python -m pip install --no-cache-dir --user --upgrade pip && \
python -m pip install --no-cache-dir --user \
    ipython==8.12.0 \
    traitlets==5.9.0 \
    jupyter_client==8.3.0 \
    jupyter_core==5.3.1 \
    pyzmq==25.1.0 \
    tornado==6.3.2 \
    ipykernel==6.25.1 \
    fastapi \
    "uvicorn[standard]" \
    python-multipart \
    -r /tmp/requirements.txt

# Hand everything installed so far over to the sandbox user.
chown -R ${NB_UID}:${NB_GID} ${HOME}

# --- Application Setup ---
# Application code and scripts are put in place AFTER dependencies are installed.
+ +# Create user directories and make scripts executable in a single layer +mkdir -p ${HOME}/.local/share/jupyter \ +${HOME}/.ipython/profile_default/startup \ +${HOME}/.ipython/profile_default/static && \ +chmod +x ${HOME}/start_kernel.py ${HOME}/start.sh + +# --- Runtime Configuration --- +# Expose the FastAPI port (informational) +# EXPOSE 8000 + +# Set environment variable for kernel port (used by start_kernel.py) +IPY_BASE_PORT=4000 + +# Use tini as the entrypoint; it will execute the CMD +# Ensure tini installed via apt is in the default PATH or use /usr/bin/tini + +# Set the default command to run the startup script from user's home +%environment +export DEBIAN_FRONTEND=noninteractive +export USER=${NB_USER} +export HOME=/home/${NB_USER} +export PATH=${HOME}/.local/bin:${PATH} +export IPY_BASE_PORT=4000 +%runscript +cd ${HOME} +exec /usr/bin/tini -- /home/sandboxuser/start.sh "$@" +%startscript +cd ${HOME} +exec /usr/bin/tini -- /home/sandboxuser/start.sh "$@" diff --git a/benchmarking/sandbox/benchmarking_sandbox_management_singularity.py b/benchmarking/sandbox/benchmarking_sandbox_management_singularity.py new file mode 100644 index 0000000..f6db849 --- /dev/null +++ b/benchmarking/sandbox/benchmarking_sandbox_management_singularity.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +"""Singularity Sandbox Manager (Docker‑free) +========================================== +Pure‑Singularity version that **does not require Docker at all**. It expects a +`Singularity` (definition) file in the same directory and builds a `sandbox.sif` +from it when necessary. + +Commands (same as before) +------------------------- + build – build `sandbox.sif` from the local Singularity file + start – start an *instance* exposing the FastAPI kernel on host port 8000 + stop – stop & remove the instance + status – show instance + port status + logs – tail the instance log (default 50 lines) + +Run with no args for an interactive REPL. 
+""" +from __future__ import annotations + +import argparse +import logging +import os +import shlex +import shutil +import subprocess +import sys +import time +from pathlib import Path + +# --------------------------------------------------------------------------- +# Paths & constants +# --------------------------------------------------------------------------- +SCRIPT_DIR = Path(__file__).resolve().parent +DEF_FILE = SCRIPT_DIR / "Singularity" # definition file expected here +SIF_PATH = SCRIPT_DIR / "sandbox.sif" # output image +INSTANCE_NAME = "benchmarking_sandbox_instance" +API_PORT_INSIDE = 8000 +API_PORT_HOST = 8000 + +SING_BIN = shutil.which("apptainer") or shutil.which("singularity") +if not SING_BIN: + print("Singularity/Apptainer executable not found in PATH. Do you need to load a module?", file=sys.stderr) + sys.exit(1) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def run(cmd: list[str], *, capture: bool = False, check: bool = True): + logging.debug("$ %s", " ".join(shlex.quote(c) for c in cmd)) + return subprocess.run(cmd, text=True, capture_output=capture, check=check) + + +def build_sif(rebuild: bool = False) -> bool: + """Build sandbox.sif from local Singularity def file if needed.""" + if not DEF_FILE.exists(): + logging.error("Definition file not found: %s", DEF_FILE) + return False + if SIF_PATH.exists() and not rebuild: + logging.info("Using cached SIF: %s", SIF_PATH) + return True + + logging.info("Building %s from %s …", SIF_PATH, DEF_FILE) + cmd = [SING_BIN, "build", str(SIF_PATH), str(DEF_FILE)] + try: + run(cmd) + logging.info("Build finished ✓") + return True + except subprocess.CalledProcessError as e: + logging.error("Singularity build failed (%s)", e.returncode) + return False + + +def instance_running() -> bool: + try: + out = run([SING_BIN, "instance", "list"], capture=True).stdout + return INSTANCE_NAME 
in out + except subprocess.CalledProcessError: + return False + + +def start_instance(rebuild: bool = False) -> bool: + if instance_running(): + logging.warning("Instance already running – restarting…") + stop_instance() + + if not build_sif(rebuild=rebuild): + return False + + logging.info("Starting instance %s …", INSTANCE_NAME) + cmd = [ + SING_BIN, "instance", "start", + "--cleanenv", + "--net", + "--network-args", f"portmap={API_PORT_HOST}:tcp:{API_PORT_INSIDE}", + str(SIF_PATH), + INSTANCE_NAME, + ] + try: + run(cmd) + time.sleep(3) + if instance_running(): + logging.info("Instance running. Access API at http://localhost:%d", API_PORT_HOST) + return True + logging.error("Instance failed to appear in list.") + return False + except subprocess.CalledProcessError as e: + logging.error("Failed to start instance: %s", e) + return False + + +def stop_instance() -> bool: + if not instance_running(): + logging.info("Instance not running.") + return True + logging.info("Stopping instance %s …", INSTANCE_NAME) + try: + run([SING_BIN, "instance", "stop", INSTANCE_NAME]) + return True + except subprocess.CalledProcessError as e: + logging.error("Failed to stop instance: %s", e) + return False + + +def show_status(): + logging.info("Instance: %s", "running" if instance_running() else "stopped") + logging.info("API port (host): %d", API_PORT_HOST) + + +def show_logs(lines: int = 50): + if not instance_running(): + logging.warning("Instance not running.") + return + log_dir = Path.home() / ".apptainer" / "instances" / "logs" / os.getenv("USER", "") + log_file = log_dir / f"{INSTANCE_NAME}.log" + if not log_file.exists(): + logging.warning("Log file not found: %s", log_file) + return + print("\n--- logs ---") + print(run(["tail", "-n", str(lines), str(log_file)], capture=True).stdout) + print("------------") + +# --------------------------------------------------------------------------- +# Interactive REPL +# 
# ---------------------------------------------------------------------------

def repl():
    """Run an interactive command loop for managing the sandbox.

    Accepts ``build | start [--rebuild] | stop | status | logs [N] | exit``.
    The loop ends on ``exit``/``quit``, EOF or Ctrl-C at the prompt, and the
    instance is stopped on the way out no matter how the loop ended.
    """
    print("Singularity Sandbox Manager (type 'help')")
    try:
        while True:
            try:
                line = input("cmd> ").strip()
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not line:
                continue
            try:
                cmd, *args = shlex.split(line)
            except ValueError as exc:
                # e.g. an unbalanced quote; a typo must not end the session
                print(f"Could not parse command: {exc}")
                continue

            if cmd in {"exit", "quit"}:
                break
            if cmd == "help":
                print("build | start [--rebuild] | stop | status | logs [N] | exit")
            elif cmd == "build":
                build_sif(rebuild="--rebuild" in args)
            elif cmd == "start":
                start_instance(rebuild="--rebuild" in args)
            elif cmd == "stop":
                stop_instance()
            elif cmd == "status":
                show_status()
            elif cmd == "logs":
                try:
                    n = int(args[0]) if args else 50
                except ValueError:
                    print("logs: N must be an integer.")
                    continue
                show_logs(n)
            else:
                print("Unknown command.")
    finally:
        # Never leave a stray instance behind, even after an error or Ctrl-C.
        stop_instance()

# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # No arguments: drop into the interactive REPL.
    if len(sys.argv) == 1:
        repl()
        sys.exit(0)

    p = argparse.ArgumentParser("Singularity Sandbox Manager")
    sp = p.add_subparsers(dest="cmd", required=True)

    sp.add_parser("build").add_argument("--rebuild", action="store_true")
    sp.add_parser("start").add_argument("--rebuild", action="store_true")
    sp.add_parser("stop")
    sp.add_parser("status")
    lp = sp.add_parser("logs")
    lp.add_argument("n", nargs="?", type=int, default=50)

    a = p.parse_args()
    # `status` and `logs` only report; they never change the exit code.
    ok = True
    if a.cmd == "build":
        ok = build_sif(rebuild=a.rebuild)
    elif a.cmd == "start":
        ok = start_instance(rebuild=a.rebuild)
    elif a.cmd == "stop":
        ok = stop_instance()
    elif a.cmd == "status":
        show_status()
    elif a.cmd == "logs":
        show_logs(a.n)
    sys.exit(0 if ok else 1)