From 534f2a6fc0c894229fe844c109167684fac7d31e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 01:17:48 +0000 Subject: [PATCH 1/7] Add multi-worker concurrency for page-level and session-level parallelism Introduce two layers of concurrency, available identically across CLI, web, and the Python API: - Page-level: TestConfig.workers (CLI --workers, web 'workers' body field) tests multiple pages of one run in parallel via a thread-safe Frontier, each worker owning its own browser/context. Defaults to 1 (unchanged sequential behaviour); capped at 16. Auth is performed once and replicated to every worker via Playwright storage_state. - Session-level: BatchRunner runs multiple independent sessions through a bounded thread pool. CLI exposes --batch-file/--pool-size; the web server now uses it instead of an unbounded thread-per-job model (QA_AGENT_JOB_POOL_SIZE). Also stop mutating the caller's TestConfig (deep-copy in QAAgent), route worker-thread stdout via a worker_thread_init hook so web SSE keeps working, and re-export the full public API from qa_agent. Add unit tests for the Frontier/PageIndexer primitives, the BatchRunner pool, and parallel agent runs, plus integration smoke tests for the real sync-Playwright-in-threads path. 
--- CHANGELOG.md | 7 + README.md | 54 +++++ qa_agent/__init__.py | 31 +++ qa_agent/agent.py | 339 ++++++++++++++++++++++++-------- qa_agent/batch.py | 137 +++++++++++++ qa_agent/cli.py | 108 +++++++++- qa_agent/concurrency.py | 150 ++++++++++++++ qa_agent/config.py | 17 ++ qa_agent/web/server.py | 116 ++++++----- tests/conftest.py | 35 ++++ tests/integration/test_smoke.py | 48 +++++ tests/test_agent.py | 156 ++++++++++++++- tests/test_batch.py | 86 ++++++++ tests/test_concurrency.py | 145 ++++++++++++++ tests/web/test_server.py | 6 +- 15 files changed, 1301 insertions(+), 134 deletions(-) create mode 100644 qa_agent/batch.py create mode 100644 qa_agent/concurrency.py create mode 100644 tests/test_batch.py create mode 100644 tests/test_concurrency.py diff --git a/CHANGELOG.md b/CHANGELOG.md index dda971b..75fc566 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Multi-worker concurrency** — two layers, available identically across CLI, web, and the Python API: + - *Page-level*: `TestConfig.workers` (CLI `--workers`, web `workers` in the `POST /api/run` body) tests multiple pages of a single run in parallel, each worker driving its own browser/context. Defaults to `1` (sequential, unchanged behaviour); capped at 16. Authentication is performed once and replicated to every worker via Playwright `storage_state`. + - *Session-level*: `BatchRunner` (`from qa_agent import BatchRunner`) runs multiple independent sessions through a bounded thread pool. The CLI exposes it via `--batch-file`/`--pool-size`; the web server now uses it instead of an unbounded thread-per-job model (`QA_AGENT_JOB_POOL_SIZE`, default 4). +- **Expanded public API** — `from qa_agent import QAAgent, TestConfig, BatchRunner, …` now re-exports the full public surface for library use. 
+ ## [0.2.3] - 2026-05-22 ### Fixed diff --git a/README.md b/README.md index 9cae965..c4f8586 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,25 @@ print(f"Pages tested: {len(session.pages_tested)}") print(f"Total findings: {session.total_findings}") ``` +Set `workers` to test pages in parallel, and use `BatchRunner` to run several +independent sessions concurrently with a bounded pool: + +```python +from qa_agent import BatchRunner, TestConfig + +configs = [ + TestConfig(urls=["https://example.com"], workers=4), + TestConfig(urls=["https://other.test"]), +] + +with BatchRunner(pool_size=4) as runner: + for result in runner.run_all(configs): + if isinstance(result, Exception): + print(f"session failed: {result}") + else: + print(f"{result.session_id}: {result.total_findings} findings") +``` + → [Full Python API Reference](https://github.com/billrichards/qa-agent/blob/main/docs/api-reference.md) — all classes, methods, and configuration options. --- @@ -242,6 +261,41 @@ qa-agent --mode focused https://example.com # default — test only given URLs qa-agent --mode explore https://example.com # crawl and test discovered pages ``` +### Concurrency + +Test multiple pages in parallel with cooperating workers. Each worker drives its +own browser, so memory and CPU scale with the worker count (capped at 16). + +```bash +qa-agent --workers 4 --mode explore https://example.com # 4 pages at a time +``` + +Run several independent sessions concurrently from a JSON spec file. Each entry +needs `urls` plus optional per-run overrides (`mode`, `max_depth`, `max_pages`, +`instructions`, `workers`); all other settings come from the command-line flags. 
+ +```bash +qa-agent --batch-file runs.json --pool-size 4 +``` + +```json +[ + {"urls": ["https://example.com"], "mode": "explore", "workers": 4}, + {"urls": ["https://other.test/login"], "instructions": "Check the checkout flow"} +] +``` + +| Flag | Default | Description | +|---|---|---| +| `--workers N` | `1` | Concurrent page-workers per run (max 16) | +| `--batch-file FILE` | — | JSON file of multiple runs to execute concurrently | +| `--pool-size N` | `4` | Max concurrent runs for `--batch-file` (max 8) | + +> Total live browsers ≈ `pool-size × workers`, so size both with that +> multiplicative cost in mind. The web API accepts the same `workers` value in +> the `POST /api/run` body, and the pool size is set server-side via the +> `QA_AGENT_JOB_POOL_SIZE` environment variable. + ### Exploration (explore mode) | Flag | Default | Description | diff --git a/qa_agent/__init__.py b/qa_agent/__init__.py index 5b668bc..1fc465e 100644 --- a/qa_agent/__init__.py +++ b/qa_agent/__init__.py @@ -12,3 +12,34 @@ except PackageNotFoundError: # Package not installed (e.g. 
running from source without install) __version__ = "0.2.3" + +from .agent import QAAgent # noqa: E402 +from .batch import BatchJob, BatchRunner # noqa: E402 +from .config import ( # noqa: E402 + AuthConfig, + OutputFormat, + RecordingConfig, + ScreenshotConfig, + TestConfig, + TestMode, +) +from .llm_client import LLMProvider # noqa: E402 +from .models import Finding, PageAnalysis, Severity, TestSession # noqa: E402 + +__all__ = [ + "QAAgent", + "BatchRunner", + "BatchJob", + "TestConfig", + "AuthConfig", + "ScreenshotConfig", + "RecordingConfig", + "TestMode", + "OutputFormat", + "LLMProvider", + "TestSession", + "PageAnalysis", + "Finding", + "Severity", + "__version__", +] diff --git a/qa_agent/agent.py b/qa_agent/agent.py index c533c73..7329d58 100644 --- a/qa_agent/agent.py +++ b/qa_agent/agent.py @@ -1,5 +1,6 @@ """Core QA Agent implementation.""" +import copy import os import re import threading @@ -11,6 +12,7 @@ from playwright.sync_api import Browser, BrowserContext, Page, sync_playwright from playwright.sync_api import TimeoutError as PlaywrightTimeoutError +from .concurrency import Frontier, PageIndexer from .config import OutputFormat, TestConfig, TestMode from .models import Finding, FindingCategory, PageAnalysis, Severity, TestPlan, TestSession from .reporters import ConsoleReporter, JSONReporter, MarkdownReporter, PDFReporter @@ -43,8 +45,12 @@ def _extract_domain(url: str) -> str: class QAAgent: """Main QA Agent that orchestrates exploratory testing.""" - def __init__(self, config: TestConfig, playwright_factory=None): - self.config = config + def __init__(self, config: TestConfig, playwright_factory=None, worker_thread_init=None): + # Deep-copy so per-session output-dir derivation below never mutates the + # caller's config — essential when the same TestConfig template is handed + # to several concurrent agents (e.g. by BatchRunner). 
+ self.config = copy.deepcopy(config) + config = self.config self.session: TestSession | None = None self.browser: Browser | None = None self.context: BrowserContext | None = None @@ -66,6 +72,15 @@ def __init__(self, config: TestConfig, playwright_factory=None): # Used by tests to inject a mock playwright without touching the network. self._playwright_factory = playwright_factory + # Optional callable run at the very start of each worker thread. The web + # layer uses this to route the worker's stdout to the right job queue. + self._worker_thread_init = worker_thread_init + + # Concurrency state (used only when config.workers > 1). + self._session_lock = threading.Lock() # guards session aggregation + self._page_indexer = PageIndexer() # worker-safe screenshot indices + self._recording_paths: list[str] = [] # one video per worker context + # Generate the session ID here so all output paths can be organized # under a session-specific subdirectory before reporters are created. self.session_id = str(uuid.uuid4())[:8] @@ -111,23 +126,25 @@ def run(self) -> TestSession: if self.config.instructions: self._generate_test_plan() - _pw_factory = self._playwright_factory if self._playwright_factory is not None else sync_playwright - with _pw_factory() as playwright: - self._setup_browser(playwright) + if self.config.workers > 1: + self._run_concurrent() + else: + with self._factory()() as playwright: + self._setup_browser(playwright) - try: - # Authenticate if needed - if self.config.auth: - self._authenticate() + try: + # Authenticate if needed + if self.config.auth: + self._authenticate() - # Run tests based on mode - if self.config.mode == TestMode.FOCUSED: - self._run_focused_mode() - else: - self._run_explore_mode() + # Run tests based on mode + if self.config.mode == TestMode.FOCUSED: + self._run_focused_mode() + else: + self._run_explore_mode() - finally: - self._cleanup() + finally: + self._cleanup() self.session.end_time = datetime.now() @@ -220,15 +237,22 @@ def 
_apply_test_plan(self): if self.test_plan.notes: self.console.print_progress(f"Notes: {self.test_plan.notes}") - def _setup_browser(self, playwright): - """Set up browser and context.""" - browser_options = { - "headless": self.config.headless, - } + def _factory(self): + """Return the playwright context-manager factory (real or injected mock).""" + return self._playwright_factory if self._playwright_factory is not None else sync_playwright + + def _launch_browser(self, playwright) -> Browser: + """Launch a Chromium browser with the configured options.""" + return playwright.chromium.launch(headless=self.config.headless) - self.browser = playwright.chromium.launch(**browser_options) + def _new_context_page(self, browser: Browser, storage_state=None) -> tuple[BrowserContext, Page]: + """Create a browser context + page with the configured options. - context_options = { + ``storage_state`` (a dict exported from a previously authenticated + context) seeds cookies/localStorage so per-worker contexts inherit the + logged-in session without each repeating the login flow. 
+ """ + context_options: dict = { "viewport": { "width": self.config.viewport_width, "height": self.config.viewport_height, @@ -245,28 +269,47 @@ def _setup_browser(self, playwright): if self.config.auth and self.config.auth.headers: context_options["extra_http_headers"] = self.config.auth.headers - self.context = self.browser.new_context(**context_options) - self.context.set_default_timeout(self.config.timeout) + if storage_state is not None: + context_options["storage_state"] = storage_state - self.page = self.context.new_page() + context = browser.new_context(**context_options) + context.set_default_timeout(self.config.timeout) + page = context.new_page() + return context, page + + def _setup_browser(self, playwright): + """Set up the shared browser/context/page for the sequential code path.""" + self.browser = self._launch_browser(playwright) + self.context, self.page = self._new_context_page(self.browser) # Set up error detector self.error_detector = ErrorDetector(self.page, self.config) self.error_detector.attach_listeners() - def _authenticate(self): - """Perform authentication if configured.""" + def _authenticate(self, page: Page | None = None, context: BrowserContext | None = None): + """Perform authentication if configured. + + Operates on the given ``page``/``context`` when supplied (used by + concurrent workers re-authenticating on their own browser), otherwise + falls back to the agent's shared ``self.page``/``self.context``. 
+ """ auth = self.config.auth + if auth is None: + return + context = context if context is not None else self.context + assert context is not None - # Handle cookies + # Handle cookies (no page needed) if auth.cookies: - self.context.add_cookies([auth.cookies] if isinstance(auth.cookies, dict) else auth.cookies) + context.add_cookies([auth.cookies] if isinstance(auth.cookies, dict) else auth.cookies) return # Handle form-based auth + page = page if page is not None else self.page + assert page is not None if auth.auth_url and auth.username and auth.password: self.console.print_progress(f"Authenticating at {auth.auth_url}") - self.page.goto(auth.auth_url) + page.goto(auth.auth_url) ctx = self.config.invocation_context if ctx == "cli": @@ -295,7 +338,7 @@ def _authenticate(self): # Find and fill username username_selector = auth.username_selector or 'input[type="email"], input[type="text"][name*="user"], input[name*="email"], input#username, input#email' try: - self.page.fill(username_selector, auth.username) + page.fill(username_selector, auth.username) except Exception as e: msg = f"Could not fill username field: {e}" if isinstance(e, PlaywrightTimeoutError) and not auth.username_selector: @@ -305,7 +348,7 @@ def _authenticate(self): # Find and fill password password_selector = auth.password_selector or 'input[type="password"]' try: - self.page.fill(password_selector, auth.password) + page.fill(password_selector, auth.password) except Exception as e: msg = f"Could not fill password field: {e}" if isinstance(e, PlaywrightTimeoutError) and not auth.password_selector: @@ -315,8 +358,8 @@ def _authenticate(self): # Submit submit_selector = auth.submit_selector or 'button[type="submit"], input[type="submit"], button:has-text("Login"), button:has-text("Sign in")' try: - self.page.click(submit_selector) - self.page.wait_for_load_state("networkidle", timeout=10000) + page.click(submit_selector) + page.wait_for_load_state("networkidle", timeout=10000) except Exception as 
e: msg = f"Could not submit login form: {e}" if isinstance(e, PlaywrightTimeoutError) and not auth.submit_selector: @@ -335,7 +378,7 @@ def _run_focused_mode(self): self.visited_urls.add(url) def _run_explore_mode(self): - """Explore and test pages, following links.""" + """Explore and test pages, following links (sequential path).""" # Initialize with seed URLs self.urls_to_visit = list(self.config.urls) depth_map = {url: 0 for url in self.urls_to_visit} @@ -358,7 +401,7 @@ def _run_explore_mode(self): # Discover new links if current_depth < self.config.max_depth: - new_links = self._discover_links(url) + new_links = self._discover_links(self.page, url) for link in new_links: new_url = link['href'] if new_url not in self.visited_urls and new_url not in self.urls_to_visit: @@ -366,17 +409,158 @@ def _run_explore_mode(self): self.urls_to_visit.append(new_url) depth_map[new_url] = current_depth + 1 + # -- concurrent (multi-worker) path -------------------------------------- + + def _run_concurrent(self): + """Test pages with ``config.workers`` cooperating worker threads. + + Each worker owns its own browser/context/page and pulls URLs from a + shared thread-safe :class:`Frontier`. Authentication, when configured, + is performed once on a bootstrap context and exported as a + ``storage_state`` dict that seeds every worker context. 
+ """ + storage_state = self._bootstrap_auth() + + if self.config.mode == TestMode.FOCUSED: + frontier = Frontier( + max_pages=max(1, len(self.config.urls)), + max_depth=0, # focused mode never discovers links + stop_event=self.stop_event, + ) + num_workers = min(self.config.workers, max(1, len(self.config.urls))) + else: + frontier = Frontier( + max_pages=self.config.max_pages, + max_depth=self.config.max_depth, + stop_event=self.stop_event, + ) + num_workers = self.config.workers + + frontier.seed(self.config.urls, depth=0) + + threads = [ + threading.Thread( + target=self._worker_loop, + args=(frontier, storage_state), + name=f"qa-worker-{i}", + daemon=True, + ) + for i in range(num_workers) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + # Surface collected recordings (one video per worker context). + if self._recording_paths: + self.session.recording_path = self._recording_paths[0] + if len(self._recording_paths) > 1: + self.session.config_summary["recording_paths"] = list(self._recording_paths) + + def _bootstrap_auth(self): + """Authenticate once and return an exported ``storage_state`` (or None).""" + if not self.config.auth: + return None + storage_state = None + with self._factory()() as playwright: + browser = self._launch_browser(playwright) + context, page = self._new_context_page(browser) + # Temporarily expose on self so _authenticate's fallbacks work. 
+ self.context, self.page = context, page + try: + self._authenticate(page=page, context=context) + storage_state = context.storage_state() + except Exception as exc: + self.console.print_error(f"Bootstrap authentication failed: {exc}") + finally: + context.close() + browser.close() + return storage_state + + def _worker_loop(self, frontier: Frontier, storage_state): + """Worker thread: own a browser, drain the frontier until exhausted.""" + if self._worker_thread_init is not None: + try: + self._worker_thread_init() + except Exception: + pass + + with self._factory()() as playwright: + browser = self._launch_browser(playwright) + context, page = self._new_context_page(browser, storage_state=storage_state) + error_detector = ErrorDetector(page, self.config) + if self.config.test_console_errors or self.config.test_network_errors: + error_detector.attach_listeners() + + try: + while True: + claimed = frontier.claim() + if claimed is None: + break + url, depth = claimed + try: + self._test_page_on(page, error_detector, url, self._page_indexer.next()) + if self.config.mode == TestMode.EXPLORE and depth < self.config.max_depth: + links = self._discover_links(page, url) + new_urls = [ + link['href'] for link in links + if not self._should_skip_url(link['href'], link.get('text', '')) + ] + frontier.add_links(new_urls, depth) + except Exception as exc: + self.console.print_error(f"Worker error on {url}: {exc}") + finally: + frontier.complete_one() + finally: + self._capture_recording(page) + try: + context.close() + browser.close() + except Exception: + pass + + def _capture_recording(self, page: Page) -> None: + """Record a worker context's video path, if recording is enabled.""" + if not self.config.recording.enabled: + return + try: + video = page.video + if video: + path = video.path() + if path: + with self._session_lock: + self._recording_paths.append(path) + except Exception: + pass + + def _add_page_analysis(self, page_analysis: PageAnalysis) -> None: + 
"""Thread-safe aggregation of a page result into the session.""" + assert self.session is not None + with self._session_lock: + self.session.add_page_analysis(page_analysis) + def _test_page(self, url: str): - """Test a single page.""" + """Test a single page on the agent's shared browser (sequential path).""" assert self.page is not None + assert self.error_detector is not None + self._test_page_on(self.page, self.error_detector, url, self._page_indexer.next()) + + def _test_page_on(self, page: Page, error_detector: "ErrorDetector", url: str, page_index: int): + """Test a single page on the given page/error_detector. + + ``page_index`` is a globally unique, worker-safe counter used to name + screenshots so concurrent workers never collide. + """ + assert page is not None assert self.session is not None self.console.print_page_start(url) try: start_time = time.time() - response = self.page.goto(url, wait_until="domcontentloaded") + response = page.goto(url, wait_until="domcontentloaded") if response is None or response.status < 400: - self.page.wait_for_load_state("networkidle", timeout=10000) + page.wait_for_load_state("networkidle", timeout=10000) load_time = (time.time() - start_time) * 1000 except Exception as e: self.console.print_error(f"Error loading page: {e}") @@ -385,14 +569,14 @@ def _test_page(self, url: str): # If we were redirected to the login page, re-authenticate once and continue. # Guard against infinite loops when credentials are wrong. 
auth = self.config.auth - if auth and auth.auth_url and self.page.url != url: + if auth and auth.auth_url and page.url != url: auth_path = urlparse(auth.auth_url).path.rstrip('/') - current_path = urlparse(self.page.url).path.rstrip('/') + current_path = urlparse(page.url).path.rstrip('/') if auth_path and current_path == auth_path: if self._reauth_count < 1: self._reauth_count += 1 self.console.print_progress("Detected redirect to login page, re-authenticating...") - self._authenticate() + self._authenticate(page=page, context=page.context) else: self.console.print_error( "Re-authentication attempted but still redirected to login page — " @@ -423,19 +607,19 @@ def _test_page(self, url: str): images_count=0, findings=[finding], ) - self.session.add_page_analysis(page_analysis) - if self.error_detector is not None: - self.error_detector.console_messages = [] - self.error_detector.network_errors = [] - self.error_detector.js_errors = [] + self._add_page_analysis(page_analysis) + if error_detector is not None: + error_detector.console_messages = [] + error_detector.network_errors = [] + error_detector.js_errors = [] return # Gather page info - page_info = self._analyze_page_structure() + page_info = self._analyze_page_structure(page) page_analysis = PageAnalysis( url=url, - title=self.page.title(), + title=page.title(), load_time_ms=load_time, interactive_elements=page_info["interactive_elements"], forms_count=page_info["forms_count"], @@ -449,7 +633,7 @@ def _test_page(self, url: str): if self.config.test_keyboard: self.console.print_test_category("keyboard navigation") - tester = KeyboardTester(self.page, self.config) + tester = KeyboardTester(page, self.config) findings = tester.run() all_findings.extend(findings) for f in findings: @@ -457,7 +641,7 @@ def _test_page(self, url: str): if self.config.test_mouse: self.console.print_test_category("mouse interaction") - tester = MouseTester(self.page, self.config) + tester = MouseTester(page, self.config) findings = 
tester.run() all_findings.extend(findings) for f in findings: @@ -465,7 +649,7 @@ def _test_page(self, url: str): if self.config.test_forms: self.console.print_test_category("form handling") - tester = FormTester(self.page, self.config) + tester = FormTester(page, self.config) findings = tester.run() all_findings.extend(findings) for f in findings: @@ -473,7 +657,7 @@ def _test_page(self, url: str): if self.config.test_accessibility: self.console.print_test_category("accessibility") - tester = AccessibilityTester(self.page, self.config) + tester = AccessibilityTester(page, self.config) findings = tester.run() all_findings.extend(findings) for f in findings: @@ -481,30 +665,30 @@ def _test_page(self, url: str): if self.config.test_wcag_compliance: self.console.print_test_category("WCAG 2.1 AA compliance") - tester = WCAGComplianceTester(self.page, self.config) + tester = WCAGComplianceTester(page, self.config) findings = tester.run() all_findings.extend(findings) for f in findings: self.console.print_finding(f) if self.config.test_console_errors or self.config.test_network_errors: - assert self.error_detector is not None + assert error_detector is not None self.console.print_test_category("error detection") - findings = self.error_detector.run() + findings = error_detector.run() all_findings.extend(findings) for f in findings: self.console.print_finding(f) - self.error_detector.get_summary() + error_detector.get_summary() page_analysis.console_errors = [ - m["text"] for m in self.error_detector.console_messages + m["text"] for m in error_detector.console_messages if m["type"] == "error" ] - page_analysis.network_errors = self.error_detector.network_errors + page_analysis.network_errors = error_detector.network_errors if self.test_plan and self.test_plan.custom_steps: self.console.print_test_category("custom AI steps") - tester = CustomTester(self.page, self.config, self.test_plan) + tester = CustomTester(page, self.config, self.test_plan) findings = tester.run() 
all_findings.extend(findings) for f in findings: @@ -512,26 +696,26 @@ def _test_page(self, url: str): # Take screenshot if there were errors if all_findings and self.config.screenshots.on_error: - screenshot_path = self._take_screenshot(f"page_{len(self.visited_urls)}") + screenshot_path = self._take_screenshot(page, f"page_{page_index}") if screenshot_path: for finding in all_findings: if not finding.screenshot_path: finding.screenshot_path = screenshot_path page_analysis.findings = all_findings - self.session.add_page_analysis(page_analysis) + self._add_page_analysis(page_analysis) # Reset error detector for next page - if self.error_detector is not None: - self.error_detector.console_messages = [] - self.error_detector.network_errors = [] - self.error_detector.js_errors = [] + if error_detector is not None: + error_detector.console_messages = [] + error_detector.network_errors = [] + error_detector.js_errors = [] - def _analyze_page_structure(self) -> dict: + def _analyze_page_structure(self, page: Page) -> dict: """Analyze the structure of the current page.""" - assert self.page is not None + assert page is not None try: - return dict(self.page.evaluate("""() => ({ + return dict(page.evaluate("""() => ({ interactive_elements: document.querySelectorAll('a, button, input, select, textarea, [onclick], [role="button"]').length, forms_count: document.querySelectorAll('form').length, links_count: document.querySelectorAll('a[href]').length, @@ -553,11 +737,11 @@ def _analyze_page_structure(self) -> dict: 'log out', 'logout', 'sign out', 'signout', 'delete account', 'deactivate account', ] - def _discover_links(self, current_url: str) -> list[dict]: + def _discover_links(self, page: Page, current_url: str) -> list[dict]: """Discover links on the current page for exploration.""" - assert self.page is not None + assert page is not None try: - raw = self.page.evaluate("""() => { + raw = page.evaluate("""() => { const links = document.querySelectorAll('a[href]'); return 
Array.from(links).map(a => ({ href: a.href, @@ -619,19 +803,20 @@ def _should_skip_url(self, url: str, link_text: str = "") -> bool: return False - def _take_screenshot(self, name: str) -> str | None: + def _take_screenshot(self, page: Page, name: str) -> str | None: """Take a screenshot and return the path.""" if not self.config.screenshots.enabled: return None - assert self.page is not None + assert page is not None os.makedirs(self.config.screenshots.output_dir, exist_ok=True) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Microsecond precision so concurrent workers never produce the same filename. + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"{name}_{timestamp}.png" filepath = os.path.join(self.config.screenshots.output_dir, filename) try: - self.page.screenshot( + page.screenshot( path=filepath, full_page=self.config.screenshots.full_page ) diff --git a/qa_agent/batch.py b/qa_agent/batch.py new file mode 100644 index 0000000..9284c88 --- /dev/null +++ b/qa_agent/batch.py @@ -0,0 +1,137 @@ +"""Bounded pool for running multiple QA sessions concurrently. + +This is the *job-level* (layer 2) concurrency, distinct from the per-run +page-worker concurrency in :mod:`qa_agent.concurrency`. A :class:`BatchRunner` +owns a bounded :class:`~concurrent.futures.ThreadPoolExecutor`; each submitted +:class:`~qa_agent.config.TestConfig` runs a full ``QAAgent(config).run()`` on a +pool thread. It is shared by both the web server (replacing its unbounded +thread-per-job model) and the CLI/library ``batch`` entry point, so all three +usage modes get the same bounded, back-pressured behaviour. + +Note: page-workers (``config.workers``) are spawned *within* each pool thread, +so the total live browsers ≈ ``pool_size × workers``. Size both with that +multiplicative cost in mind. 
+""" + +from __future__ import annotations + +import threading +import uuid +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import dataclass, field + +from .agent import QAAgent +from .config import TestConfig +from .models import TestSession + +DEFAULT_POOL_SIZE = 4 +POOL_SIZE_MAX = 8 + + +@dataclass +class BatchJob: + """Handle for a single submitted session.""" + + job_id: str + future: Future[TestSession] + agent: QAAgent + stop_event: threading.Event + session_id: str + domain: str = "" + _started: threading.Event = field(default_factory=threading.Event) + + @property + def status(self) -> str: + """Derive a coarse status from the underlying future.""" + if self.future.cancelled(): + return "stopped" + if self.future.done(): + return "failed" if self.future.exception() is not None else "completed" + if self._started.is_set(): + return "running" + return "queued" + + def result(self, timeout: float | None = None) -> TestSession: + """Block for the session result (re-raises any worker exception).""" + return self.future.result(timeout=timeout) + + def stop(self) -> None: + """Request a graceful stop (and cancel if still queued).""" + self.stop_event.set() + self.future.cancel() + + +class BatchRunner: + """Run multiple :class:`TestConfig` sessions with bounded concurrency.""" + + def __init__(self, pool_size: int = DEFAULT_POOL_SIZE, thread_name_prefix: str = "qa-job"): + self.pool_size = max(1, min(POOL_SIZE_MAX, int(pool_size))) + self._executor = ThreadPoolExecutor( + max_workers=self.pool_size, thread_name_prefix=thread_name_prefix + ) + + def submit( + self, + config: TestConfig, + *, + worker_thread_init=None, + stop_event: threading.Event | None = None, + job_id: str | None = None, + ) -> BatchJob: + """Submit one session to the pool and return its :class:`BatchJob`.""" + stop_event = stop_event if stop_event is not None else threading.Event() + agent = QAAgent(config, worker_thread_init=worker_thread_init) + agent.stop_event 
= stop_event + + domain = "" + if config.urls: + from urllib.parse import urlparse + domain = urlparse(config.urls[0]).netloc.split(":")[0] + + job = BatchJob( + job_id=job_id or str(uuid.uuid4())[:8], + future=None, # type: ignore[arg-type] # set just below + agent=agent, + stop_event=stop_event, + session_id=agent.session_id, + domain=domain, + ) + + def _task() -> TestSession: + job._started.set() + # Initialise this pool thread (e.g. route its stdout) before running. + # The agent calls the same hook again on each page-worker sub-thread; + # it is thread-local and idempotent, so both layers are covered. + if worker_thread_init is not None: + try: + worker_thread_init() + except Exception: + pass + return agent.run() + + job.future = self._executor.submit(_task) + return job + + def run_all(self, configs: list[TestConfig]) -> list[TestSession | Exception]: + """Submit all configs, wait, and return results in input order. + + A failing session yields its exception in place rather than aborting the + whole batch. 
+ """ + jobs = [self.submit(cfg) for cfg in configs] + results: list[TestSession | Exception] = [] + for job in jobs: + try: + results.append(job.result()) + except Exception as exc: # noqa: BLE001 - isolate per-job failure + results.append(exc) + return results + + def shutdown(self, wait: bool = True) -> None: + self._executor.shutdown(wait=wait) + + def __enter__(self) -> BatchRunner: + return self + + def __exit__(self, *exc) -> None: + self.shutdown(wait=True) diff --git a/qa_agent/cli.py b/qa_agent/cli.py index a6e1224..39cbd42 100644 --- a/qa_agent/cli.py +++ b/qa_agent/cli.py @@ -83,8 +83,8 @@ def main(): # Positional arguments parser.add_argument( "urls", - nargs="+", - help="URL(s) to test", + nargs="*", + help="URL(s) to test (omit only when using --batch-file)", ) # Mode options @@ -115,6 +115,28 @@ def main(): dest="max_interactions_per_page", help="Maximum number of interactions per page (default: 50)", ) + + # Concurrency + parser.add_argument( + "--workers", + type=int, + default=1, + help="Number of concurrent page-workers per run (default: 1, max: 16). " + "Each worker drives its own browser, so memory scales with this value.", + ) + parser.add_argument( + "--batch-file", + help="Path to a JSON file describing multiple runs to execute concurrently. " + "Each entry is an object with at least 'urls' and optional overrides " + "(mode, max_depth, max_pages, instructions, workers). 
Other settings are " + "inherited from the command-line flags.", + ) + parser.add_argument( + "--pool-size", + type=int, + default=4, + help="Max concurrent runs when using --batch-file (default: 4, max: 8).", + ) parser.add_argument( "--same-domain", action="store_true", @@ -284,6 +306,10 @@ def main(): if args.no_cache and not (args.instructions or args.instructions_file): parser.error("--no-cache can only be used with --instructions or --instructions-file") + # Validate: need URLs unless running a batch file + if not args.urls and not args.batch_file: + parser.error("at least one URL is required (or use --batch-file)") + # Parse output formats output_formats = [] for fmt in args.output.split(","): @@ -387,9 +413,15 @@ def main(): llm_provider=LLMProvider(args.llm_provider), ai_model=args.ai_model or None, use_plan_cache=not args.no_cache, + workers=args.workers, invocation_context="cli", ) + # Batch mode: run multiple sessions concurrently from a spec file. + if args.batch_file: + _run_batch(args, config) + return + # Run the agent agent = QAAgent(config) @@ -416,5 +448,77 @@ def main(): sys.exit(2) +def _run_batch(args, template: TestConfig) -> None: + """Run multiple sessions concurrently from a JSON --batch-file. + + Each spec is an object with at least ``urls`` plus optional per-run overrides + (``mode``, ``max_depth``, ``max_pages``, ``instructions``, ``workers``); all + other settings are inherited from ``template`` (the command-line flags). + Exits non-zero if any session has critical/high findings or tested no pages. 
+ """ + import copy + + from .batch import BatchRunner + + try: + specs = json.loads(Path(args.batch_file).read_text(encoding="utf-8")) + except Exception as e: + print(f"Error reading batch file: {e}", file=sys.stderr) + sys.exit(2) + + if not isinstance(specs, list) or not specs: + print("Batch file must be a non-empty JSON array of run specs", file=sys.stderr) + sys.exit(2) + + configs: list[TestConfig] = [] + for i, spec in enumerate(specs): + if not isinstance(spec, dict) or not spec.get("urls"): + print(f"Batch spec #{i} must be an object with a non-empty 'urls'", file=sys.stderr) + sys.exit(2) + cfg = copy.deepcopy(template) + cfg.urls = list(spec["urls"]) + if "mode" in spec: + cfg.mode = TestMode.EXPLORE if spec["mode"] == "explore" else TestMode.FOCUSED + if "max_depth" in spec: + cfg.max_depth = int(spec["max_depth"]) + if "max_pages" in spec: + cfg.max_pages = int(spec["max_pages"]) + if "instructions" in spec: + cfg.instructions = spec["instructions"] or None + if "workers" in spec: + cfg.workers = int(spec["workers"]) + cfg.__post_init__() # re-clamp workers after override + configs.append(cfg) + + print(f"Running {len(configs)} sessions (pool size {args.pool_size})…") + runner = BatchRunner(pool_size=args.pool_size) + try: + results = runner.run_all(configs) + finally: + runner.shutdown(wait=True) + + exit_code = 0 + for cfg, result in zip(configs, results, strict=False): + label = cfg.urls[0] if cfg.urls else "?" 
+ if isinstance(result, Exception): + print(f" ✗ {label}: failed — {result}") + exit_code = max(exit_code, 2) + continue + critical_high = ( + result.findings_by_severity.get("critical", 0) + + result.findings_by_severity.get("high", 0) + ) + print( + f" • {label}: {len(result.pages_tested)} pages, " + f"{result.total_findings} findings ({result.status})" + ) + if not result.pages_tested: + exit_code = max(exit_code, 2) + elif critical_high > 0: + exit_code = max(exit_code, 1) + + sys.exit(exit_code) + + if __name__ == "__main__": main() diff --git a/qa_agent/concurrency.py b/qa_agent/concurrency.py new file mode 100644 index 0000000..717acbc --- /dev/null +++ b/qa_agent/concurrency.py @@ -0,0 +1,150 @@ +"""Thread-safe primitives for running page-workers concurrently. + +These helpers let multiple worker threads cooperate on a single test run: + +* :class:`Frontier` is a thread-safe BFS frontier shared by all workers. It + enforces the ``max_pages`` / ``max_depth`` budget *correctly under + concurrency* (a URL is marked visited the moment it is claimed, so N workers + can never overshoot the page budget) and coordinates clean termination — a + worker's :meth:`Frontier.claim` returns ``None`` only when the queue is empty + **and** no other worker is still in-flight (so no links remain to discover). + +The actual per-page work and Playwright lifecycle live in :mod:`qa_agent.agent`; +this module is deliberately free of any browser dependency so it can be unit +tested without Playwright. +""" + +from __future__ import annotations + +import itertools +import threading + + +class Frontier: + """A thread-safe BFS frontier with a page/depth budget. + + Parameters + ---------- + max_pages: + Maximum number of distinct URLs that may be claimed across all workers. + max_depth: + Maximum BFS depth. URLs deeper than this are never claimed, and + :meth:`add_links` will not enqueue children beyond it. 
+ stop_event: + Optional :class:`threading.Event`; when set, :meth:`claim` returns + ``None`` so workers wind down promptly. + """ + + def __init__( + self, + max_pages: int, + max_depth: int, + stop_event: threading.Event | None = None, + ) -> None: + self.max_pages = max(0, int(max_pages)) + self.max_depth = max(0, int(max_depth)) + self._stop_event = stop_event + + self._queue: list[str] = [] # FIFO of pending URLs + self._depth: dict[str, int] = {} # URL -> depth (queued or visited) + self._visited: set[str] = set() # claimed URLs (in-progress or done) + self._in_progress = 0 + self._cond = threading.Condition() + + # -- seeding / enqueueing ------------------------------------------------- + + def seed(self, urls: list[str], depth: int = 0) -> None: + """Add the initial URLs at the given depth.""" + with self._cond: + for url in urls: + if url and url not in self._depth and url not in self._visited: + self._queue.append(url) + self._depth[url] = depth + self._cond.notify_all() + + def add_links(self, urls: list[str], parent_depth: int) -> None: + """Enqueue freshly discovered child URLs (deduped, depth-bounded).""" + new_depth = parent_depth + 1 + if new_depth > self.max_depth: + return + with self._cond: + for url in urls: + if not url or url in self._depth or url in self._visited: + continue + self._queue.append(url) + self._depth[url] = new_depth + self._cond.notify_all() + + # -- claiming / completing ------------------------------------------------ + + def claim(self) -> tuple[str, int] | None: + """Claim the next testable URL, or ``None`` when work is exhausted. + + Blocks while the queue is momentarily empty but other workers are still + in-flight (they may yet discover new links). Returns ``None`` when the + page budget is hit, ``stop_event`` is set, or the queue is drained and + no worker is in-flight. 
+ """ + with self._cond: + while True: + if self._stop_event is not None and self._stop_event.is_set(): + return None + if len(self._visited) >= self.max_pages: + return None + + claimed = self._pop_claimable_locked() + if claimed is not None: + url, depth = claimed + self._visited.add(url) + self._in_progress += 1 + return claimed + + # Nothing claimable right now. + if self._in_progress == 0: + return None # queue drained and no one can enqueue more + + # Another worker is in-flight; wait for links or completion. + self._cond.wait(timeout=0.5) + + def _pop_claimable_locked(self) -> tuple[str, int] | None: + """Pop the next URL that is unvisited and within depth (caller holds lock).""" + while self._queue: + url = self._queue.pop(0) + if url in self._visited: + continue + depth = self._depth.get(url, 0) + if depth > self.max_depth: + continue + return (url, depth) + return None + + def complete_one(self) -> None: + """Signal that an in-flight page finished (wakes blocked claimers).""" + with self._cond: + if self._in_progress > 0: + self._in_progress -= 1 + self._cond.notify_all() + + # -- introspection -------------------------------------------------------- + + @property + def visited_count(self) -> int: + with self._cond: + return len(self._visited) + + +class PageIndexer: + """Hands out monotonically increasing, thread-safe page indices. + + Used for worker-safe screenshot filenames so concurrent workers never + collide. Starts at 0 to match the legacy sequential naming + (``page_0``, ``page_1``, …). 
+ """ + + def __init__(self) -> None: + self._counter = itertools.count() + self._lock = threading.Lock() + + def next(self) -> int: + with self._lock: + return next(self._counter) diff --git a/qa_agent/config.py b/qa_agent/config.py index 96d774f..fb51374 100644 --- a/qa_agent/config.py +++ b/qa_agent/config.py @@ -103,3 +103,20 @@ class TestConfig: # Invocation context — used to tailor diagnostic hints invocation_context: Literal["cli", "web"] | None = None + + # Number of concurrent page-workers per run. 1 = sequential (default). + # Each worker drives its own browser/context, so total browsers scale with + # this value; it is clamped to a sane ceiling in __post_init__. + workers: int = 1 + + # Hard ceiling on concurrent page-workers to bound browser RAM/CPU. + WORKERS_MAX = 16 + + def __post_init__(self) -> None: + # Clamp worker count to [1, WORKERS_MAX]. Defensive against bad input + # from CLI flags or web request bodies. + try: + workers = int(self.workers) + except (TypeError, ValueError): + workers = 1 + self.workers = max(1, min(self.WORKERS_MAX, workers)) diff --git a/qa_agent/web/server.py b/qa_agent/web/server.py index cbc5cf3..08395df 100644 --- a/qa_agent/web/server.py +++ b/qa_agent/web/server.py @@ -3,6 +3,7 @@ import html as html_lib import io import json +import os import queue import re import sys @@ -15,7 +16,7 @@ from flask import Flask, Response, abort, jsonify, render_template, request, send_file -from ..agent import QAAgent +from ..batch import BatchRunner from ..config import ( AuthConfig, OutputFormat, @@ -87,21 +88,28 @@ def fileno(self) -> int: class _QueueWriter: - """Writes stdout lines into a job's event queue as SSE-ready dicts.""" + """Writes stdout lines into a job's event queue as SSE-ready dicts. + + Thread-safe: a job's page-workers run in separate threads but share one + writer, so ``write`` is guarded by a lock to avoid corrupting the line + buffer or interleaving partial lines. 
+ """ def __init__(self, q: queue.Queue, events: list) -> None: self._q = q self._events = events self._buf = "" + self._lock = threading.Lock() def write(self, text: str) -> int: - self._buf += text - while "\n" in self._buf: - line, self._buf = self._buf.split("\n", 1) - clean = _ANSI_RE.sub("", line) - if clean.strip(): - self._emit("log", {"message": clean}) - self._detect_structured(clean) + with self._lock: + self._buf += text + while "\n" in self._buf: + line, self._buf = self._buf.split("\n", 1) + clean = _ANSI_RE.sub("", line) + if clean.strip(): + self._emit("log", {"message": clean}) + self._detect_structured(clean) return len(text) def flush(self) -> None: @@ -133,6 +141,12 @@ def _detect_structured(self, line: str) -> None: _jobs: dict[str, dict[str, Any]] = {} _jobs_lock = threading.Lock() +# Bounded pool that runs whole sessions (layer-2 concurrency). Replaces the old +# unbounded thread-per-request model so the server applies backpressure. Size is +# env-overridable; remember total browsers ≈ JOB_POOL_SIZE × config.workers. +JOB_POOL_SIZE = int(os.environ.get("QA_AGENT_JOB_POOL_SIZE", "4")) +_runner = BatchRunner(pool_size=JOB_POOL_SIZE) + def _make_job(job_id: str) -> dict: return { @@ -151,57 +165,59 @@ def _make_job(job_id: str) -> dict: } -def _run_job(job_id: str, config: TestConfig) -> None: - """Execute QAAgent in a background thread, streaming output to the job queue.""" +def _submit_job(job_id: str, config: TestConfig) -> None: + """Submit a session to the bounded pool, streaming output to the job queue. + + The job's per-thread stdout stream is set via the agent's + ``worker_thread_init`` hook so output from BOTH the pool thread running the + session and any page-worker sub-threads it spawns routes to this job's queue + (thread-local stdout would otherwise miss the sub-threads). 
+ """ job = _jobs[job_id] q = job["queue"] events = job["events"] writer = _QueueWriter(q, events) - _local.stream = writer - - try: - job["status"] = "running" - agent = QAAgent(config) - agent.stop_event = job["stop_event"] - - # Capture session_id and domain before run() (agent sets them in __init__) - job["session_id"] = agent.session_id - domain = urlparse(config.urls[0]).netloc.split(":")[0] if config.urls else "unknown" - job["domain"] = domain + domain = urlparse(config.urls[0]).netloc.split(":")[0] if config.urls else "unknown" + job["domain"] = domain - session = agent.run() + def _init_stream() -> None: + _local.stream = writer - job["total_findings"] = session.total_findings - job["pages_tested"] = len(session.pages_tested) - - if job["stop_event"].is_set(): - job["status"] = "stopped" - status = "stopped" - else: - job["status"] = "completed" - status = "completed" - - complete_data = { - "session_id": session.session_id, - "domain": domain, - "total_findings": session.total_findings, - "status": status, - } - msg = {"type": "complete", "data": complete_data} - events.append(msg) - q.put(msg) + bjob = _runner.submit( + config, + worker_thread_init=_init_stream, + stop_event=job["stop_event"], + ) + job["session_id"] = bjob.session_id + job["status"] = "running" - except Exception as exc: - job["status"] = "failed" - job["error"] = str(exc) - msg = {"type": "error", "data": {"message": str(exc)}} + def _on_done(future) -> None: + try: + session = future.result() + job["total_findings"] = session.total_findings + job["pages_tested"] = len(session.pages_tested) + status = "stopped" if job["stop_event"].is_set() else "completed" + job["status"] = status + msg = { + "type": "complete", + "data": { + "session_id": session.session_id, + "domain": domain, + "total_findings": session.total_findings, + "status": status, + }, + } + except Exception as exc: # noqa: BLE001 - report failure to the client + job["status"] = "failed" + job["error"] = str(exc) + msg = 
{"type": "error", "data": {"message": str(exc)}} events.append(msg) q.put(msg) - - finally: q.put(None) # sentinel — stream generator stops here _local.stream = None + bjob.future.add_done_callback(_on_done) + # ── Pages ───────────────────────────────────────────────────────────────────── @@ -254,8 +270,7 @@ def api_run(): with _jobs_lock: _jobs[job_id] = job - t = threading.Thread(target=_run_job, args=(job_id, config), daemon=True) - t.start() + _submit_job(job_id, config) return jsonify({ "job_id": job_id, @@ -717,6 +732,7 @@ def _build_config(body: dict) -> TestConfig: llm_provider=_parse_llm_provider(body.get("llm_provider", "anthropic")), ai_model=body.get("ai_model") or None, use_plan_cache=bool(body.get("use_plan_cache", True)), + workers=max(1, min(16, int(body.get("workers", 1)))), auth=auth, screenshots=ScreenshotConfig( enabled=ss_enabled, diff --git a/tests/conftest.py b/tests/conftest.py index 32406c9..ec7f34b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -121,6 +121,7 @@ def make_mock_playwright_factory(page: MagicMock | None = None): pw = MagicMock() context.new_page.return_value = page + context.storage_state.return_value = {"cookies": [], "origins": []} browser.new_context.return_value = context chromium.launch.return_value = browser pw.chromium = chromium @@ -132,6 +133,40 @@ def factory(): return factory, page, context, browser +def make_multi_mock_playwright_factory(): + """Return a factory whose contexts/pages are DISTINCT per call. + + Use for multi-worker (``config.workers > 1``) tests so each worker gets its + own mock page rather than a shared singleton. Returns ``(factory, pages)`` + where ``pages`` is a live list that accumulates every page handed out. 
+ """ + pages: list[MagicMock] = [] + + def _new_page(*_a, **_k): + page = _make_mock_page() + pages.append(page) + return page + + def _new_context(*_a, **_k): + context = MagicMock() + context.new_page.side_effect = _new_page + context.storage_state.return_value = {"cookies": [], "origins": []} + return context + + browser = MagicMock() + browser.new_context.side_effect = _new_context + chromium = MagicMock() + chromium.launch.return_value = browser + pw = MagicMock() + pw.chromium = chromium + + @contextmanager + def factory(): + yield pw + + return factory, pages + + # --------------------------------------------------------------------------- # pytest fixtures # --------------------------------------------------------------------------- diff --git a/tests/integration/test_smoke.py b/tests/integration/test_smoke.py index e12d32c..b2ad631 100644 --- a/tests/integration/test_smoke.py +++ b/tests/integration/test_smoke.py @@ -182,3 +182,51 @@ def test_no_exception_raised(self, fixture_server, tmp_path): agent = QAAgent(config) session = agent.run() # must not raise assert session is not None + + +@pytest.mark.integration +class TestSmokeParallel: + """Exercise the real sync-Playwright-in-threads multi-worker path.""" + + def test_focused_two_workers_tests_all_pages(self, fixture_server, tmp_path): + """workers=2 over 3 fixture pages must test all 3 with real browsers.""" + from qa_agent.agent import QAAgent + from qa_agent.config import OutputFormat, TestConfig + + config = TestConfig( + urls=[ + f"{fixture_server}/index.html", + f"{fixture_server}/page2.html", + f"{fixture_server}/login.html", + ], + output_formats=[OutputFormat.JSON], + output_dir=str(tmp_path), + headless=True, + workers=2, + ) + agent = QAAgent(config) + session = agent.run() + + tested = {pa.url for pa in session.pages_tested} + assert len(tested) == 3, f"expected all 3 pages tested, got {tested}" + + def test_parallel_matches_sequential_findings(self, fixture_server, tmp_path): + """The same 
URLs should yield the same finding count run in parallel or serially.""" + from qa_agent.agent import QAAgent + from qa_agent.config import OutputFormat, TestConfig + + def _run(workers: int, out): + config = TestConfig( + urls=[f"{fixture_server}/index.html", f"{fixture_server}/page2.html"], + output_formats=[OutputFormat.JSON], + output_dir=str(out), + headless=True, + workers=workers, + ) + return QAAgent(config).run() + + seq = _run(1, tmp_path / "seq") + par = _run(2, tmp_path / "par") + + assert len(par.pages_tested) == len(seq.pages_tested) + assert par.total_findings == seq.total_findings diff --git a/tests/test_agent.py b/tests/test_agent.py index c518182..ac32f1e 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -12,7 +12,7 @@ from qa_agent.agent import QAAgent, _extract_domain from qa_agent.config import AuthConfig, OutputFormat, TestConfig, TestMode from qa_agent.models import Finding, FindingCategory, Severity -from tests.conftest import make_mock_playwright_factory +from tests.conftest import make_mock_playwright_factory, make_multi_mock_playwright_factory # --------------------------------------------------------------------------- # _extract_domain @@ -562,3 +562,157 @@ def test_timeout_with_custom_selector_no_hint(self, capsys): out = capsys.readouterr().out assert "--auth-file" not in out assert "AuthConfig" not in out + + +# --------------------------------------------------------------------------- +# Multi-worker (concurrent) page testing +# --------------------------------------------------------------------------- + +class TestQAAgentParallel: + """config.workers > 1 spawns cooperating page-workers.""" + + def _patch_testers(self, accessibility_run=None): + from unittest.mock import patch as _patch + targets = { + "qa_agent.agent.KeyboardTester.run": [], + "qa_agent.agent.MouseTester.run": [], + "qa_agent.agent.FormTester.run": [], + "qa_agent.agent.AccessibilityTester.run": [], + "qa_agent.agent.ErrorDetector.run": [], + 
"qa_agent.agent.ErrorDetector.attach_listeners": None, + "qa_agent.agent.ErrorDetector.get_summary": {}, + } + patchers = [] + for target, retval in targets.items(): + if target == "qa_agent.agent.AccessibilityTester.run" and accessibility_run: + patchers.append(_patch(target, accessibility_run)) + else: + patchers.append(_patch(target, return_value=retval)) + return patchers + + def test_focused_three_workers_tests_all_pages(self): + config = _make_config( + urls=["https://example.com/a", "https://example.com/b", "https://example.com/c"], + workers=3, + ) + factory, pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + + patchers = self._patch_testers() + for p in patchers: + p.start() + try: + session = agent.run() + finally: + for p in patchers: + p.stop() + + urls = {pa.url for pa in session.pages_tested} + assert urls == { + "https://example.com/a", + "https://example.com/b", + "https://example.com/c", + } + assert len(session.pages_tested) == 3 + + def test_findings_aggregate_across_workers(self): + config = _make_config( + urls=["https://example.com/a", "https://example.com/b"], + workers=2, + ) + factory, _pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + + def acc_run(self): + return [Finding( + title="Issue", + description="d", + category=FindingCategory.ACCESSIBILITY, + severity=Severity.LOW, + url="https://example.com", + )] + + patchers = self._patch_testers(accessibility_run=acc_run) + for p in patchers: + p.start() + try: + session = agent.run() + finally: + for p in patchers: + p.stop() + + # One finding per page, aggregated without loss under concurrency. 
+ assert session.total_findings == 2 + + def test_does_not_mutate_caller_config(self): + config = _make_config(urls=["https://example.com/a"], workers=2) + original_output_dir = config.output_dir + factory, _pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + + patchers = self._patch_testers() + for p in patchers: + p.start() + try: + agent.run() + finally: + for p in patchers: + p.stop() + + # The agent deep-copies config; the caller's object is untouched. + assert config.output_dir == original_output_dir + assert agent.config.output_dir != original_output_dir + + def test_explore_respects_max_pages(self): + config = _make_config(urls=["https://example.com"], workers=3) + config.mode = TestMode.EXPLORE + config.max_pages = 4 + config.max_depth = 5 + factory, _pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + + counter = [0] + clock = threading.Lock() + + def fake_discover(self, page, url): + with clock: + n = counter[0] + counter[0] += 2 + return [ + {"href": f"https://example.com/p{n}", "text": ""}, + {"href": f"https://example.com/p{n+1}", "text": ""}, + ] + + patchers = self._patch_testers() + patchers.append(patch("qa_agent.agent.QAAgent._discover_links", fake_discover)) + for p in patchers: + p.start() + try: + session = agent.run() + finally: + for p in patchers: + p.stop() + + assert len(session.pages_tested) <= 4 + + def test_stop_event_halts_workers(self): + config = _make_config( + urls=[f"https://example.com/{i}" for i in range(10)], + workers=2, + ) + factory, _pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + agent.stop_event = threading.Event() + # Stop immediately so few (ideally zero) pages are claimed. 
+ agent.stop_event.set() + + patchers = self._patch_testers() + for p in patchers: + p.start() + try: + session = agent.run() + finally: + for p in patchers: + p.stop() + + assert len(session.pages_tested) < 10 diff --git a/tests/test_batch.py b/tests/test_batch.py new file mode 100644 index 0000000..a2cdb16 --- /dev/null +++ b/tests/test_batch.py @@ -0,0 +1,86 @@ +"""Tests for the BatchRunner job pool (layer-2 concurrency).""" + +from __future__ import annotations + +import threading +import time + +from qa_agent.batch import BatchRunner +from qa_agent.config import TestConfig, TestMode +from qa_agent.models import TestSession + + +def _cfg(url: str = "https://example.com") -> TestConfig: + return TestConfig(urls=[url], mode=TestMode.FOCUSED) + + +class TestBatchRunner: + def test_pool_size_clamped(self): + assert BatchRunner(pool_size=999).pool_size <= 8 + assert BatchRunner(pool_size=0).pool_size == 1 + + def test_run_all_preserves_order(self, monkeypatch): + def fake_run(self): + return TestSession(session_id=self.config.urls[0], start_time=__import__("datetime").datetime.now()) + + monkeypatch.setattr("qa_agent.batch.QAAgent.run", fake_run, raising=True) + with BatchRunner(pool_size=4) as runner: + cfgs = [_cfg(f"https://example.com/{i}") for i in range(5)] + results = runner.run_all(cfgs) + assert [r.session_id for r in results] == [f"https://example.com/{i}" for i in range(5)] + + def test_per_job_exception_isolated(self, monkeypatch): + def fake_run(self): + if "boom" in self.config.urls[0]: + raise RuntimeError("kaboom") + return TestSession(session_id="ok", start_time=__import__("datetime").datetime.now()) + + monkeypatch.setattr("qa_agent.batch.QAAgent.run", fake_run, raising=True) + with BatchRunner(pool_size=4) as runner: + results = runner.run_all([_cfg("https://example.com/ok"), _cfg("https://example.com/boom")]) + assert isinstance(results[0], TestSession) + assert isinstance(results[1], RuntimeError) + + def test_concurrency_is_bounded(self, 
monkeypatch): + """No more than pool_size sessions run simultaneously.""" + pool_size = 2 + active = [0] + peak = [0] + lock = threading.Lock() + + def fake_run(self): + with lock: + active[0] += 1 + peak[0] = max(peak[0], active[0]) + time.sleep(0.1) + with lock: + active[0] -= 1 + return TestSession(session_id="x", start_time=__import__("datetime").datetime.now()) + + monkeypatch.setattr("qa_agent.batch.QAAgent.run", fake_run, raising=True) + with BatchRunner(pool_size=pool_size) as runner: + runner.run_all([_cfg(f"https://example.com/{i}") for i in range(6)]) + assert peak[0] <= pool_size + + def test_submit_returns_job_with_status(self, monkeypatch): + def fake_run(self): + return TestSession(session_id="s", start_time=__import__("datetime").datetime.now()) + + monkeypatch.setattr("qa_agent.batch.QAAgent.run", fake_run, raising=True) + with BatchRunner(pool_size=2) as runner: + job = runner.submit(_cfg()) + result = job.result(timeout=5) + assert result.session_id == "s" + assert job.status == "completed" + + def test_does_not_mutate_caller_config(self, monkeypatch): + """The shared template's output_dir must be untouched after a run.""" + def fake_run(self): + return TestSession(session_id="s", start_time=__import__("datetime").datetime.now()) + + monkeypatch.setattr("qa_agent.batch.QAAgent.run", fake_run, raising=True) + cfg = _cfg() + original_output_dir = cfg.output_dir + with BatchRunner(pool_size=2) as runner: + runner.run_all([cfg]) + assert cfg.output_dir == original_output_dir diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 0000000..c66aa5d --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,145 @@ +"""Tests for the thread-safe Frontier and PageIndexer primitives.""" + +from __future__ import annotations + +import threading +import time + +from qa_agent.concurrency import Frontier, PageIndexer + + +class TestFrontierBasics: + def test_seed_and_claim_focused(self): + f = Frontier(max_pages=3, 
max_depth=0) + f.seed(["a", "b", "c"]) + claimed = [] + while True: + item = f.claim() + if item is None: + break + claimed.append(item[0]) + f.complete_one() + assert sorted(claimed) == ["a", "b", "c"] + + def test_dedup_on_seed_and_add(self): + f = Frontier(max_pages=10, max_depth=2) + f.seed(["a", "a"]) + f.add_links(["a", "b"], parent_depth=0) + seen = [] + while True: + item = f.claim() + if item is None: + break + seen.append(item[0]) + f.complete_one() + assert sorted(seen) == ["a", "b"] + + def test_depth_limit_blocks_deeper_links(self): + f = Frontier(max_pages=10, max_depth=1) + f.seed(["root"]) + url, depth = f.claim() + assert depth == 0 + f.add_links(["child"], parent_depth=0) # depth 1 — allowed + f.add_links(["grandchild"], parent_depth=1) # depth 2 — rejected + f.complete_one() + rest = [] + while True: + item = f.claim() + if item is None: + break + rest.append(item[0]) + f.complete_one() + assert rest == ["child"] + + def test_stop_event_halts_claims(self): + stop = threading.Event() + f = Frontier(max_pages=10, max_depth=0, stop_event=stop) + f.seed(["a", "b"]) + stop.set() + assert f.claim() is None + + +class TestFrontierConcurrency: + def test_max_pages_never_exceeded(self): + """Many workers hammering claim/add must not overshoot the budget.""" + f = Frontier(max_pages=50, max_depth=5) + f.seed([f"seed-{i}" for i in range(5)]) + claimed_lock = threading.Lock() + claimed: list[str] = [] + counter = [0] + + def worker(): + while True: + item = f.claim() + if item is None: + return + url, depth = item + with claimed_lock: + claimed.append(url) + # Each page spawns 3 children to keep the frontier busy. 
+ if depth < 5: + with claimed_lock: + n = counter[0] + counter[0] += 3 + f.add_links([f"u-{n}", f"u-{n+1}", f"u-{n+2}"], depth) + f.complete_one() + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + assert all(not t.is_alive() for t in threads), "workers deadlocked" + assert len(claimed) <= 50 + assert len(claimed) == len(set(claimed)), "a URL was tested twice" + + def test_clean_termination_when_drained_mid_flight(self): + """No deadlock when the queue empties while a worker is still in-flight.""" + f = Frontier(max_pages=10, max_depth=2) + f.seed(["only"]) + + results: list[str] = [] + lock = threading.Lock() + + def worker(): + while True: + item = f.claim() + if item is None: + return + url, depth = item + # Simulate slow work so other workers block in claim() waiting. + time.sleep(0.05) + if url == "only": + f.add_links(["child"], depth) + with lock: + results.append(url) + f.complete_one() + + threads = [threading.Thread(target=worker) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + assert all(not t.is_alive() for t in threads), "deadlock on termination" + assert sorted(results) == ["child", "only"] + + +class TestPageIndexer: + def test_unique_indices_across_threads(self): + idx = PageIndexer() + got: list[int] = [] + lock = threading.Lock() + + def worker(): + for _ in range(100): + v = idx.next() + with lock: + got.append(v) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + assert len(got) == 800 + assert len(set(got)) == 800, "duplicate page index handed out" diff --git a/tests/web/test_server.py b/tests/web/test_server.py index 5e2769c..fdd8b8e 100644 --- a/tests/web/test_server.py +++ b/tests/web/test_server.py @@ -6,7 +6,7 @@ import queue import sys import threading -from unittest.mock import MagicMock, patch +from unittest.mock import 
patch import pytest @@ -141,9 +141,7 @@ def test_ai_model_empty_string_becomes_none(self): class TestApiRun: def test_post_happy_path_returns_202(self, client): - with patch("qa_agent.web.server._run_job"), \ - patch("threading.Thread") as mock_thread: - mock_thread.return_value = MagicMock() + with patch("qa_agent.web.server._submit_job"): resp = client.post("/api/run", json={"urls": ["https://example.com"]}) assert resp.status_code == 202 data = resp.get_json() From 81563a1c3c4b4fe7ecb52fafd06ae7b22316e0a4 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 00:36:11 +0000 Subject: [PATCH 2/7] Fix mypy 2.x type errors in concurrent agent paths CI runs mypy 2.1.0 (stricter than local 1.x): annotate _launch_browser's playwright param so it returns Browser not Any, cast user-supplied cookies to satisfy add_cookies' SetCookieParam typing, and str() the recording video path. --- qa_agent/agent.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/qa_agent/agent.py b/qa_agent/agent.py index 7329d58..ef48eb1 100644 --- a/qa_agent/agent.py +++ b/qa_agent/agent.py @@ -7,9 +7,10 @@ import time import uuid from datetime import datetime +from typing import Any, cast from urllib.parse import urlparse -from playwright.sync_api import Browser, BrowserContext, Page, sync_playwright +from playwright.sync_api import Browser, BrowserContext, Page, Playwright, sync_playwright from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from .concurrency import Frontier, PageIndexer @@ -241,7 +242,7 @@ def _factory(self): """Return the playwright context-manager factory (real or injected mock).""" return self._playwright_factory if self._playwright_factory is not None else sync_playwright - def _launch_browser(self, playwright) -> Browser: + def _launch_browser(self, playwright: Playwright) -> Browser: """Launch a Chromium browser with the configured options.""" return playwright.chromium.launch(headless=self.config.headless) @@ -299,9 
+300,11 @@ def _authenticate(self, page: Page | None = None, context: BrowserContext | None context = context if context is not None else self.context assert context is not None - # Handle cookies (no page needed) + # Handle cookies (no page needed). Cookies are user-supplied dicts that + # match Playwright's SetCookieParam shape at runtime; cast for the typer. if auth.cookies: - context.add_cookies([auth.cookies] if isinstance(auth.cookies, dict) else auth.cookies) + cookies = [auth.cookies] if isinstance(auth.cookies, dict) else auth.cookies + context.add_cookies(cast("Any", cookies)) return # Handle form-based auth @@ -530,7 +533,7 @@ def _capture_recording(self, page: Page) -> None: path = video.path() if path: with self._session_lock: - self._recording_paths.append(path) + self._recording_paths.append(str(path)) except Exception: pass From 229206f605f78c6efce5f27f629d253e42b1f8e1 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 01:14:09 +0000 Subject: [PATCH 3/7] feat(web): add workers and pool-size UI controls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add "Page Workers" number input (1–16) to the Browser Settings section of the run form; collectFormData and applyConfig now include workers so it flows through the existing POST /api/run body → _build_config path. - Add "Server Settings" collapsible card below the run form with a Pool Size input (1–8); index.js fetches the current value from the new GET /api/server-config endpoint on page load and the Apply button PATCHes the new value. - New GET /api/server-config endpoint returns {pool_size, pool_size_max, workers_max}; new PATCH /api/server-config recreates the BatchRunner singleton with the requested pool size (clamped 1–8). - 12 new tests in TestApiServerConfig covering GET fields, PATCH update/clamp/validation, and _build_config workers handling. 
https://claude.ai/code/session_0159uLk7nyt7cbv4PX1bTe5o --- qa_agent/web/server.py | 31 ++++++++++++ qa_agent/web/static/app.js | 2 + qa_agent/web/static/index.js | 37 ++++++++++++++ qa_agent/web/templates/index.html | 27 ++++++++++ tests/web/test_server.py | 83 +++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+) diff --git a/qa_agent/web/server.py b/qa_agent/web/server.py index 08395df..75d3890 100644 --- a/qa_agent/web/server.py +++ b/qa_agent/web/server.py @@ -145,7 +145,9 @@ def _detect_structured(self, line: str) -> None: # unbounded thread-per-request model so the server applies backpressure. Size is # env-overridable; remember total browsers ≈ JOB_POOL_SIZE × config.workers. JOB_POOL_SIZE = int(os.environ.get("QA_AGENT_JOB_POOL_SIZE", "4")) +_pool_size = JOB_POOL_SIZE _runner = BatchRunner(pool_size=JOB_POOL_SIZE) +_POOL_SIZE_MAX = 8 def _make_job(job_id: str) -> dict: @@ -344,6 +346,35 @@ def api_stop(job_id: str): return jsonify({"job_id": job_id, "status": "stopping"}) +@app.route("/api/server-config", methods=["GET"]) +def api_server_config_get(): + return jsonify({ + "pool_size": _pool_size, + "pool_size_max": _POOL_SIZE_MAX, + "workers_max": 16, + }) + + +@app.route("/api/server-config", methods=["PATCH"]) +def api_server_config_patch(): + global _runner, _pool_size + body = request.get_json(force=True, silent=True) or {} + if "pool_size" not in body: + return jsonify({"error": "pool_size is required"}), 400 + try: + new_size = max(1, min(_POOL_SIZE_MAX, int(body["pool_size"]))) + except (TypeError, ValueError): + return jsonify({"error": "pool_size must be an integer"}), 400 + + if new_size != _pool_size: + old_runner = _runner + _runner = BatchRunner(pool_size=new_size) + _pool_size = new_size + old_runner.shutdown(wait=False) + + return jsonify({"pool_size": _pool_size}) + + @app.route("/api/jobs") def api_jobs(): """Return all active (non-completed) in-memory jobs.""" diff --git a/qa_agent/web/static/app.js 
b/qa_agent/web/static/app.js index d38a8b4..ee39a81 100644 --- a/qa_agent/web/static/app.js +++ b/qa_agent/web/static/app.js @@ -114,6 +114,7 @@ function collectFormData(form) { output_formats: outputFormats.length ? outputFormats : ['console', 'markdown', 'json'], output_dir: fd.get('output_dir') || null, headless: !!fd.get('headless'), + workers: parseInt(fd.get('workers') || '1', 10), viewport_width: parseInt(fd.get('viewport_width') || '1280', 10), viewport_height: parseInt(fd.get('viewport_height') || '720', 10), timeout: parseInt(fd.get('timeout') || '30000', 10), @@ -170,6 +171,7 @@ function applyConfig(cfg) { } } setCheck(form, 'headless', cfg.headless !== false); + setNum(form, 'workers', cfg.workers); setNum(form, 'viewport_width', cfg.viewport_width); setNum(form, 'viewport_height', cfg.viewport_height); setNum(form, 'timeout', cfg.timeout); diff --git a/qa_agent/web/static/index.js b/qa_agent/web/static/index.js index 67c2f1b..4470eaf 100644 --- a/qa_agent/web/static/index.js +++ b/qa_agent/web/static/index.js @@ -78,6 +78,43 @@ document.querySelectorAll('input[name="mode"]').forEach(radio => { updateLLMOptions(); })(); +// Pool size — fetch current value and wire up the Apply button +(function () { + const input = document.getElementById('pool-size-input'); + const btn = document.getElementById('update-pool-btn'); + const msg = document.getElementById('pool-size-msg'); + if (!input || !btn || !msg) return; + + fetch('/api/server-config') + .then(r => r.json()) + .then(cfg => { input.value = cfg.pool_size; input.max = cfg.pool_size_max || 8; }) + .catch(() => {}); + + btn.addEventListener('click', async () => { + const newSize = parseInt(input.value, 10); + btn.disabled = true; + msg.textContent = 'Updating…'; + try { + const res = await fetch('/api/server-config', { + method: 'PATCH', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ pool_size: newSize }), + }); + const data = await res.json(); + if (res.ok) { + input.value = 
data.pool_size; + msg.textContent = `Pool size set to ${data.pool_size}.`; + } else { + msg.textContent = `Error: ${data.error || 'Update failed'}`; + } + } catch (err) { + msg.textContent = `Error: ${err.message}`; + } finally { + btn.disabled = false; + } + }); +})(); + // Load instructions from a local text/markdown file into the textarea document.getElementById('instructions_file')?.addEventListener('change', function () { const file = this.files[0]; diff --git a/qa_agent/web/templates/index.html b/qa_agent/web/templates/index.html index e940583..25c444d 100644 --- a/qa_agent/web/templates/index.html +++ b/qa_agent/web/templates/index.html @@ -84,6 +84,11 @@

Run Test

+
+ + + Concurrent pages per run (1–16) +
@@ -241,6 +246,28 @@

Run Test

+ +
+
+ Server Settings +
+

+ These settings apply to the server process and affect all concurrent runs. +

+
+
+ + + Max simultaneous runs (1–8). Total browsers ≈ pool size × page workers. +
+
+ +
+
+
+
+
+
{% endblock %} {% block scripts %} diff --git a/tests/web/test_server.py b/tests/web/test_server.py index fdd8b8e..a718a93 100644 --- a/tests/web/test_server.py +++ b/tests/web/test_server.py @@ -554,3 +554,86 @@ def fake_import(name, *args, **kwargs): finally: if saved is not None: sys.modules["qa_agent.web.server"] = saved + + +# --------------------------------------------------------------------------- +# /api/server-config +# --------------------------------------------------------------------------- + +class TestApiServerConfig: + def test_get_returns_pool_size(self, client): + resp = client.get("/api/server-config") + assert resp.status_code == 200 + data = resp.get_json() + assert "pool_size" in data + assert isinstance(data["pool_size"], int) + assert data["pool_size"] >= 1 + + def test_get_returns_pool_size_max(self, client): + resp = client.get("/api/server-config") + data = resp.get_json() + assert "pool_size_max" in data + assert data["pool_size_max"] >= 1 + + def test_get_returns_workers_max(self, client): + resp = client.get("/api/server-config") + data = resp.get_json() + assert "workers_max" in data + assert data["workers_max"] == 16 + + def test_patch_updates_pool_size(self, client): + from qa_agent.web import server as srv_mod + original_size = srv_mod._pool_size + try: + new_size = max(1, original_size - 1) if original_size > 1 else 2 + resp = client.patch("/api/server-config", json={"pool_size": new_size}) + assert resp.status_code == 200 + data = resp.get_json() + assert data["pool_size"] == new_size + assert srv_mod._pool_size == new_size + finally: + client.patch("/api/server-config", json={"pool_size": original_size}) + + def test_patch_clamps_to_max(self, client): + from qa_agent.web import server as srv_mod + original_size = srv_mod._pool_size + try: + resp = client.patch("/api/server-config", json={"pool_size": 9999}) + assert resp.status_code == 200 + assert resp.get_json()["pool_size"] <= srv_mod._POOL_SIZE_MAX + finally: + 
client.patch("/api/server-config", json={"pool_size": original_size}) + + def test_patch_clamps_to_min(self, client): + from qa_agent.web import server as srv_mod + original_size = srv_mod._pool_size + try: + resp = client.patch("/api/server-config", json={"pool_size": 0}) + assert resp.status_code == 200 + assert resp.get_json()["pool_size"] >= 1 + finally: + client.patch("/api/server-config", json={"pool_size": original_size}) + + def test_patch_missing_pool_size_returns_400(self, client): + resp = client.patch("/api/server-config", json={}) + assert resp.status_code == 400 + + def test_patch_invalid_type_returns_400(self, client): + resp = client.patch("/api/server-config", json={"pool_size": "not-a-number"}) + assert resp.status_code == 400 + + def test_build_config_reads_workers(self): + config = _build_config({"urls": ["https://example.com"], "workers": 4}) + assert config.workers == 4 + + def test_build_config_clamps_workers_max(self): + config = _build_config({"urls": ["https://example.com"], "workers": 999}) + assert config.workers == 16 + + def test_build_config_clamps_workers_min(self): + config = _build_config({"urls": ["https://example.com"], "workers": 0}) + assert config.workers == 1 + + def test_build_config_workers_defaults_to_1(self): + config = _build_config({"urls": ["https://example.com"]}) + assert config.workers == 1 From a1b9d430c27336a2b009be1bbd869a3afa0aad5e Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Sat, 13 Jun 2026 17:12:32 -0400 Subject: [PATCH 4/7] docs: add CLAUDE.md with build/test commands and architecture overview Gives Claude Code instances a quick orientation: editable-install requirement for packaging tests, test/lint/build commands, and a summary of the request-flow architecture and extension points. 
--- CLAUDE.md | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..f2d4c43 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,111 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Setup + +```bash +pip install -e ".[dev,web,pdf]" +playwright install chromium +``` + +The package must be installed (editable is fine) for `python -m qa_agent`, the +`qa-agent`/`qa-agent-web` entry points, and subprocess-based tests (e.g. +`tests/_cli_exit_helper.py`) to find the `qa_agent` module. If packaging tests +fail with `ModuleNotFoundError: No module named 'qa_agent'` or version +mismatches, run `pip install -e .` first — check with `pip show qa-agent`. + +## Commands + +```bash +# Unit tests (fast, no browser) +pytest -v -m "not integration and not network" + +# Single test +pytest tests/test_agent.py::TestClassName::test_name -v + +# Integration tests (real Playwright against local fixture server) +pytest -v -m integration --no-cov + +# Lint / format / type-check +ruff check . +ruff format . +mypy qa_agent + +# Build +rm -rf build/ dist/ && python -m build +``` + +Coverage is enforced at 70% via `--cov-fail-under=70` in `pyproject.toml` +(applies to default `pytest` invocations). Running a small subset of tests +without `--no-cov` will fail on the coverage gate even if the tests +themselves pass — use `-p no:cacheprovider -o addopts=""` or `--no-cov` to +bypass when checking a few tests in isolation. + +Integration tests serve fixtures from `tests/fixtures/test-target/` (a +73-page HTML fixture site driven by `manifest.json`, which is the source of +truth for parametrized integration tests — each entry maps a fixture file to +an expected finding title/category). 
Start the fixture server manually for +debugging: + +```bash +cd tests/fixtures/test-target && python3 -m http.server 8181 +``` + +## Architecture + +Request flow: `cli.py` parses args into a `TestConfig` (`config.py`) → +if `--instructions`/`--instructions-file` is set, `ai_planner.py` calls +`llm_client.py` (Anthropic/OpenAI via stdlib `urllib`, no SDK deps) to +produce a `TestPlan`, cached on disk by `plan_cache.py` (24h TTL) → +`agent.py` (`QAAgent`) launches Playwright, iterates/crawls target URLs, and +runs each enabled tester from `testers/` against every page, collecting +`Finding` objects → reporters in `reporters/` consume the resulting +`TestSession` and write console/markdown/json/pdf output. + +- **Concurrency**: `concurrency.py` implements page-level worker pools + (`--workers`, max 16) within a single run, and `batch.py` (`BatchRunner`) + runs multiple independent `TestConfig` sessions concurrently with a bounded + pool (`--pool-size`/`--batch-file`, max 8). Total live browsers ≈ + `pool_size × workers`. +- **Rate limiting**: `rate_limiter.py` (`HostRateLimiter`) paces + `page.goto()` navigations per-hostname (`--rate-limit`, default 3 req/s, + `0` disables). One shared instance per `QAAgent` run covers all its + workers; `BatchRunner` can hold a single shared instance passed to every + `QAAgent` it constructs so concurrent batch jobs hitting the same host + share one budget. +- **Testers** (`testers/`) all extend `BaseTester` (`testers/base.py`), + receive a Playwright `Page` + `TestConfig`, and return `list[Finding]`. + `custom.py` runs AI-generated steps from the cached `TestPlan`. + `wcag_compliance.py` is opt-in (`--wcag-compliance`) and excluded from + coverage. +- **Reporters** (`reporters/`) all extend `BaseReporter` and consume a + `TestSession`; JSON is always written regardless of `--output` (web UI + relies on it for session discovery). 
+- **Web UI** (`web/`): Flask app (`server.py`) with SSE streaming for live + run output; templates/static assets are in `web/templates/` and + `web/static/`. No auth — local/internal use only. +- **Models** (`models.py`): `Finding`, `FindingCategory`, `Severity`, + `PageAnalysis`, `TestSession`, `TestPlan` — the shared data contracts + between testers, the agent, and reporters. + +### Adding a new tester + +1. New module in `testers/` extending `BaseTester`, implement `run() -> + list[Finding]`. +2. Export from `testers/__init__.py`. +3. Add a `test_*` bool to `TestConfig` (`config.py`). +4. Wire into `agent.py` `_test_page()`. +5. Add `--skip-*`/opt-in flag in `cli.py` if needed. +6. Add tests in `tests/testers/`. + +### Severity levels + +`CRITICAL` (security/data loss) · `HIGH` (major usability blockers) · +`MEDIUM` (UX/accessibility) · `LOW` (minor/best-practice) · `INFO`. + +### Exit codes (CLI) + +`0` no critical/high findings · `1` critical/high findings found · `2` error +during run · `130` interrupted (Ctrl+C). Covered by +`tests/test_packaging.py::TestExitCodeSmoke` via `tests/_cli_exit_helper.py`. From f1cc3e11f431217a86dfbb8e06730fae2b46bde6 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Sat, 13 Jun 2026 17:12:38 -0400 Subject: [PATCH 5/7] feat: add per-host navigation rate limiting Add HostRateLimiter, a thread-safe min-interval-per-host throttle for page.goto() navigations, enabled by default at 3 req/s (config.rate_limit, --rate-limit, 0 disables). One shared limiter covers all workers within a QAAgent run; BatchRunner can hold a single shared limiter passed to every QAAgent it constructs so concurrent batch jobs targeting the same host share a budget. The web server applies the same default (QA_AGENT_RATE_LIMIT). Addresses "too many connections" errors when --workers/--pool-size fan out many concurrent browsers against a single dev/staging target. 
--- README.md | 9 ++++ qa_agent/agent.py | 25 +++++++++- qa_agent/batch.py | 21 ++++++++- qa_agent/cli.py | 11 ++++- qa_agent/config.py | 19 ++++++++ qa_agent/rate_limiter.py | 43 +++++++++++++++++ qa_agent/web/server.py | 9 +++- tests/test_agent.py | 88 ++++++++++++++++++++++++++++++++++ tests/test_batch.py | 33 +++++++++++++ tests/test_cli.py | 51 ++++++++++++++++++++ tests/test_config.py | 21 +++++++++ tests/test_rate_limiter.py | 97 ++++++++++++++++++++++++++++++++++++++ 12 files changed, 421 insertions(+), 6 deletions(-) create mode 100644 qa_agent/rate_limiter.py create mode 100644 tests/test_rate_limiter.py diff --git a/README.md b/README.md index 22cd4a5..e66d7e1 100644 --- a/README.md +++ b/README.md @@ -289,12 +289,21 @@ qa-agent --batch-file runs.json --pool-size 4 | `--workers N` | `1` | Concurrent page-workers per run (max 16) | | `--batch-file FILE` | — | JSON file of multiple runs to execute concurrently | | `--pool-size N` | `4` | Max concurrent runs for `--batch-file` (max 8) | +| `--rate-limit N` | `3.0` | Max page navigations/sec to any single host (0 = unlimited) | > Total live browsers ≈ `pool-size × workers`, so size both with that > multiplicative cost in mind. The web API accepts the same `workers` value in > the `POST /api/run` body, and the pool size is set server-side via the > `QA_AGENT_JOB_POOL_SIZE` environment variable. +By default, navigations to any single host are throttled to 3 requests/second +across all workers and batch jobs, to avoid overwhelming dev/staging servers +with "too many connections" when running with many concurrent browsers. Raise +or disable this with `--rate-limit` (e.g. `--rate-limit 10` or `--rate-limit 0` +for unlimited). The limit applies only to page navigations (`page.goto()`), not +in-page interactions like clicks or form fills. The web server uses the same +3 req/s default, overridable via the `QA_AGENT_RATE_LIMIT` environment variable. 
+ ### Exploration (explore mode) | Flag | Default | Description | diff --git a/qa_agent/agent.py b/qa_agent/agent.py index ef48eb1..33db531 100644 --- a/qa_agent/agent.py +++ b/qa_agent/agent.py @@ -16,6 +16,7 @@ from .concurrency import Frontier, PageIndexer from .config import OutputFormat, TestConfig, TestMode from .models import Finding, FindingCategory, PageAnalysis, Severity, TestPlan, TestSession +from .rate_limiter import HostRateLimiter from .reporters import ConsoleReporter, JSONReporter, MarkdownReporter, PDFReporter from .reporters.base import BaseReporter from .testers import ( @@ -43,10 +44,21 @@ def _extract_domain(url: str) -> str: return safe or "unknown" +def _hostname(url: str) -> str: + """Return the bare hostname (no port) from a URL, for rate-limiter keys.""" + return urlparse(url).netloc.split(":")[0] + + class QAAgent: """Main QA Agent that orchestrates exploratory testing.""" - def __init__(self, config: TestConfig, playwright_factory=None, worker_thread_init=None): + def __init__( + self, + config: TestConfig, + playwright_factory=None, + worker_thread_init=None, + rate_limiter: HostRateLimiter | None = None, + ): # Deep-copy so per-session output-dir derivation below never mutates the # caller's config — essential when the same TestConfig template is handed # to several concurrent agents (e.g. by BatchRunner). @@ -82,6 +94,15 @@ def __init__(self, config: TestConfig, playwright_factory=None, worker_thread_in self._page_indexer = PageIndexer() # worker-safe screenshot indices self._recording_paths: list[str] = [] # one video per worker context + # Shared per-host navigation rate limiter. One instance per QAAgent run + # so all page-workers throttle against the same per-host budget. + # BatchRunner may pass a shared instance so concurrent sessions hitting + # the same host also share the budget; otherwise each agent builds its + # own from config.rate_limit. 
+ self._rate_limiter = ( + rate_limiter if rate_limiter is not None else HostRateLimiter(config.rate_limit) + ) + # Generate the session ID here so all output paths can be organized # under a session-specific subdirectory before reporters are created. self.session_id = str(uuid.uuid4())[:8] @@ -312,6 +333,7 @@ def _authenticate(self, page: Page | None = None, context: BrowserContext | None assert page is not None if auth.auth_url and auth.username and auth.password: self.console.print_progress(f"Authenticating at {auth.auth_url}") + self._rate_limiter.acquire(_hostname(auth.auth_url)) page.goto(auth.auth_url) ctx = self.config.invocation_context @@ -561,6 +583,7 @@ def _test_page_on(self, page: Page, error_detector: "ErrorDetector", url: str, p try: start_time = time.time() + self._rate_limiter.acquire(_hostname(url)) response = page.goto(url, wait_until="domcontentloaded") if response is None or response.status < 400: page.wait_for_load_state("networkidle", timeout=10000) diff --git a/qa_agent/batch.py b/qa_agent/batch.py index 9284c88..a49cd42 100644 --- a/qa_agent/batch.py +++ b/qa_agent/batch.py @@ -23,6 +23,7 @@ from .agent import QAAgent from .config import TestConfig from .models import TestSession +from .rate_limiter import HostRateLimiter DEFAULT_POOL_SIZE = 4 POOL_SIZE_MAX = 8 @@ -64,11 +65,23 @@ def stop(self) -> None: class BatchRunner: """Run multiple :class:`TestConfig` sessions with bounded concurrency.""" - def __init__(self, pool_size: int = DEFAULT_POOL_SIZE, thread_name_prefix: str = "qa-job"): + def __init__( + self, + pool_size: int = DEFAULT_POOL_SIZE, + thread_name_prefix: str = "qa-job", + rate_limit: float | None = None, + ): self.pool_size = max(1, min(POOL_SIZE_MAX, int(pool_size))) self._executor = ThreadPoolExecutor( max_workers=self.pool_size, thread_name_prefix=thread_name_prefix ) + # Shared per-host rate limiter so concurrent batch jobs targeting the + # same host (e.g. 
multiple specs against the same dev server) share + # one navigation budget rather than each getting an independent + # allowance. None → each QAAgent builds its own from config.rate_limit. + self._rate_limiter: HostRateLimiter | None = ( + HostRateLimiter(rate_limit) if rate_limit is not None else None + ) def submit( self, @@ -80,7 +93,11 @@ def submit( ) -> BatchJob: """Submit one session to the pool and return its :class:`BatchJob`.""" stop_event = stop_event if stop_event is not None else threading.Event() - agent = QAAgent(config, worker_thread_init=worker_thread_init) + agent = QAAgent( + config, + worker_thread_init=worker_thread_init, + rate_limiter=self._rate_limiter, + ) agent.stop_event = stop_event domain = "" diff --git a/qa_agent/cli.py b/qa_agent/cli.py index 8f21aad..eb8be1a 100644 --- a/qa_agent/cli.py +++ b/qa_agent/cli.py @@ -138,6 +138,14 @@ def main(): default=4, help="Max concurrent runs when using --batch-file (default: 4, max: 8).", ) + parser.add_argument( + "--rate-limit", + type=float, + default=3.0, + help="Max page navigations per second to any single host (default: 3.0). " + "Shared across all workers and batch jobs targeting that host, to " + "avoid overwhelming dev/staging servers. 
Set to 0 to disable.", + ) parser.add_argument( "--same-domain", action="store_true", @@ -415,6 +423,7 @@ def main(): ai_model=args.ai_model or None, use_plan_cache=not args.no_cache, workers=args.workers, + rate_limit=args.rate_limit, invocation_context="cli", ) @@ -495,7 +504,7 @@ def _run_batch(args, template: TestConfig) -> None: configs.append(cfg) print(f"Running {len(configs)} sessions (pool size {args.pool_size})…") - runner = BatchRunner(pool_size=args.pool_size) + runner = BatchRunner(pool_size=args.pool_size, rate_limit=args.rate_limit) try: results = runner.run_all(configs) finally: diff --git a/qa_agent/config.py b/qa_agent/config.py index fb51374..63874e8 100644 --- a/qa_agent/config.py +++ b/qa_agent/config.py @@ -112,6 +112,17 @@ class TestConfig: # Hard ceiling on concurrent page-workers to bound browser RAM/CPU. WORKERS_MAX = 16 + # Max page navigations per second to any single host (page.goto() only). + # Shared across all workers/batch jobs targeting that host, to avoid + # overwhelming fragile dev/staging servers with "too many connections" + # when --workers / --batch-file fan out many concurrent browsers. + # 0 disables throttling entirely. + rate_limit: float = 3.0 + + # Ceiling on rate_limit to prevent runaway configs from disabling + # effective throttling via an absurdly high rate. + RATE_LIMIT_MAX = 50.0 + def __post_init__(self) -> None: # Clamp worker count to [1, WORKERS_MAX]. Defensive against bad input # from CLI flags or web request bodies. @@ -120,3 +131,11 @@ def __post_init__(self) -> None: except (TypeError, ValueError): workers = 1 self.workers = max(1, min(self.WORKERS_MAX, workers)) + + # Clamp rate_limit to [0, RATE_LIMIT_MAX]. 0 (or negative) means + # "unlimited" and is preserved as exactly 0.0 rather than floored up. 
+ try: + rate_limit = float(self.rate_limit) + except (TypeError, ValueError): + rate_limit = 3.0 + self.rate_limit = 0.0 if rate_limit <= 0 else min(self.RATE_LIMIT_MAX, rate_limit) diff --git a/qa_agent/rate_limiter.py b/qa_agent/rate_limiter.py new file mode 100644 index 0000000..654c149 --- /dev/null +++ b/qa_agent/rate_limiter.py @@ -0,0 +1,43 @@ +"""Thread-safe per-host request rate limiting. + +Used to pace outgoing ``page.goto()`` navigations against a single host so +concurrent page-workers / batch sessions don't overwhelm a target server with +"too many connections" bursts. +""" + +from __future__ import annotations + +import threading +import time + + +class HostRateLimiter: + """Enforces a minimum interval between navigations to the same hostname. + + ``requests_per_second <= 0`` disables throttling entirely; ``acquire()`` + then becomes a no-op with no locking overhead. + """ + + def __init__(self, requests_per_second: float) -> None: + self._rate = float(requests_per_second) + self._min_interval = (1.0 / self._rate) if self._rate > 0 else 0.0 + self._lock = threading.Lock() + self._last_request: dict[str, float] = {} + + @property + def enabled(self) -> bool: + return self._rate > 0 + + def acquire(self, hostname: str) -> None: + """Block until it is safe to issue the next request to ``hostname``.""" + if not self.enabled: + return + while True: + with self._lock: + now = time.monotonic() + last = self._last_request.get(hostname) + if last is None or (now - last) >= self._min_interval: + self._last_request[hostname] = now + return + wait = self._min_interval - (now - last) + time.sleep(wait) diff --git a/qa_agent/web/server.py b/qa_agent/web/server.py index 75d3890..779feec 100644 --- a/qa_agent/web/server.py +++ b/qa_agent/web/server.py @@ -146,7 +146,11 @@ def _detect_structured(self, line: str) -> None: # env-overridable; remember total browsers ≈ JOB_POOL_SIZE × config.workers. 
JOB_POOL_SIZE = int(os.environ.get("QA_AGENT_JOB_POOL_SIZE", "4")) _pool_size = JOB_POOL_SIZE -_runner = BatchRunner(pool_size=JOB_POOL_SIZE) + +# Shared per-host navigation rate limit (req/s) across all jobs in the pool, +# mirroring the CLI's --rate-limit default. Set QA_AGENT_RATE_LIMIT=0 to disable. +DEFAULT_RATE_LIMIT = float(os.environ.get("QA_AGENT_RATE_LIMIT", "3.0")) +_runner = BatchRunner(pool_size=JOB_POOL_SIZE, rate_limit=DEFAULT_RATE_LIMIT) _POOL_SIZE_MAX = 8 @@ -368,7 +372,7 @@ def api_server_config_patch(): if new_size != _pool_size: old_runner = _runner - _runner = BatchRunner(pool_size=new_size) + _runner = BatchRunner(pool_size=new_size, rate_limit=DEFAULT_RATE_LIMIT) _pool_size = new_size old_runner.shutdown(wait=False) @@ -764,6 +768,7 @@ def _build_config(body: dict) -> TestConfig: ai_model=body.get("ai_model") or None, use_plan_cache=bool(body.get("use_plan_cache", True)), workers=max(1, min(16, int(body.get("workers", 1)))), + rate_limit=float(body.get("rate_limit", 3.0)), auth=auth, screenshots=ScreenshotConfig( enabled=ss_enabled, diff --git a/tests/test_agent.py b/tests/test_agent.py index ac32f1e..7148471 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -12,6 +12,7 @@ from qa_agent.agent import QAAgent, _extract_domain from qa_agent.config import AuthConfig, OutputFormat, TestConfig, TestMode from qa_agent.models import Finding, FindingCategory, Severity +from qa_agent.rate_limiter import HostRateLimiter from tests.conftest import make_mock_playwright_factory, make_multi_mock_playwright_factory # --------------------------------------------------------------------------- @@ -716,3 +717,90 @@ def test_stop_event_halts_workers(self): p.stop() assert len(session.pages_tested) < 10 + + +# --------------------------------------------------------------------------- +# Per-host navigation rate limiting +# --------------------------------------------------------------------------- + +class TestRateLimiting: + def 
_patch_testers(self): + from unittest.mock import patch as _patch + targets = [ + "qa_agent.agent.KeyboardTester.run", + "qa_agent.agent.MouseTester.run", + "qa_agent.agent.FormTester.run", + "qa_agent.agent.AccessibilityTester.run", + "qa_agent.agent.ErrorDetector.run", + "qa_agent.agent.ErrorDetector.attach_listeners", + "qa_agent.agent.ErrorDetector.get_summary", + ] + return [_patch(t, return_value=[]) for t in targets] + + def test_default_config_has_enabled_rate_limiter(self): + agent, _page = _make_agent() + assert agent._rate_limiter.enabled is True + + def test_zero_rate_limit_disables_limiter(self): + config = _make_config(rate_limit=0) + agent, _page = _make_agent(config) + assert agent._rate_limiter.enabled is False + + def test_explicit_rate_limiter_is_used_as_is(self): + config = _make_config() + shared = HostRateLimiter(5.0) + factory, page, _, _ = make_mock_playwright_factory() + page.evaluate.return_value = { + "interactive_elements": 0, + "forms_count": 0, + "links_count": 0, + "images_count": 0, + } + agent1 = QAAgent(config, playwright_factory=factory, rate_limiter=shared) + agent2 = QAAgent(config, playwright_factory=factory, rate_limiter=shared) + assert agent1._rate_limiter is shared + assert agent2._rate_limiter is shared + + def test_navigation_acquires_rate_limiter_per_page(self): + # High rate keeps the per-host min-interval negligible so this test + # doesn't add real sleep time while still exercising acquire(). 
+ config = _make_config(urls=["https://example.com/a", "https://example.com/b"], rate_limit=1000) + agent, page = _make_agent(config) + + patchers = self._patch_testers() + for p in patchers: + p.start() + try: + with patch.object( + agent._rate_limiter, "acquire", wraps=agent._rate_limiter.acquire + ) as spy: + session = agent.run() + finally: + for p in patchers: + p.stop() + + assert spy.call_count == len(session.pages_tested) + spy.assert_any_call("example.com") + + def test_multi_worker_pages_share_one_limiter(self): + config = _make_config( + urls=["https://example.com/a", "https://example.com/b", "https://example.com/c"], + workers=3, + rate_limit=1000, + ) + factory, _pages = make_multi_mock_playwright_factory() + agent = QAAgent(config, playwright_factory=factory) + + patchers = self._patch_testers() + for p in patchers: + p.start() + try: + with patch.object( + agent._rate_limiter, "acquire", wraps=agent._rate_limiter.acquire + ) as spy: + session = agent.run() + finally: + for p in patchers: + p.stop() + + assert spy.call_count == len(session.pages_tested) == 3 diff --git a/tests/test_batch.py b/tests/test_batch.py index a2cdb16..a886a6d 100644 --- a/tests/test_batch.py +++ b/tests/test_batch.py @@ -84,3 +84,36 @@ def fake_run(self): with BatchRunner(pool_size=2) as runner: runner.run_all([cfg]) assert cfg.output_dir == original_output_dir + + +class TestBatchRunnerRateLimiter: + def _fake_run(self): + return TestSession(session_id="s", start_time=__import__("datetime").datetime.now()) + + def test_no_rate_limit_means_each_agent_builds_its_own(self, monkeypatch): + monkeypatch.setattr("qa_agent.batch.QAAgent.run", self._fake_run, raising=True) + with BatchRunner(pool_size=2) as runner: + job1 = runner.submit(_cfg("https://example.com/a")) + job2 = runner.submit(_cfg("https://example.com/b")) + job1.result(timeout=5) + job2.result(timeout=5) + assert job1.agent._rate_limiter is not job2.agent._rate_limiter + assert job1.agent._rate_limiter.enabled is True 
# default config.rate_limit + + def test_shared_rate_limit_is_passed_to_every_agent(self, monkeypatch): + monkeypatch.setattr("qa_agent.batch.QAAgent.run", self._fake_run, raising=True) + with BatchRunner(pool_size=2, rate_limit=5.0) as runner: + job1 = runner.submit(_cfg("https://example.com/a")) + job2 = runner.submit(_cfg("https://example.com/b")) + job1.result(timeout=5) + job2.result(timeout=5) + assert job1.agent._rate_limiter is runner._rate_limiter + assert job2.agent._rate_limiter is runner._rate_limiter + assert runner._rate_limiter.enabled is True + + def test_shared_rate_limit_zero_disables_throttling(self, monkeypatch): + monkeypatch.setattr("qa_agent.batch.QAAgent.run", self._fake_run, raising=True) + with BatchRunner(pool_size=2, rate_limit=0) as runner: + job = runner.submit(_cfg()) + job.result(timeout=5) + assert job.agent._rate_limiter.enabled is False diff --git a/tests/test_cli.py b/tests/test_cli.py index a88a4eb..6af7bbc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -139,6 +139,57 @@ def fake_init(self, config, **kwargs): assert captured_config["config"].mode == TestMode.EXPLORE + def test_default_rate_limit(self, monkeypatch): + captured_config = {} + + def fake_init(self, config, **kwargs): + captured_config["config"] = config + + with patch("qa_agent.cli.QAAgent.__init__", fake_init), \ + patch("qa_agent.cli.QAAgent.run", return_value=_fake_session()): + monkeypatch.setattr(sys, "argv", ["qa-agent", "https://example.com"]) + import qa_agent.cli as cli_mod + try: + cli_mod.main() + except SystemExit: + pass + + assert captured_config["config"].rate_limit == 3.0 + + def test_rate_limit_flag(self, monkeypatch): + captured_config = {} + + def fake_init(self, config, **kwargs): + captured_config["config"] = config + + with patch("qa_agent.cli.QAAgent.__init__", fake_init), \ + patch("qa_agent.cli.QAAgent.run", return_value=_fake_session()): + monkeypatch.setattr(sys, "argv", ["qa-agent", "--rate-limit", "10", "https://example.com"]) 
+ import qa_agent.cli as cli_mod + try: + cli_mod.main() + except SystemExit: + pass + + assert captured_config["config"].rate_limit == 10.0 + + def test_rate_limit_zero_disables(self, monkeypatch): + captured_config = {} + + def fake_init(self, config, **kwargs): + captured_config["config"] = config + + with patch("qa_agent.cli.QAAgent.__init__", fake_init), \ + patch("qa_agent.cli.QAAgent.run", return_value=_fake_session()): + monkeypatch.setattr(sys, "argv", ["qa-agent", "--rate-limit", "0", "https://example.com"]) + import qa_agent.cli as cli_mod + try: + cli_mod.main() + except SystemExit: + pass + + assert captured_config["config"].rate_limit == 0.0 + def test_json_always_appended(self, monkeypatch): """JSON format must be in output_formats even when not explicitly requested.""" from qa_agent.config import OutputFormat diff --git a/tests/test_config.py b/tests/test_config.py index daed24e..3cf323e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -121,6 +121,27 @@ def test_defaults(self): assert rc.video_size == {"width": 1280, "height": 720} +class TestRateLimit: + def test_default_rate_limit(self): + assert TestConfig().rate_limit == 3.0 + + def test_zero_stays_disabled(self): + assert TestConfig(rate_limit=0).rate_limit == 0.0 + + def test_negative_normalizes_to_zero(self): + assert TestConfig(rate_limit=-5).rate_limit == 0.0 + + def test_value_within_range_preserved(self): + assert TestConfig(rate_limit=10.0).rate_limit == 10.0 + + def test_value_above_max_is_clamped(self): + cfg = TestConfig(rate_limit=999) + assert cfg.rate_limit == cfg.RATE_LIMIT_MAX + + def test_non_numeric_falls_back_to_default(self): + assert TestConfig(rate_limit="not-a-number").rate_limit == 3.0 + + class TestTestMode: def test_values(self): assert TestMode.FOCUSED.value == "focused" diff --git a/tests/test_rate_limiter.py b/tests/test_rate_limiter.py new file mode 100644 index 0000000..beef9a0 --- /dev/null +++ b/tests/test_rate_limiter.py @@ -0,0 +1,97 @@ +"""Tests 
for qa_agent/rate_limiter.py — per-host navigation throttling.""" + +from __future__ import annotations + +import threading + +from qa_agent.rate_limiter import HostRateLimiter + + +class TestDisabled: + def test_zero_rate_is_disabled(self): + assert HostRateLimiter(0).enabled is False + + def test_negative_rate_is_disabled(self): + assert HostRateLimiter(-1).enabled is False + + def test_disabled_acquire_never_sleeps(self, monkeypatch): + sleep_calls = [] + monkeypatch.setattr("qa_agent.rate_limiter.time.sleep", lambda s: sleep_calls.append(s)) + limiter = HostRateLimiter(0) + limiter.acquire("example.com") + limiter.acquire("example.com") + assert sleep_calls == [] + + +class TestEnabled: + def test_positive_rate_is_enabled(self): + assert HostRateLimiter(3.0).enabled is True + + def test_first_request_no_wait(self, monkeypatch): + monkeypatch.setattr("qa_agent.rate_limiter.time.monotonic", lambda: 100.0) + sleep_calls = [] + monkeypatch.setattr("qa_agent.rate_limiter.time.sleep", lambda s: sleep_calls.append(s)) + + limiter = HostRateLimiter(2.0) # min interval 0.5s + limiter.acquire("example.com") + assert sleep_calls == [] + + def test_second_request_within_interval_sleeps(self, monkeypatch): + clock = [100.0] + monkeypatch.setattr("qa_agent.rate_limiter.time.monotonic", lambda: clock[0]) + + sleep_calls = [] + + def fake_sleep(seconds): + sleep_calls.append(seconds) + clock[0] += seconds + + monkeypatch.setattr("qa_agent.rate_limiter.time.sleep", fake_sleep) + + limiter = HostRateLimiter(2.0) # min interval 0.5s + limiter.acquire("example.com") # t=100.0, no wait + limiter.acquire("example.com") # still t=100.0 -> must wait 0.5s + assert sleep_calls == [0.5] + + def test_request_after_interval_does_not_sleep(self, monkeypatch): + clock = [100.0] + monkeypatch.setattr("qa_agent.rate_limiter.time.monotonic", lambda: clock[0]) + sleep_calls = [] + monkeypatch.setattr("qa_agent.rate_limiter.time.sleep", lambda s: sleep_calls.append(s)) + + limiter = 
HostRateLimiter(2.0) # min interval 0.5s + limiter.acquire("example.com") # t=100.0 + clock[0] = 100.5 + limiter.acquire("example.com") # exactly one interval later -> no wait + assert sleep_calls == [] + + def test_different_hosts_have_independent_budgets(self, monkeypatch): + monkeypatch.setattr("qa_agent.rate_limiter.time.monotonic", lambda: 100.0) + sleep_calls = [] + monkeypatch.setattr("qa_agent.rate_limiter.time.sleep", lambda s: sleep_calls.append(s)) + + limiter = HostRateLimiter(2.0) + limiter.acquire("a.example.com") + limiter.acquire("b.example.com") + assert sleep_calls == [] + + +class TestThreadSafety: + def test_concurrent_acquires_complete_without_error(self): + """A high (but non-zero) rate keeps this fast while exercising the lock.""" + limiter = HostRateLimiter(1000.0) # 1ms min interval + results: list[int] = [] + lock = threading.Lock() + + def worker(): + limiter.acquire("example.com") + with lock: + results.append(1) + + threads = [threading.Thread(target=worker) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + assert len(results) == 10 From 42beffb2a9e867191e27ec0190feea1092c024bf Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Sat, 13 Jun 2026 17:27:43 -0400 Subject: [PATCH 6/7] Surface worker_thread_init failures and document server pool assumptions Log/report instead of silently swallowing worker_thread_init exceptions in both the page-worker and batch-pool paths, and document the single-interpreter assumption behind the web server's global pool/rate limiter. 
Co-Authored-By: Claude Sonnet 4.6 --- qa_agent/agent.py | 4 ++-- qa_agent/batch.py | 5 ++++- qa_agent/web/server.py | 5 +++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/qa_agent/agent.py b/qa_agent/agent.py index 33db531..3d0f23b 100644 --- a/qa_agent/agent.py +++ b/qa_agent/agent.py @@ -508,8 +508,8 @@ def _worker_loop(self, frontier: Frontier, storage_state): if self._worker_thread_init is not None: try: self._worker_thread_init() - except Exception: - pass + except Exception as exc: + self.console.print_error(f"worker_thread_init failed: {exc}") with self._factory()() as playwright: browser = self._launch_browser(playwright) diff --git a/qa_agent/batch.py b/qa_agent/batch.py index a49cd42..324e9fa 100644 --- a/qa_agent/batch.py +++ b/qa_agent/batch.py @@ -15,6 +15,7 @@ from __future__ import annotations +import logging import threading import uuid from concurrent.futures import Future, ThreadPoolExecutor @@ -25,6 +26,8 @@ from .models import TestSession from .rate_limiter import HostRateLimiter +logger = logging.getLogger(__name__) + DEFAULT_POOL_SIZE = 4 POOL_SIZE_MAX = 8 @@ -123,7 +126,7 @@ def _task() -> TestSession: try: worker_thread_init() except Exception: - pass + logger.warning("worker_thread_init failed for job %s", job.job_id, exc_info=True) return agent.run() job.future = self._executor.submit(_task) diff --git a/qa_agent/web/server.py b/qa_agent/web/server.py index 779feec..3b00775 100644 --- a/qa_agent/web/server.py +++ b/qa_agent/web/server.py @@ -145,6 +145,11 @@ def _detect_structured(self, line: str) -> None: # unbounded thread-per-request model so the server applies backpressure. Size is # env-overridable; remember total browsers ≈ JOB_POOL_SIZE × config.workers. JOB_POOL_SIZE = int(os.environ.get("QA_AGENT_JOB_POOL_SIZE", "4")) + +# These globals assume a single interpreter (Flask dev server or gunicorn +# --workers=1). 
Multi-process WSGI deployments are not supported; each process +# would get its own pool and rate limiter, defeating the pool-size limit and +# per-host rate budget. _pool_size = JOB_POOL_SIZE # Shared per-host navigation rate limit (req/s) across all jobs in the pool, From d943b19082f8c76f9a3a07e3ab647f828f321ce2 Mon Sep 17 00:00:00 2001 From: Bill Richards Date: Sat, 13 Jun 2026 17:37:20 -0400 Subject: [PATCH 7/7] Fix duplicate log lines for progress/finding events in web UI _QueueWriter emitted a generic 'log' SSE event for every line AND a 'progress'/'finding' event for Testing:/[SEVERITY] lines, causing run.js to render those lines twice in the live log. Co-Authored-By: Claude Sonnet 4.6 --- qa_agent/web/server.py | 16 ++++++++++++---- tests/web/test_server.py | 12 ++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/qa_agent/web/server.py b/qa_agent/web/server.py index 3b00775..45a04d8 100644 --- a/qa_agent/web/server.py +++ b/qa_agent/web/server.py @@ -107,9 +107,8 @@ def write(self, text: str) -> int: while "\n" in self._buf: line, self._buf = self._buf.split("\n", 1) clean = _ANSI_RE.sub("", line) - if clean.strip(): + if clean.strip() and not self._detect_structured(clean): self._emit("log", {"message": clean}) - self._detect_structured(clean) return len(text) def flush(self) -> None: @@ -120,12 +119,18 @@ def _emit(self, event_type: str, data: dict) -> None: self._events.append(msg) self._q.put(msg) - def _detect_structured(self, line: str) -> None: + def _detect_structured(self, line: str) -> bool: + """Emit a structured event for recognized lines. + + Returns ``True`` if a "progress"/"finding" event was emitted in place + of the generic "log" event, so the caller doesn't double-render the + same line via both handlers in run.js. 
+ """ # Page start: "Testing: " m = re.search(r"Testing:\s+(https?://\S+)", line) if m: self._emit("progress", {"url": m.group(1), "message": line.strip()}) - return + return True # Finding: "[SEVERITY] " m = re.search(r"\[(CRITICAL|HIGH|MEDIUM|LOW|INFO)\]\s+(.+)", line, re.IGNORECASE) @@ -134,6 +139,9 @@ def _detect_structured(self, line: str) -> None: "severity": m.group(1).lower(), "title": m.group(2).strip(), }) + return True + + return False # ── Job management ───────────────────────────────────────────────────────────── diff --git a/tests/web/test_server.py b/tests/web/test_server.py index a718a93..f045477 100644 --- a/tests/web/test_server.py +++ b/tests/web/test_server.py @@ -386,6 +386,12 @@ def test_testing_url_emits_progress_event(self): progress = next(e for e in events if e["type"] == "progress") assert "https://example.com" in progress["data"]["url"] + def test_testing_url_does_not_also_emit_log_event(self): + """Avoid duplicate lines in the UI: progress lines aren't also logged.""" + writer, q, events = self._writer() + writer.write("Testing: https://example.com\n") + assert not any(e["type"] == "log" for e in events) + def test_critical_finding_emits_finding_event(self): writer, q, events = self._writer() writer.write("[CRITICAL] Something is very broken\n") @@ -393,6 +399,12 @@ def test_critical_finding_emits_finding_event(self): finding = next(e for e in events if e["type"] == "finding") assert finding["data"]["severity"] == "critical" + def test_finding_line_does_not_also_emit_log_event(self): + """Avoid duplicate lines in the UI: finding lines aren't also logged.""" + writer, q, events = self._writer() + writer.write("[CRITICAL] Something is very broken\n") + assert not any(e["type"] == "log" for e in events) + def test_high_finding_emits_finding_event(self): writer, q, events = self._writer() writer.write("[HIGH] A high severity issue\n")