diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8c4f974 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,54 @@ +name: Test and Install Package + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.12", "3.13"] + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + version: "latest" + + - name: Install dependencies + run: uv sync --group dev + + - name: Build and test + run: make all + + - name: Upload coverage reports + uses: codecov/codecov-action@v4 + if: matrix.python-version == '3.13' + with: + files: ./coverage.xml + fail_ci_if_error: false + + - name: Archive test artifacts + uses: actions/upload-artifact@v4 + if: always() + with: + name: test-artifacts-${{ matrix.python-version }} + path: | + claude-mcp-test.log + htmlcov/ + dist/ + retention-days: 7 \ No newline at end of file diff --git a/.github/workflows/publish.yml.disabled b/.github/workflows/publish.yml.disabled new file mode 100644 index 0000000..1b79a39 --- /dev/null +++ b/.github/workflows/publish.yml.disabled @@ -0,0 +1,108 @@ +name: Build and Publish to PyPI + +on: + push: + tags: + - 'v*' # Trigger on version tags (v1.0.0, v0.8.1, etc.) 
+ workflow_dispatch: # Allow manual triggering + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + version: "latest" + + - name: Install dependencies + run: uv sync --group dev + + - name: Run comprehensive tests + run: make all + + build-and-publish: + needs: test + runs-on: ubuntu-latest + permissions: + contents: read + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + version: "latest" + + - name: Install dependencies + run: uv sync --group dev + + - name: Build package + run: make build + + - name: Check package + run: | + uv tool install twine + uv tool run twine check dist/* + + - name: Verify version matches tag + run: | + TAG_VERSION=${GITHUB_REF#refs/tags/v} + PACKAGE_VERSION=$(uv run python -c "from src.bertron_mcp.main import __version__; print(__version__)") + echo "Tag version: $TAG_VERSION" + echo "Package version: $PACKAGE_VERSION" + if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then + echo "Version mismatch! 
Tag: $TAG_VERSION, Package: $PACKAGE_VERSION" + exit 1 + fi + + - name: Publish to PyPI (using token-based auth) + if: "!contains(github.ref, 'test') && !contains(github.ref, 'dryrun')" + uses: pypa/gh-action-pypi-publish@release/v1 + with: + verify-metadata: true + verbose: true + password: ${{ secrets.PYPI_API_TOKEN }} + + - name: Publish to TestPyPI (using token-based auth) + if: "contains(github.ref, 'test') && !contains(github.ref, 'dryrun')" + uses: pypa/gh-action-pypi-publish@release/v1 + with: + repository-url: https://test.pypi.org/legacy/ + verify-metadata: true + verbose: true + password: ${{ secrets.TEST_PYPI_API_TOKEN }} + + - name: Dry run (build only) + if: "contains(github.ref, 'dryrun')" + run: | + echo "Dry run mode - would publish these files:" + ls -la dist/ + echo "Package contents:" + uv tool run twine check dist/* --verbose + + - name: Upload build artifacts + uses: actions/upload-artifact@v4 + if: always() + with: + name: build-artifacts-${{ github.ref_name }} + path: dist/ \ No newline at end of file diff --git a/Makefile index cf7b05a..6827678 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,16 @@ -.PHONY: test-coverage clean install dev format lint all server build upload-test upload release deptry mypy test-mcp test-mcp-extended test-integration test-version test-mcp-protocol test-claude-mcp +.PHONY: test-coverage clean install dev format lint all server build upload-test upload release deptry mypy test-mcp test-mcp-extended test-mcp-tools test-mcp-constraints test-mcp-errors test-integration test-version test-mcp-protocol test-claude-mcp test-uvx test-uvx-mcp ci ci-network test-mcp-comprehensive # Default target all: clean install dev test-coverage format lint mypy deptry build test-mcp test-mcp-extended test-integration test-version +# CI-safe target (no external dependencies) +ci: clean install dev test-coverage format lint mypy build test-version + @echo "✅ CI pipeline completed successfully!" 
+ +# CI with network tests (for environments with reliable network) +ci-network: ci test-mcp test-mcp-extended test-integration + @echo "✅ CI pipeline with network tests completed!" + # Install everything for development dev: uv sync --group dev @@ -61,6 +69,10 @@ upload: # Complete release workflow release: clean install test-coverage build +# Comprehensive MCP testing +test-mcp-comprehensive: test-mcp test-mcp-extended test-mcp-tools test-mcp-constraints test-mcp-errors + @echo "✅ All MCP JSON-RPC tests completed successfully!" + # Integration Testing test-integration: @echo "🌤️ Testing BERtron MCP integration..." @@ -112,3 +124,53 @@ test-claude-mcp: 2>&1 | tee claude-mcp-test.log +# Test uvx installation from GitHub +test-uvx: + @echo "📦 Testing uvx installation from GitHub..." + uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp --version + +# Test uvx MCP server +test-uvx-mcp: + @echo "🔧 Testing uvx MCP server functionality..." + @(echo '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {"tools": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}}, "id": 1}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "tools/list", "id": 2}') | \ + timeout 10 uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp + +# Test multiple MCP tools via JSON-RPC +test-mcp-tools: + @echo "🛠️ Testing multiple MCP tools via JSON-RPC..." 
+ @(echo '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {"tools": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}}, "id": 1}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "health_check", "arguments": {}}, "id": 2}'; \ + sleep 0.5; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "search_by_source", "arguments": {"source": "NMDC", "limit": 5}}, "id": 3}'; \ + sleep 0.5; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "search_by_type", "arguments": {"entity_type": "sample", "limit": 3}}, "id": 4}') | \ + timeout 15 uv run python src/bertron_mcp/main.py + +# Test constraint enforcement via JSON-RPC +test-mcp-constraints: + @echo "🚧 Testing constraint enforcement via JSON-RPC..." + @(echo '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {"tools": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}}, "id": 1}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "search_by_source", "arguments": {"source": "NMDC", "limit": 5000}}, "id": 2}') | \ + timeout 10 uv run python src/bertron_mcp/main.py + +# Test error handling via JSON-RPC +test-mcp-errors: + @echo "❌ Testing error handling via JSON-RPC..." 
+ @(echo '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {"tools": {}}, "clientInfo": {"name": "test-client", "version": "1.0.0"}}, "id": 1}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "notifications/initialized", "params": {}}'; \ + sleep 0.1; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "nonexistent_tool", "arguments": {}}, "id": 2}'; \ + sleep 0.5; \ + echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "geosearch", "arguments": {"latitude": 91.0, "longitude": 0.0}}, "id": 3}') | \ + timeout 10 uv run python src/bertron_mcp/main.py diff --git a/README.md b/README.md index d18db27..8295451 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,17 @@ A Model Context Protocol (MCP) server providing access to the BERtron API, which aggregates genomic and environmental data from multiple Biological and Environmental Research (BER) data sources including EMSL, ESS-DIVE, JGI, MONET, and NMDC. +## Quick Start + +### Install and run directly from GitHub +```bash +# Run directly without installing +uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp + +# Or install first, then run +uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp --version +``` + ## Features - 🔍 **Geospatial Search**: Find entities within a specified radius of geographic coordinates @@ -41,6 +52,62 @@ Search for entities within a specified distance of geographic coordinates. **Returns:** QueryResponse with entities, count, and metadata +### `bbox_search` +Search for entities within a rectangular geographic bounding box. 
+ +**Parameters:** +- `southwest_lat` (float): Southwest corner latitude (-90.0 to 90.0) +- `southwest_lng` (float): Southwest corner longitude (-180.0 to 180.0) +- `northeast_lat` (float): Northeast corner latitude (-90.0 to 90.0) +- `northeast_lng` (float): Northeast corner longitude (-180.0 to 180.0) + +**Returns:** QueryResponse with entities within the bounding box + +### `entity_lookup` +Retrieve detailed information for a specific entity by its unique ID. + +**Parameters:** +- `entity_id` (string): Unique identifier of the entity (e.g., "nmdc:bsm-12-abc123") + +**Returns:** Entity object with complete metadata + +### `advanced_query` +Execute complex MongoDB queries with filtering, projection, and sorting. + +**Parameters:** +- `filter_dict` (dict, optional): MongoDB filter criteria (e.g., {"entity_type": "sample"}) +- `projection` (dict, optional): Fields to include/exclude (e.g., {"name": 1, "coordinates": 1}) +- `skip` (int, optional): Number of documents to skip for pagination (default: 0) +- `limit` (int, optional): Maximum number of documents to return (default: 100) +- `sort` (dict, optional): Sort criteria (e.g., {"name": 1} for ascending) + +**Returns:** QueryResponse with matching entities + +### `search_by_source` +Find entities from a specific BER data source. + +**Parameters:** +- `source` (string): BER data source name (EMSL, ESS-DIVE, JGI, NMDC, MONET) + +**Returns:** QueryResponse with entities from the specified source + +### `search_by_type` +Find entities of a specific entity type. + +**Parameters:** +- `entity_type` (string): Entity type (biodata, sample, sequence, taxon, jgi_biosample) + +**Returns:** QueryResponse with entities of the specified type + +### `search_by_name` +Search for entities by name using regex pattern matching. 
+ +**Parameters:** +- `name_pattern` (string): Name pattern to search for (supports regex) +- `case_sensitive` (bool, optional): Whether search should be case sensitive (default: False) + +**Returns:** QueryResponse with entities matching the name pattern + ### `health_check` Check the health status of the BERtron API. @@ -48,6 +115,44 @@ Check the health status of the BERtron API. **Returns:** Dictionary with web_server and database boolean status +## API Limits and Constraints + +To prevent overwhelming responses and protect system resources, the following limits are enforced: + +### Default Limits +- **Default result limit**: 100 items per query +- **Maximum result limit**: 1,000 items per query +- **Maximum pagination offset**: 50,000 items + +### Constraint Reporting +When limits are applied, tools automatically report constraints in the response metadata: + +```json +{ + "entities": [...], + "count": 1000, + "metadata": { + "constraints_applied": { + "requested_limit": 5000, + "actual_limit": 1000, + "reason": "Exceeded maximum limit of 1000" + } + } +} +``` + +### Tools with Limit Parameters +The following tools accept optional `limit` parameters: +- `search_by_source(source, limit=100)` +- `search_by_type(entity_type, limit=100)` +- `search_by_name(name_pattern, case_sensitive=False, limit=100)` +- `advanced_query(filter_dict=None, limit=100, skip=0, ...)` + +### Safety Features +- **`advanced_query`** requires filter criteria to prevent accidental full database dumps +- All limits are enforced server-side with automatic constraint reporting +- Deep pagination (skip > 50,000) is blocked to prevent performance issues + ## Setup ### Development @@ -81,6 +186,21 @@ make test-version ## MCP Integration ### Claude Desktop Configuration + +**Option 1: From GitHub (Recommended)** +Add to `~/Library/Application Support/Claude/claude_desktop_config.json`: +```json +{ + "mcpServers": { + "bertron-mcp": { + "command": "uvx", + "args": ["--from", 
"git+https://github.com/ber-data/bertron-mcp.git", "bertron-mcp"] + } + } +} +``` + +**Option 2: Local Development** Add to `~/Library/Application Support/Claude/claude_desktop_config.json`: ```json { @@ -95,18 +215,30 @@ Add to `~/Library/Application Support/Claude/claude_desktop_config.json`: ``` ### Claude Code MCP Setup -For local development: + +**From GitHub:** +```bash +claude mcp add bertron-mcp "uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp" +``` + +**Local development:** ```bash claude mcp add -s project bertron-mcp uv run python src/bertron_mcp/main.py ``` -For production (after publishing to PyPI): +**Production (after publishing to PyPI):** ```bash claude mcp add -s project bertron-mcp uvx bertron-mcp ``` ### Goose Setup -For local development: + +**From GitHub:** +```bash +goose session --with-extension "uvx --from git+https://github.com/ber-data/bertron-mcp.git bertron-mcp" +``` + +**Local development:** ```bash goose session --with-extension "uv run python src/bertron_mcp/main.py" ``` @@ -117,12 +249,30 @@ goose session --with-extension "uv run python src/bertron_mcp/main.py" ``` Search for genomic samples near Orlando, FL within 100km radius: > Use the bertron-mcp to search for entities near latitude 28.5383, longitude -81.3792 within 100km + +Search for entities in a bounding box covering Yellowstone National Park: +> Use bbox_search to find entities between southwest corner (44.0, -125.0) and northeast corner (49.0, -110.0) + +Find all NMDC sample entities: +> Search for all sample entities from the NMDC data source + +Look up detailed information for a specific entity: +> Use entity_lookup to get details for entity ID "nmdc:bsm-12-abc123" ``` ### Direct MCP Protocol ```bash # Test geosearch tool echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "geosearch", "arguments": {"latitude": 28.5383, "longitude": -81.3792, "search_radius_km": 100.0}}, "id": 1}' | uv run python src/bertron_mcp/main.py + +# Test 
bounding box search +echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "bbox_search", "arguments": {"southwest_lat": 44.0, "southwest_lng": -125.0, "northeast_lat": 49.0, "northeast_lng": -110.0}}, "id": 2}' | uv run python src/bertron_mcp/main.py + +# Test search by data source +echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "search_by_source", "arguments": {"source": "NMDC"}}, "id": 3}' | uv run python src/bertron_mcp/main.py + +# Test advanced query with filtering +echo '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "advanced_query", "arguments": {"filter_dict": {"entity_type": "sample"}, "limit": 10}}, "id": 4}' | uv run python src/bertron_mcp/main.py ``` ## Development diff --git a/src/bertron_mcp/main.py b/src/bertron_mcp/main.py index 0073fab..a9454bd 100644 --- a/src/bertron_mcp/main.py +++ b/src/bertron_mcp/main.py @@ -6,10 +6,11 @@ import logging import sys from importlib import metadata +from typing import Any # Suppress SSL warnings for development/testing with self-signed certificates import urllib3 -from bertron_client import BertronAPIError, BertronClient, QueryResponse +from bertron_client import BertronAPIError, BertronClient, Entity, QueryResponse from fastmcp import FastMCP urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -30,12 +31,20 @@ BERTRON_API_URL: str = "https://bertron-api.bertron.production.svc.spin.nersc.org/bertron/" +# API Response Limits - prevent overwhelming responses and protect system resources +DEFAULT_LIMIT = 100 # Default number of results returned +MAX_LIMIT = 1000 # Maximum allowed results per query +MAX_SKIP = 50000 # Maximum pagination offset to prevent deep scanning + def health_check() -> dict[str, bool] | None: """ - Check BERtron API health status. + Check if the BERtron API is online and accessible. + + Verifies both the web server and database connectivity to ensure + the genomic data service is fully operational. 
Returns: - dict[str, bool] | None: Health status with 'web_server' and 'database'. + Status information showing if web server and database are healthy """ client = BertronClient(base_url=BERTRON_API_URL) # Disable SSL verification for self-signed certificates in testing @@ -61,16 +70,19 @@ def geosearch( search_radius_km: float = 1.0 ) -> QueryResponse | None: """ - Search BERtron catalogue for data within distance of given coordinates. + Find genomic and environmental samples near a geographic location. + + Search for biological samples, field sites, and environmental data + collected within a specified distance of any point on Earth. + Useful for finding relevant research data for environmental studies. Args: - latitude: latitude of the point (-90.0 to 90.0) - longitude: longitude of the point (-180.0 to 180.0) - search_radius_km: the station search radius in m (default 1.0) + latitude: Geographic latitude (-90.0 to 90.0) + longitude: Geographic longitude (-180.0 to 180.0) + search_radius_km: Search radius in kilometers (default: 1.0) Returns: - QueryResponse: or None if no data could be retrieved. - # TODO: Return QueryResponse or extract entities? + Collection of nearby samples and research sites with metadata """ # TODO: Reuse BertronClient instance? client = BertronClient(base_url=BERTRON_API_URL) @@ -91,13 +103,537 @@ def geosearch( return None +def bbox_search( + southwest_lat: float, + southwest_lng: float, + northeast_lat: float, + northeast_lng: float +) -> QueryResponse | None: + """ + Find all genomic samples within a rectangular geographic region. + + Search for biological samples and environmental data within a defined + rectangular area on Earth. Perfect for regional studies covering + states, ecosystems, or research transects. 
+ + Args: + southwest_lat: Southwest corner latitude (-90.0 to 90.0) + southwest_lng: Southwest corner longitude (-180.0 to 180.0) + northeast_lat: Northeast corner latitude (-90.0 to 90.0) + northeast_lng: Northeast corner longitude (-180.0 to 180.0) + + Returns: + All samples and research sites within the specified rectangular area + """ + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + result = client.find_entities_in_bounding_box( + southwest_lat, southwest_lng, northeast_lat, northeast_lng + ) + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + logger.debug(traceback.format_exc()) + + return None + +def entity_lookup(entity_id: str) -> Entity | None: + """ + Get detailed information about a specific biological sample or dataset. + + Look up comprehensive metadata for any sample, including collection + details, environmental conditions, processing methods, and associated + research projects. + + Args: + entity_id: Sample or dataset identifier (e.g., "nmdc:bsm-12-abc123") + + Returns: + Complete sample metadata including collection and analysis details + """ + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + result = client.get_entity_by_id(entity_id) + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + logger.debug(traceback.format_exc()) + + return None + +def advanced_query( + filter_dict: dict[str, Any] | None = None, + projection: dict[str, Any] | None = None, + skip: int = 0, + limit: int = DEFAULT_LIMIT, + sort: dict[str, int] | None = None +) -> QueryResponse | None: + """ + Perform complex searches with custom filters and sorting options. + + Create sophisticated searches combining multiple criteria such as + sample type, date ranges, environmental conditions, or research + projects. 
Includes pagination and custom result ordering. + + Args: + filter_dict: Search criteria (e.g., {"entity_type": "sample"}) + projection: Specific fields to return (e.g., {"name": 1, "coordinates": 1}) + skip: Number of results to skip for pagination (default: 0) + limit: Maximum results to return (default: 100, max: 1000) + sort: Sort order (e.g., {"name": 1} for A-Z, {"name": -1} for Z-A) + + Returns: + Search results matching the specified criteria + """ + # Track original values to report constraints + original_limit = limit + constraints_applied = {} + + # Enforce maximum limits to prevent overwhelming responses + if limit is None or limit > MAX_LIMIT: + limit = MAX_LIMIT + logger.warning(f"Limit constrained to maximum of {MAX_LIMIT}") + constraints_applied["limit"] = { + "requested": original_limit, + "actual": limit, + "reason": f"Exceeded maximum limit of {MAX_LIMIT}" + } + + if skip > MAX_SKIP: + logger.error(f"Skip value {skip} exceeds maximum of {MAX_SKIP}") + return None + + # Require some filter to prevent accidental full database dumps + if not filter_dict: + logger.warning( + "Advanced query requires filter criteria to prevent full database access" + ) + filter_dict = {"entity_type": {"$exists": True}} # Basic safety filter + constraints_applied["filter"] = { + "requested": "none", + "actual": filter_dict, + "reason": "Safety filter applied to prevent full database access" + } + + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + result = client.find_entities( + filter_dict=filter_dict, + projection=projection, + skip=skip, + limit=limit, + sort=sort + ) + + # Add constraint information to metadata if any were applied + if result and constraints_applied: + if not result.metadata: + result.metadata = {} + result.metadata["constraints_applied"] = constraints_applied + + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + 
logger.debug(traceback.format_exc()) + + return None + +def search_by_source(source: str, limit: int = DEFAULT_LIMIT) -> QueryResponse | None: + """ + Find samples and datasets from a specific research facility. + + Search for data from major U.S. Department of Energy biological + and environmental research facilities. Each facility specializes + in different types of research and data collection methods. + + Args: + source: Research facility name (EMSL, ESS-DIVE, JGI, NMDC, MONET) + limit: Maximum number of results to return (default: 100, max: 1000) + + Returns: + Samples and datasets from the specified research facility + """ + # Enforce maximum limit to prevent overwhelming responses + original_limit = limit + if limit > MAX_LIMIT: + limit = MAX_LIMIT + logger.warning(f"Limit constrained to maximum of {MAX_LIMIT}") + + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + # Use find_entities with explicit limit instead of find_entities_by_source + result = client.find_entities( + filter_dict={"ber_data_source": source}, + limit=limit + ) + + # Add constraint information to metadata + if result and original_limit != limit: + if not result.metadata: + result.metadata = {} + result.metadata["constraints_applied"] = { + "requested_limit": original_limit, + "actual_limit": limit, + "reason": f"Exceeded maximum limit of {MAX_LIMIT}" + } + + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + logger.debug(traceback.format_exc()) + + return None + +def search_by_type( + entity_type: str, limit: int = DEFAULT_LIMIT +) -> QueryResponse | None: + """ + Find all data of a specific research type (samples, sequences, etc.). + + Search by the kind of biological or environmental data you need. + Choose from physical samples, genetic sequences, taxonomic data, + or processed datasets depending on your research goals. 
+ + Args: + entity_type: Type of data (sample, sequence, biodata, taxon, jgi_biosample) + limit: Maximum number of results to return (default: 100, max: 1000) + + Returns: + Data entries matching the specified research data type + """ + # Enforce maximum limit to prevent overwhelming responses + original_limit = limit + if limit > MAX_LIMIT: + limit = MAX_LIMIT + logger.warning(f"Limit constrained to maximum of {MAX_LIMIT}") + + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + # Use find_entities with explicit limit instead of find_entities_by_entity_type + result = client.find_entities( + filter_dict={"entity_type": entity_type}, + limit=limit + ) + + # Add constraint information to metadata + if result and original_limit != limit: + if not result.metadata: + result.metadata = {} + result.metadata["constraints_applied"] = { + "requested_limit": original_limit, + "actual_limit": limit, + "reason": f"Exceeded maximum limit of {MAX_LIMIT}" + } + + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + logger.debug(traceback.format_exc()) + + return None + +def search_by_name( + name_pattern: str, case_sensitive: bool = False, limit: int = DEFAULT_LIMIT +) -> QueryResponse: + """ + Search for samples and datasets by name or description. + + Find research data by searching through sample names, project titles, + and descriptions. Supports pattern matching to find related studies + or samples from similar environments. 
+ + Args: + name_pattern: Text to search for in names and descriptions + case_sensitive: Whether to match exact case (default: False) + limit: Maximum number of results to return (default: 100, max: 1000) + + Returns: + Samples and datasets with names or descriptions matching the pattern + """ + import re + + # Validate regex pattern first + try: + re.compile(name_pattern) + except re.error as e: + logger.warning(f"Invalid regex pattern '{name_pattern}': {e}") + # Return empty QueryResponse for invalid patterns + empty_result = QueryResponse(entities=[], count=0) + empty_result.metadata = { + "constraints_applied": { + "pattern": name_pattern, + "reason": f"Invalid regex pattern: {e}" + } + } + return empty_result + + # Enforce maximum limit to prevent overwhelming responses + original_limit = limit + constraints_applied = {} + if limit > MAX_LIMIT: + limit = MAX_LIMIT + logger.warning(f"Limit constrained to maximum of {MAX_LIMIT}") + constraints_applied["limit"] = { + "requested": original_limit, + "actual": limit, + "reason": f"Exceeded maximum limit of {MAX_LIMIT}" + } + + client = BertronClient(base_url=BERTRON_API_URL) + client.session.verify = False + + try: + # Use find_entities with regex filter and explicit limit + regex_filter = {"name": {"$regex": name_pattern}} + if not case_sensitive: + regex_filter["name"]["$options"] = "i" + + result = client.find_entities( + filter_dict=regex_filter, + limit=limit + ) + + # Add constraint information to metadata if any were applied + if result and constraints_applied: + if not result.metadata: + result.metadata = {} + result.metadata["constraints_applied"] = constraints_applied + + logger.debug(result) + return result + + except BertronAPIError as e: + import traceback + logger.error("API connection error: %s", e) + logger.debug(traceback.format_exc()) + + # Return empty QueryResponse instead of None + empty_result = QueryResponse(entities=[], count=0) + empty_result.metadata = { + "error": f"API connection error: 
{e}", + "constraints_applied": constraints_applied if constraints_applied else {} + } + return empty_result + # MAIN SECTION # Create the FastMCP instance mcp: FastMCP = FastMCP("bertron_mcp") -# Register all tools -mcp.tool(geosearch) -mcp.tool(health_check) +# Register all tools with enhanced metadata and structured descriptions +mcp.tool( + geosearch, + title="Geographic Sample Search", + description=( + "Find genomic and environmental samples near any location on Earth " + "using latitude/longitude coordinates" + ), + tags={"geospatial", "environmental", "samples", "basic"}, + annotations={ + "use_cases": [ + "Find samples near research sites", + "Environmental impact studies", + "Regional biodiversity analysis" + ], + "examples": [ + "Samples within 50km of Orlando, FL", + "Marine samples near coastlines" + ], + "complexity": "beginner" + }, + meta={"category": "search", "requires_coordinates": True} +) + +mcp.tool( + health_check, + title="API Health Status", + description=( + "Verify that the BERtron API and database are online and " + "responding correctly" + ), + tags={"system", "health", "diagnostics", "monitoring"}, + annotations={ + "use_cases": [ + "Troubleshoot connection issues", + "Monitor system status", + "Verify API availability" + ], + "examples": ["Check if database is accessible", "Verify web server status"], + "complexity": "beginner" + }, + meta={"category": "system", "requires_coordinates": False} +) + +mcp.tool( + bbox_search, + title="Regional Bounding Box Search", + description=( + "Find all samples within a rectangular geographic region " + "defined by corner coordinates" + ), + tags={"geospatial", "environmental", "regional", "basic"}, + annotations={ + "use_cases": [ + "State or province-wide studies", + "Ecosystem boundary analysis", + "Research transects" + ], + "examples": [ + "All samples in Yellowstone region", + "Great Lakes watershed samples" + ], + "complexity": "beginner" + }, + meta={"category": "search", 
"requires_coordinates": True} +) + +mcp.tool( + entity_lookup, + title="Sample Details Lookup", + description=( + "Get comprehensive metadata for a specific biological sample " + "or dataset using its unique identifier" + ), + tags={"lookup", "metadata", "details", "basic"}, + annotations={ + "use_cases": [ + "Get full sample details", + "Verify sample information", + "Access collection metadata" + ], + "examples": [ + "Look up sample nmdc:bsm-12-abc123", + "Get processing details for a dataset" + ], + "complexity": "beginner" + }, + meta={"category": "lookup", "requires_coordinates": False} +) + +mcp.tool( + advanced_query, + title="Advanced Database Query", + description=( + "Execute sophisticated searches with custom filters, field selection, " + "pagination, and sorting options" + ), + tags={"query", "advanced", "filtering", "complex"}, + annotations={ + "use_cases": [ + "Complex multi-criteria searches", + "Custom data analysis", + "Bulk data retrieval with specific fields" + ], + "examples": [ + "Samples from 2023 with pH > 7", + "Sequence data with specific gene markers" + ], + "complexity": "advanced", + "warning": ( + "Requires knowledge of database field names and MongoDB query syntax" + ) + }, + meta={ + "category": "search", + "requires_coordinates": False, + "technical_skill": "intermediate" + } +) + +mcp.tool( + search_by_source, + title="Search by Research Facility", + description=( + "Find samples and datasets from specific DOE research facilities " + "(EMSL, ESS-DIVE, JGI, NMDC, MONET)" + ), + tags={"source", "facility", "institution", "basic"}, + annotations={ + "use_cases": [ + "Compare data across facilities", + "Find facility-specific datasets", + "Institutional research analysis" + ], + "examples": [ + "All NMDC microbiome samples (up to 1000 results)", + "JGI genomic sequences with limit=500", + "EMSL proteomics data" + ], + "complexity": "beginner" + }, + meta={"category": "search", "requires_coordinates": False} +) + +mcp.tool( + 
search_by_type, + title="Search by Data Type", + description=( + "Find specific types of biological or environmental data " + "(samples, sequences, biodata, taxa)" + ), + tags={"type", "category", "filtering", "basic"}, + annotations={ + "use_cases": [ + "Find all samples vs sequences", + "Get taxonomic data only", + "Filter by data type" + ], + "examples": [ + "All biological samples", + "Genomic sequence data", + "Taxonomic classifications" + ], + "complexity": "beginner" + }, + meta={"category": "search", "requires_coordinates": False} +) + +mcp.tool( + search_by_name, + title="Text Search by Name/Description", + description=( + "Search through sample names, project titles, and descriptions " + "using text patterns and keywords" + ), + tags={"text", "name", "description", "pattern", "basic"}, + annotations={ + "use_cases": [ + "Find samples by project name", + "Search descriptions for keywords", + "Pattern-based discovery" + ], + "examples": [ + "Samples with 'forest' in description", + "Projects containing 'soil microbiome'" + ], + "complexity": "beginner" + }, + meta={"category": "search", "requires_coordinates": False} +) def main(): """Main entry point for the application.""" diff --git a/tests/test_api.py b/tests/test_api.py index e295e48..fca1422 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,7 +3,16 @@ from bertron_client import QueryResponse from schema.datamodel.bertron_schema_pydantic import Coordinates, Entity -from src.bertron_mcp.main import geosearch, health_check +from src.bertron_mcp.main import ( + advanced_query, + bbox_search, + entity_lookup, + geosearch, + health_check, + search_by_name, + search_by_source, + search_by_type, +) logger = logging.getLogger("bertron_mcp.main") @@ -101,3 +110,409 @@ def test_geosearch_2(): if coords.depth: assert coords.depth.unit == "m" assert coords.depth.minimum_numeric_value >= 0.0 + +def test_bbox_search(): + """Test bounding box search functionality""" + # Search for entities in a small 
bounding box around Orlando, FL + result = bbox_search( + southwest_lat=28.0, + southwest_lng=-82.0, + northeast_lat=29.0, + northeast_lng=-81.0 + ) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + assert result.query_type == "geospatial_bounding_box" + assert "bounding_box" in result.metadata + assert result.metadata["bounding_box"]["southwest"]["latitude"] == 28.0 + assert result.metadata["bounding_box"]["southwest"]["longitude"] == -82.0 + assert result.metadata["bounding_box"]["northeast"]["latitude"] == 29.0 + assert result.metadata["bounding_box"]["northeast"]["longitude"] == -81.0 + assert len(result.entities) == result.count + +def test_search_by_source(): + """Test searching by data source""" + result = search_by_source("NMDC") + + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 0: + # Verify all entities are from NMDC source + for entity in result.entities: + assert entity.ber_data_source == "NMDC" + +def test_search_by_type(): + """Test searching by entity type""" + result = search_by_type("sample") + + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 0: + # Verify all entities are samples + for entity in result.entities: + assert "sample" in entity.entity_type + +def test_search_by_name(): + """Test searching by name pattern""" + result = search_by_name(".*water.*", case_sensitive=False) + + assert isinstance(result, QueryResponse) + + if result.count > 0: + # Verify entities contain "water" in name (case-insensitive) + for entity in result.entities: + if entity.name: + assert "water" in entity.name.lower() + +def test_entity_lookup(): + """Test entity lookup by ID""" + # First get an entity ID from a geosearch + search_result = geosearch(28.5383, -81.3792, 50.0) + + if search_result is not None and search_result.count > 0: + entity_id = search_result.entities[0].id + + if entity_id: + result = 
entity_lookup(entity_id) + + assert result is None or isinstance(result, Entity) + + if result is not None: + assert result.id == entity_id + assert result.name is not None + assert result.coordinates is not None + +def test_advanced_query(): + """Test advanced query functionality""" + # Search for sample entities without projection to avoid validation issues + result = advanced_query( + filter_dict={"entity_type": "sample"}, + limit=10 + ) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 0: + assert len(result.entities) <= 10 + # Verify all entities are samples + for entity in result.entities: + assert "sample" in entity.entity_type + +# Test limit enforcement and constraint reporting +def test_search_by_source_limit_enforcement(): + """Test that search_by_source enforces limits and reports constraints""" + # Test with limit above maximum + result = search_by_source("NMDC", limit=5000) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Should be constrained to MAX_LIMIT (1000) + assert len(result.entities) <= 1000 + + # Should report constraints in metadata + if "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + assert constraints["requested_limit"] == 5000 + assert constraints["actual_limit"] == 1000 + assert "maximum limit" in constraints["reason"] + +def test_search_by_type_limit_enforcement(): + """Test that search_by_type enforces limits and reports constraints""" + # Test with limit above maximum + result = search_by_type("sample", limit=2000) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Should be constrained to MAX_LIMIT (1000) + assert len(result.entities) <= 1000 + + # Should report constraints in metadata + if "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + assert constraints["requested_limit"] == 2000 + 
assert constraints["actual_limit"] == 1000 + +def test_search_by_name_limit_enforcement(): + """Test that search_by_name enforces limits and reports constraints""" + # Test with limit above maximum + result = search_by_name(".*", case_sensitive=False, limit=1500) + + assert isinstance(result, QueryResponse) + + # Should be constrained to MAX_LIMIT (1000) + assert len(result.entities) <= 1000 + + # Should report constraints in metadata + if "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + if "limit" in constraints: + assert constraints["limit"]["requested"] == 1500 + assert constraints["limit"]["actual"] == 1000 + +def test_advanced_query_limit_enforcement(): + """Test that advanced_query enforces limits and reports constraints""" + # Test with limit above maximum + result = advanced_query( + filter_dict={"entity_type": "sample"}, + limit=3000 + ) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Should be constrained to MAX_LIMIT (1000) + assert len(result.entities) <= 1000 + + # Should report constraints in metadata + if "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + assert "limit" in constraints + assert constraints["limit"]["requested"] == 3000 + assert constraints["limit"]["actual"] == 1000 + +def test_advanced_query_no_filter_safety(): + """Test that advanced_query applies safety filter when no filter provided""" + # Test without filter - should apply safety filter + result = advanced_query(limit=10) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Should report filter constraint in metadata + if "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + if "filter" in constraints: + assert constraints["filter"]["requested"] == "none" + assert "safety filter" in constraints["filter"]["reason"].lower() + +def 
test_advanced_query_excessive_skip(): + """Test that advanced_query rejects excessive skip values""" + # Test with skip above maximum (should return None) + result = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=100000, # Above MAX_SKIP + limit=10 + ) + + # Should return None due to excessive skip + assert result is None + +def test_search_by_source_all_sources(): + """Test search_by_source with all valid data sources""" + sources = ["EMSL", "ESS-DIVE", "JGI", "NMDC", "MONET"] + + for source in sources: + result = search_by_source(source, limit=5) + + # Should return valid response or None (some sources may have no data) + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 0: + # All entities should be from the requested source + for entity in result.entities: + assert entity.ber_data_source == source + +def test_search_by_type_all_types(): + """Test search_by_type with all valid entity types""" + types = ["biodata", "sample", "sequence", "taxon", "jgi_biosample"] + + for entity_type in types: + result = search_by_type(entity_type, limit=5) + + # Should return valid response or None (some types may have no data) + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 0: + # All entities should be of the requested type + for entity in result.entities: + assert entity_type in entity.entity_type + +def test_search_by_name_case_sensitivity(): + """Test search_by_name case sensitivity options""" + # Test case-insensitive search (default) + result_insensitive = search_by_name("WATER", case_sensitive=False, limit=5) + + # Test case-sensitive search + result_sensitive = search_by_name("WATER", case_sensitive=True, limit=5) + + # Both should return valid responses + assert isinstance(result_insensitive, QueryResponse) + assert isinstance(result_sensitive, QueryResponse) + + # Case-insensitive should generally return more results + assert 
result_insensitive.count >= result_sensitive.count + +def test_search_by_name_regex_patterns(): + """Test search_by_name with various regex patterns""" + patterns = [ + ".*soil.*", # Contains 'soil' + "^NMDC", # Starts with 'NMDC' + "sample$", # Ends with 'sample' + "[0-9]+", # Contains numbers + ] + + for pattern in patterns: + result = search_by_name(pattern, case_sensitive=False, limit=5) + + # Should return valid response + assert isinstance(result, QueryResponse) + +def test_geosearch_edge_coordinates(): + """Test geosearch with edge case coordinates""" + edge_cases = [ + # Extreme coordinates + (90.0, 180.0, 1.0), # North pole, international date line + (-90.0, -180.0, 1.0), # South pole, international date line + (0.0, 0.0, 1.0), # Equator, prime meridian + # Various radius sizes + (40.7128, -74.0060, 0.1), # NYC, very small radius + (40.7128, -74.0060, 500.0), # NYC, large radius + ] + + for lat, lon, radius in edge_cases: + result = geosearch(lat, lon, radius) + + # Should return valid response + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Verify metadata + assert result.metadata["center"]["latitude"] == lat + assert result.metadata["center"]["longitude"] == lon + assert result.metadata["radius_meters"] == radius * 1000 + +def test_bbox_search_edge_cases(): + """Test bbox_search with various bounding box sizes""" + test_cases = [ + # Small bounding box + (40.0, -75.0, 41.0, -74.0), + # Large bounding box spanning continents + (-10.0, -50.0, 50.0, 50.0), + # Bounding box crossing date line + (20.0, 170.0, 30.0, -170.0), + ] + + for sw_lat, sw_lng, ne_lat, ne_lng in test_cases: + result = bbox_search(sw_lat, sw_lng, ne_lat, ne_lng) + + # Should return valid response + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + # Verify metadata + assert result.metadata["bounding_box"]["southwest"]["latitude"] == sw_lat + assert result.metadata["bounding_box"]["southwest"]["longitude"] 
== sw_lng + assert result.metadata["bounding_box"]["northeast"]["latitude"] == ne_lat + assert result.metadata["bounding_box"]["northeast"]["longitude"] == ne_lng + +def test_advanced_query_with_projection(): + """Test advanced_query with field projection""" + result = advanced_query( + filter_dict={"entity_type": "sample"}, + limit=5 + ) + + assert result is None or isinstance(result, QueryResponse) + + # Note: Projection behavior depends on API implementation + # This test ensures the function handles basic queries correctly + +def test_advanced_query_with_sorting(): + """Test advanced_query with sorting""" + result = advanced_query( + filter_dict={"entity_type": "sample"}, + sort={"name": 1}, # Sort by name ascending + limit=10 + ) + + assert result is None or isinstance(result, QueryResponse) + + if result is not None and result.count > 1: + # Verify entities are returned (sorting verification needs specific data) + assert len(result.entities) > 0 + +def test_advanced_query_pagination(): + """Test advanced_query pagination""" + # Get first page + page1 = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=0, + limit=5 + ) + + # Get second page + page2 = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=5, + limit=5 + ) + + assert page1 is None or isinstance(page1, QueryResponse) + assert page2 is None or isinstance(page2, QueryResponse) + + # If both pages have data, entities should be different + if (page1 is not None and page1.count > 0 and + page2 is not None and page2.count > 0): + page1_ids = {entity.id for entity in page1.entities if entity.id} + page2_ids = {entity.id for entity in page2.entities if entity.id} + # Pages should generally have different entities (unless very limited data) + if page1_ids and page2_ids: + # Allow some overlap in case of limited test data + intersection_len = len(page1_ids.intersection(page2_ids)) + max_len = max(len(page1_ids), len(page2_ids)) + assert intersection_len < max_len + +def 
test_entity_lookup_invalid_id(): + """Test entity_lookup with various ID formats""" + test_ids = [ + "invalid_id", + "", + "nmdc:nonexistent", + "fake:test:id", + ] + + for entity_id in test_ids: + result = entity_lookup(entity_id) + + # Should return None for invalid/nonexistent IDs + assert result is None or isinstance(result, Entity) + +def test_all_tools_return_types(): + """Test that all tools return expected types""" + # Test basic calls to ensure proper return types + + # Health check should always return dict + result = health_check() + assert isinstance(result, dict) + + # Geosearch should return QueryResponse + result = geosearch(0.0, 0.0) + assert isinstance(result, QueryResponse) + + # Bbox search should return QueryResponse + result = bbox_search(0.0, 0.0, 1.0, 1.0) + assert isinstance(result, QueryResponse) + + # Search by source should return QueryResponse + result = search_by_source("NMDC", limit=1) + assert isinstance(result, QueryResponse) + + # Search by type should return QueryResponse + result = search_by_type("sample", limit=1) + assert isinstance(result, QueryResponse) + + # Search by name should return QueryResponse + result = search_by_name("test", limit=1) + assert isinstance(result, QueryResponse) + + # Advanced query should return QueryResponse + result = advanced_query(filter_dict={"entity_type": "sample"}, limit=1) + assert isinstance(result, QueryResponse) + + # Entity lookup with invalid ID should return None + result = entity_lookup("test_id") + assert result is None diff --git a/tests/test_constants_and_version.py b/tests/test_constants_and_version.py new file mode 100644 index 0000000..c22d439 --- /dev/null +++ b/tests/test_constants_and_version.py @@ -0,0 +1,153 @@ +""" +Test constants, version handling, and module-level functionality. 
+""" + +from src.bertron_mcp.main import DEFAULT_LIMIT, MAX_LIMIT, MAX_SKIP, __version__ + + +def test_constants_defined(): + """Test that all limit constants are properly defined""" + # Check constants exist and have reasonable values + assert isinstance(DEFAULT_LIMIT, int) + assert isinstance(MAX_LIMIT, int) + assert isinstance(MAX_SKIP, int) + + # Check values are reasonable + assert DEFAULT_LIMIT > 0 + assert MAX_LIMIT > DEFAULT_LIMIT + assert MAX_SKIP > MAX_LIMIT + + # Check specific expected values + assert DEFAULT_LIMIT == 100 + assert MAX_LIMIT == 1000 + assert MAX_SKIP == 50000 + + +def test_version_handling(): + """Test that version is handled properly""" + # Version should be a string + assert isinstance(__version__, str) + + # Version should not be empty + assert len(__version__) > 0 + + # Version should be 'unknown' or a valid version string + assert __version__ == "unknown" or "." in __version__ + + +def test_module_imports(): + """Test that all important module components can be imported""" + # Test main imports + from src.bertron_mcp.main import ( + BERTRON_API_URL, + advanced_query, + bbox_search, + entity_lookup, + geosearch, + health_check, + main, + mcp, + search_by_name, + search_by_source, + search_by_type, + ) + + # Check that API URL is defined + assert isinstance(BERTRON_API_URL, str) + assert len(BERTRON_API_URL) > 0 + assert BERTRON_API_URL.startswith("https://") + + # Check that functions are callable + assert callable(health_check) + assert callable(geosearch) + assert callable(bbox_search) + assert callable(entity_lookup) + assert callable(advanced_query) + assert callable(search_by_source) + assert callable(search_by_type) + assert callable(search_by_name) + assert callable(main) + + # Check that mcp instance exists + assert mcp is not None + assert hasattr(mcp, "run") + + +def test_logging_setup(): + """Test that logging is set up properly""" + import logging + + # Get the module logger + logger = 
logging.getLogger("bertron_mcp.main")
+
+    # Logger should exist
+    assert logger is not None
+
+    # Logger should have reasonable default level
+    # (May be changed by other tests, so we just check it exists)
+    assert hasattr(logger, "level")
+
+
+def test_constants_consistency():
+    """Test that constants are used consistently in function signatures"""
+    import inspect
+
+    from src.bertron_mcp.main import (
+        advanced_query,
+        search_by_name,
+        search_by_source,
+        search_by_type,
+    )
+
+    # Check function signatures use DEFAULT_LIMIT
+    sig = inspect.signature(search_by_source)
+    assert sig.parameters['limit'].default == DEFAULT_LIMIT
+
+    sig = inspect.signature(search_by_type)
+    assert sig.parameters['limit'].default == DEFAULT_LIMIT
+
+    sig = inspect.signature(search_by_name)
+    assert sig.parameters['limit'].default == DEFAULT_LIMIT
+
+    sig = inspect.signature(advanced_query)
+    assert sig.parameters['limit'].default == DEFAULT_LIMIT
+
+
+def test_ssl_warnings_disabled():
+    """Test that SSL warnings are properly disabled"""
+    import urllib3
+
+    # Check that urllib3 is available (should be imported in main)
+    assert hasattr(urllib3, "disable_warnings")
+
+    # This test mainly ensures the import works
+    # SSL warning disabling is tested by the fact that other tests don't show warnings
+
+
+def test_constants_mathematical_relationships():
+    """Test that constants have proper mathematical relationships"""
+    # DEFAULT_LIMIT should be a reasonable fraction of MAX_LIMIT
+    assert DEFAULT_LIMIT <= MAX_LIMIT / 2  # At most half the max
+
+    # MAX_SKIP should be significantly larger than MAX_LIMIT for pagination
+    assert MAX_SKIP >= MAX_LIMIT * 10  # At least 10x the max limit
+
+    # All should be round numbers for user-friendliness
+    assert DEFAULT_LIMIT % 10 == 0  # Round number
+    assert MAX_LIMIT % 100 == 0  # Round number
+    assert MAX_SKIP % 1000 == 0  # Round number
+
+
+def test_api_url_configuration():
+    """Test API URL configuration"""
+    from src.bertron_mcp.main import 
BERTRON_API_URL + + # Should be a proper HTTPS URL + assert BERTRON_API_URL.startswith("https://") + assert BERTRON_API_URL.endswith("/") + + # Should contain expected domain + assert "bertron" in BERTRON_API_URL.lower() + + # Should be a reasonable length + assert 30 < len(BERTRON_API_URL) < 200 diff --git a/tests/test_error_handling.py b/tests/test_error_handling.py new file mode 100644 index 0000000..b3d66ff --- /dev/null +++ b/tests/test_error_handling.py @@ -0,0 +1,234 @@ +""" +Test error handling and edge cases for bertron-mcp. +""" + + +from bertron_client import BertronAPIError, QueryResponse +from schema.datamodel.bertron_schema_pydantic import Entity + +from src.bertron_mcp.main import ( + MAX_LIMIT, + MAX_SKIP, + advanced_query, + bbox_search, + entity_lookup, + geosearch, + health_check, + search_by_name, + search_by_source, + search_by_type, +) + + +def test_health_check_api_error(): + """Test health_check handles API errors gracefully""" + # Test that function handles API errors without crashing + result = health_check() + + # Should return dict or None, never crash + assert result is None or isinstance(result, dict) + + +def test_geosearch_invalid_coordinates(): + """Test geosearch with invalid coordinate values""" + # Test extreme coordinates + test_cases = [ + (91.0, 0.0), # Invalid latitude > 90 + (-91.0, 0.0), # Invalid latitude < -90 + (0.0, 181.0), # Invalid longitude > 180 + (0.0, -181.0), # Invalid longitude < -180 + (float('inf'), 0.0), # Infinite latitude + (0.0, float('nan')), # NaN longitude + ] + + for lat, lon in test_cases: + try: + result = geosearch(lat, lon, 1.0) + # Should return None or QueryResponse, not crash + assert result is None or isinstance(result, QueryResponse) + except Exception as e: + # Some validation errors are acceptable + assert isinstance(e, (ValueError, TypeError, BertronAPIError)) + + +def test_geosearch_negative_radius(): + """Test geosearch with negative search radius""" + try: + result = geosearch(0.0, 0.0, 
-1.0)
+        # Should handle gracefully
+        assert result is None or isinstance(result, QueryResponse)
+    except Exception as e:
+        # Validation errors are acceptable
+        assert isinstance(e, (ValueError, BertronAPIError))
+
+
+def test_geosearch_zero_radius():
+    """Test geosearch with zero search radius"""
+    result = geosearch(0.0, 0.0, 0.0)
+    # Should handle gracefully
+    assert result is None or isinstance(result, QueryResponse)
+
+
+def test_bbox_search_invalid_bbox():
+    """Test bbox_search with invalid bounding box coordinates"""
+    # Southwest corner should be southwest of northeast corner
+    invalid_cases = [
+        # Southwest lat > Northeast lat
+        (30.0, -80.0, 20.0, -70.0),
+        # Southwest lng > Northeast lng (non-crossing case)
+        (20.0, -70.0, 30.0, -80.0),
+        # Invalid coordinate ranges
+        (91.0, 0.0, 92.0, 1.0),  # Invalid latitudes
+        (0.0, 181.0, 1.0, 182.0),  # Invalid longitudes
+    ]
+
+    for sw_lat, sw_lng, ne_lat, ne_lng in invalid_cases:
+        try:
+            result = bbox_search(sw_lat, sw_lng, ne_lat, ne_lng)
+            # Should return None or handle gracefully
+            assert result is None or isinstance(result, QueryResponse)
+        except Exception as e:
+            # Validation errors are acceptable
+            assert isinstance(e, (ValueError, BertronAPIError))
+
+
+def test_entity_lookup_empty_id():
+    """Test entity_lookup with empty or invalid entity IDs"""
+    # None is deliberately excluded from the fixture list: the original code
+    # carried it but skipped it with an unconditional `continue`, so it never
+    # exercised anything.
+    invalid_ids = ["", " ", "invalid", "nmdc:", ":invalid"]
+
+    for entity_id in invalid_ids:
+        try:
+            result = entity_lookup(entity_id)
+            # Should return None for invalid IDs
+            assert result is None or isinstance(result, Entity)
+        except Exception as e:
+            # API errors are acceptable
+            assert isinstance(e, (BertronAPIError, TypeError))
+
+
+def test_search_by_source_invalid_source():
+    """Test search_by_source with invalid data sources"""
+    invalid_sources = ["INVALID", "", " ", "invalid_source", "123"]
+
+    for source in invalid_sources:
+        result = search_by_source(source)
+        # 
Should return QueryResponse with no results for invalid sources + assert isinstance(result, QueryResponse) + assert result.count == 0 # Invalid sources should return empty results + + +def test_search_by_type_invalid_type(): + """Test search_by_type with invalid entity types""" + invalid_types = ["invalid_type", "", " ", "123"] + + for entity_type in invalid_types: + result = search_by_type(entity_type) + # Should return QueryResponse with no results for invalid types + assert isinstance(result, QueryResponse) + assert result.count == 0 # Invalid types should return empty results + + +def test_search_by_name_empty_pattern(): + """Test search_by_name with empty or invalid patterns""" + invalid_patterns = ["", " "] + + for pattern in invalid_patterns: + result = search_by_name(pattern) + # Should return QueryResponse, possibly empty + assert isinstance(result, QueryResponse) + assert result.count >= 0 # Empty patterns might return no results + + +def test_search_by_name_invalid_regex(): + """Test search_by_name with invalid regex patterns""" + import re + + invalid_regex_patterns = [ + "[", # Unclosed bracket + "(?P<", # Invalid group + "*", # Invalid quantifier + "(?", # Incomplete group + ] + + for pattern in invalid_regex_patterns: + # These should either handle gracefully or raise specific regex errors + try: + result = search_by_name(pattern) + # If it doesn't raise, should return QueryResponse + assert isinstance(result, QueryResponse) + except (re.error, BertronAPIError): + # Specific regex or API errors are acceptable + pass + + +def test_advanced_query_excessive_skip(): + """Test advanced_query with skip values above maximum""" + # Test with skip above MAX_SKIP + result = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=MAX_SKIP + 1 + ) + + # Should return None for excessive skip + assert result is None + + +def test_advanced_query_no_filter_safety(): + """Test advanced_query applies safety filter when no filter provided""" + result = 
advanced_query(skip=0, limit=5)
+
+    # Should handle safely with automatic filter
+    assert result is None or isinstance(result, QueryResponse)
+
+    if isinstance(result, QueryResponse) and result.metadata:
+        # Should report filter constraint
+        if "constraints_applied" in result.metadata:
+            assert "filter" in result.metadata["constraints_applied"]
+
+
+def test_advanced_query_invalid_filter():
+    """Test advanced_query with invalid filter dictionaries"""
+    # Only dict filters are exercised here: the original fixture also listed
+    # "" and [] "to test non-dict filters", but a dead `else: continue`
+    # branch skipped them silently, so they never tested anything.
+    invalid_filters = [
+        {"$invalid": "operator"},
+        {"field": {"$badop": "value"}},
+    ]
+
+    for invalid_filter in invalid_filters:
+        try:
+            result = advanced_query(filter_dict=invalid_filter, limit=5)
+            # Should handle invalid filters gracefully
+            assert result is None or isinstance(result, QueryResponse)
+        except Exception as e:
+            # API or validation errors are acceptable
+            assert isinstance(e, (BertronAPIError, TypeError, ValueError))
+
+
+def test_limit_enforcement_edge_cases():
+    """Test limit enforcement with edge case values"""
+    edge_cases = [0, -1, -100, MAX_LIMIT + 1, MAX_LIMIT * 10]
+
+    for limit in edge_cases:
+        try:
+            result = search_by_source("NMDC", limit=limit)
+
+            if isinstance(result, QueryResponse):
+                # Limits should be enforced
+                assert len(result.entities) <= MAX_LIMIT
+
+                # Negative limits should be handled gracefully
+                if limit <= 0:
+                    assert len(result.entities) == 0 or result.count >= 0
+
+        except Exception as e:
+            # API or validation errors for negative limits are acceptable
+            assert isinstance(e, (BertronAPIError, ValueError))
+
+
diff --git a/tests/test_integration.py b/tests/test_integration.py
new file mode 100644
index 0000000..16efc28
--- /dev/null
+++ b/tests/test_integration.py
@@ -0,0 +1,281 @@
+"""
+Integration tests for bertron-mcp with real API calls. 
+""" + +from bertron_client import QueryResponse +from schema.datamodel.bertron_schema_pydantic import Entity + +from src.bertron_mcp.main import ( + MAX_LIMIT, + advanced_query, + bbox_search, + entity_lookup, + geosearch, + health_check, + search_by_name, + search_by_source, + search_by_type, +) + + +def test_full_workflow_geospatial_search(): + """Test a complete workflow: health check -> geosearch -> entity lookup""" + # Step 1: Verify API is healthy + health = health_check() + if health is None: + return # Skip if API is not available + + # Step 2: Search for entities near a known location (Orlando, FL) + search_result = geosearch(28.5383, -81.3792, 50.0) + + if search_result is not None and isinstance(search_result, QueryResponse): + # Verify basic properties + assert search_result.count >= 0 + assert len(search_result.entities) == search_result.count + + # Step 3: If we found entities, look up the first one + if search_result.entities: + first_entity = search_result.entities[0] + if hasattr(first_entity, 'id') and first_entity.id: + detailed_entity = entity_lookup(first_entity.id) + + if detailed_entity is not None: + assert isinstance(detailed_entity, Entity) + assert detailed_entity.id == first_entity.id + + +def test_search_by_source_workflow(): + """Test searching by data source and validating results""" + # Test with a known data source + result = search_by_source("NMDC", limit=10) + + if result is not None and isinstance(result, QueryResponse): + assert result.count >= 0 + assert len(result.entities) <= 10 + + # All entities should be from NMDC source + for entity in result.entities: + if hasattr(entity, 'ber_data_source'): + assert entity.ber_data_source == "NMDC" + + +def test_search_by_type_workflow(): + """Test searching by entity type and validating results""" + # Test with sample entity type + result = search_by_type("sample", limit=5) + + if result is not None and isinstance(result, QueryResponse): + assert result.count >= 0 + assert 
len(result.entities) <= 5 + + # All entities should be samples + for entity in result.entities: + if hasattr(entity, 'entity_type'): + assert "sample" in entity.entity_type + + +def test_advanced_query_with_filters(): + """Test advanced query with realistic filters""" + # Test with entity type filter + result = advanced_query( + filter_dict={"entity_type": "sample"}, + limit=5 + ) + + if result is not None and isinstance(result, QueryResponse): + assert result.count >= 0 + assert len(result.entities) <= 5 + + # Verify all entities match the filter + for entity in result.entities: + if hasattr(entity, 'entity_type'): + assert "sample" in entity.entity_type + + +def test_bounding_box_search_realistic(): + """Test bounding box search with realistic coordinates""" + # Search around Florida + result = bbox_search( + southwest_lat=24.0, + southwest_lng=-85.0, + northeast_lat=31.0, + northeast_lng=-80.0 + ) + + if result is not None and isinstance(result, QueryResponse): + assert result.count >= 0 + assert len(result.entities) == result.count + + # Verify metadata + if result.metadata and "bounding_box" in result.metadata: + bbox = result.metadata["bounding_box"] + assert bbox["southwest"]["latitude"] == 24.0 + assert bbox["southwest"]["longitude"] == -85.0 + assert bbox["northeast"]["latitude"] == 31.0 + assert bbox["northeast"]["longitude"] == -80.0 + + +def test_name_search_realistic(): + """Test name search with realistic patterns""" + # Search for water-related samples + result = search_by_name(".*water.*", case_sensitive=False, limit=5) + + assert isinstance(result, QueryResponse) + assert result.count >= 0 + assert len(result.entities) <= 5 + + # If we found results, verify they contain "water" in name + for entity in result.entities: + if hasattr(entity, 'name') and entity.name: + assert "water" in entity.name.lower() + + +def test_limit_constraint_enforcement(): + """Test that limits are actually enforced""" + # Test with limit above maximum + result = 
search_by_source("NMDC", limit=MAX_LIMIT + 100) + + if result is not None and isinstance(result, QueryResponse): + # Should be constrained to MAX_LIMIT + assert len(result.entities) <= MAX_LIMIT + + # Should report constraints in metadata + if result.metadata and "constraints_applied" in result.metadata: + constraints = result.metadata["constraints_applied"] + assert constraints["requested_limit"] == MAX_LIMIT + 100 + assert constraints["actual_limit"] == MAX_LIMIT + + +def test_entity_data_quality(): + """Test that returned entities have expected data quality""" + result = geosearch(28.5383, -81.3792, 100.0) + + if result is not None and isinstance(result, QueryResponse) and result.entities: + for entity in result.entities[:3]: # Check first 3 entities + # Should have basic required fields + assert hasattr(entity, 'id') + assert hasattr(entity, 'name') + assert hasattr(entity, 'entity_type') + + # ID should be meaningful + if entity.id: + assert len(entity.id) > 0 + assert not entity.id.isspace() + + # Name should be meaningful if present + if entity.name: + assert len(entity.name) > 0 + assert not entity.name.isspace() + + # Should have coordinates if it's a geospatial result + if hasattr(entity, 'coordinates') and entity.coordinates: + assert hasattr(entity.coordinates, 'latitude') + assert hasattr(entity.coordinates, 'longitude') + assert -90 <= entity.coordinates.latitude <= 90 + assert -180 <= entity.coordinates.longitude <= 180 + + +def test_error_recovery(): + """Test that functions handle edge cases appropriately""" + # Test with edge case inputs + + # Very small radius should work and return valid QueryResponse + result = geosearch(0.0, 0.0, 0.1) + assert isinstance(result, QueryResponse) + assert result.count >= 0 # Empty results are fine + + # Invalid ID should return None (documented behavior) + result = entity_lookup("invalid_id") + assert result is None + + # Invalid source should return QueryResponse with no results + result = 
search_by_source("INVALID_SOURCE") + assert isinstance(result, QueryResponse) + assert result.count == 0 # Should be empty but not None + + # Invalid type should return QueryResponse with no results + result = search_by_type("invalid_type") + assert isinstance(result, QueryResponse) + assert result.count == 0 # Should be empty but not None + + # Nonexistent field should return QueryResponse (API should handle gracefully) + result = advanced_query(filter_dict={"nonexistent_field": "value"}) + assert isinstance(result, QueryResponse) + assert result.count >= 0 # Empty results are acceptable + + +def test_pagination_workflow(): + """Test pagination with skip and limit""" + # Get first page + page1 = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=0, + limit=5 + ) + + # Get second page + page2 = advanced_query( + filter_dict={"entity_type": "sample"}, + skip=5, + limit=5 + ) + + if (page1 is not None and isinstance(page1, QueryResponse) and + page2 is not None and isinstance(page2, QueryResponse)): + + # Both pages should have valid results + assert page1.count >= 0 + assert page2.count >= 0 + + # If both have entities, they should generally be different + if page1.entities and page2.entities: + page1_ids = {e.id for e in page1.entities if hasattr(e, 'id') and e.id} + page2_ids = {e.id for e in page2.entities if hasattr(e, 'id') and e.id} + + # Should be mostly different entities (allow some overlap for limited data) + if page1_ids and page2_ids: + overlap = len(page1_ids.intersection(page2_ids)) + total_unique = len(page1_ids.union(page2_ids)) + if total_unique > 0: + overlap_ratio = overlap / total_unique + assert overlap_ratio < 0.8 # Less than 80% overlap + + +def test_comprehensive_data_sources(): + """Test all known data sources return valid results""" + sources = ["EMSL", "ESS-DIVE", "JGI", "NMDC", "MONET"] + + for source in sources: + result = search_by_source(source, limit=3) + + # Each source should return valid result or None + assert result 
is None or isinstance(result, QueryResponse) + + if result is not None: + assert result.count >= 0 + assert len(result.entities) <= 3 + + # All entities should be from the correct source + for entity in result.entities: + if hasattr(entity, 'ber_data_source'): + assert entity.ber_data_source == source + + +def test_comprehensive_entity_types(): + """Test all known entity types return valid results""" + types = ["biodata", "sample", "sequence", "taxon", "jgi_biosample"] + + for entity_type in types: + result = search_by_type(entity_type, limit=3) + + # Each type should return valid result or None + assert result is None or isinstance(result, QueryResponse) + + if result is not None: + assert result.count >= 0 + assert len(result.entities) <= 3 + + # All entities should be of the correct type + for entity in result.entities: + if hasattr(entity, 'entity_type'): + assert entity_type in entity.entity_type diff --git a/tests/test_mcp_protocol.py b/tests/test_mcp_protocol.py index 3bf09c3..cb80a95 100644 --- a/tests/test_mcp_protocol.py +++ b/tests/test_mcp_protocol.py @@ -4,16 +4,146 @@ Tests verify MCP server implements protocol and responds to requests. 
""" -def test_mcp_tool_registration(): - """Test that MCP tools are properly registered.""" - from src.bertron_mcp.main import mcp +import logging - # Verify that the MCP instance is properly initialized +from fastmcp import FastMCP + +from src.bertron_mcp.main import DEFAULT_LIMIT, MAX_LIMIT, MAX_SKIP, mcp + + +def test_mcp_instance_creation(): + """Test that MCP instance is properly created""" assert mcp is not None + assert isinstance(mcp, FastMCP) assert mcp.name == "bertron_mcp" - # Import the functions to verify they exist + +def test_mcp_has_expected_methods(): + """Test that MCP instance has expected methods""" + # Verify FastMCP instance methods exist + assert hasattr(mcp, 'run') + assert hasattr(mcp, 'get_tools') + + # Verify the instance is properly configured + assert callable(mcp.run) + assert callable(mcp.get_tools) + + +def test_mcp_constants_consistency(): + """Test that MCP tools use consistent constants""" + # Verify constants are properly defined + assert isinstance(DEFAULT_LIMIT, int) + assert isinstance(MAX_LIMIT, int) + assert isinstance(MAX_SKIP, int) + + # Verify relationships + assert DEFAULT_LIMIT > 0 + assert MAX_LIMIT > DEFAULT_LIMIT + assert MAX_SKIP > MAX_LIMIT + + +def test_tool_execution_basic(): + """Test basic tool execution by calling functions directly""" + # Test health_check directly + from src.bertron_mcp.main import health_check + + result = health_check() + + # Should return dict with health status + assert isinstance(result, dict) + assert "web_server" in result + assert "database" in result + + +def test_geosearch_function_call(): + """Test geosearch function execution""" + from bertron_client import QueryResponse + from src.bertron_mcp.main import geosearch - # Verify functions are callable - assert callable(geosearch) + # Test with basic coordinates + result = geosearch(0.0, 0.0, 1.0) + + # Should return QueryResponse + assert isinstance(result, QueryResponse) + assert hasattr(result, 'entities') + assert hasattr(result, 
'count') + assert hasattr(result, 'query_type') + + +def test_entity_lookup_function_call(): + """Test entity_lookup function execution""" + from schema.datamodel.bertron_schema_pydantic import Entity + + from src.bertron_mcp.main import entity_lookup + + # Test with invalid ID (should return None gracefully) + result = entity_lookup("invalid_test_id") + + # Should return Entity or None + assert result is None or isinstance(result, Entity) + + +def test_logging_configuration(): + """Test that logging is properly configured for MCP operations""" + # Get the bertron_mcp logger + logger = logging.getLogger("bertron_mcp.main") + assert logger is not None + + # Logger should be properly configured + assert hasattr(logger, 'level') + assert hasattr(logger, 'handlers') + + +def test_constraint_reporting_integration(): + """Test that constraint reporting works with function calls""" + from bertron_client import QueryResponse + + from src.bertron_mcp.main import search_by_source + + # Test with limit that should trigger constraint reporting + result = search_by_source("NMDC", limit=5000) # Above MAX_LIMIT + + # Should always return QueryResponse + assert isinstance(result, QueryResponse) + + # Should have constraint reporting in metadata + assert result.metadata is not None + assert isinstance(result.metadata, dict) + + # Constraints should be applied and reported + assert "constraints_applied" in result.metadata + constraints = result.metadata["constraints_applied"] + assert "requested_limit" in constraints + assert "actual_limit" in constraints + assert constraints["requested_limit"] == 5000 + assert constraints["actual_limit"] == MAX_LIMIT + + +def test_function_imports(): + """Test that all MCP tool functions can be imported and are callable""" + from src.bertron_mcp.main import ( + advanced_query, + bbox_search, + entity_lookup, + geosearch, + health_check, + search_by_name, + search_by_source, + search_by_type, + ) + + # All functions should be callable + functions = 
[ + health_check, + geosearch, + bbox_search, + entity_lookup, + advanced_query, + search_by_source, + search_by_type, + search_by_name, + ] + + for func in functions: + assert callable(func), f"Function {func.__name__} is not callable"