diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py
index 6652637..8822659 100644
--- a/cloud_pipelines_backend/api_router.py
+++ b/cloud_pipelines_backend/api_router.py
@@ -269,6 +269,97 @@ def stream_container_log(
     router.get("/api/pipeline_runs/", tags=["pipelineRuns"], **default_config)(
         inject_session_dependency(list_pipeline_runs_func)
     )
+    router.post(
+        "/api/pipeline_runs/search/",
+        tags=["pipelineRuns"],
+        summary="Search Pipeline Runs",
+        description="""Search pipeline runs with annotation filters.
+
+## What is an Annotation?
+
+An annotation is a **key-value pair** attached to a pipeline run.
+For example, the following annotations (key = value):
+- `environment` = `production`
+- `team` = `backend`
+- `priority` = `high`
+
+## Filter Types
+
+### KeyFilter - Search by annotation key
+| Operator | Description |
+|----------|-------------|
+| `exists` | Run has at least one annotation (the `key` field is ignored) |
+| `equals` | Key exactly matches string |
+| `contains` | Key contains substring |
+| `in_set` | Key matches one of multiple values |
+
+### ValueFilter - Search by annotation value (across ALL annotations)
+| Operator | Description |
+|----------|-------------|
+| `equals` | Value exactly matches string |
+| `contains` | Value contains substring |
+| `in_set` | Value matches one of multiple values |
+
+### FilterGroup - Combine filters with logic
+| Operator | Description |
+|----------|-------------|
+| `and` | ALL filters must match |
+| `or` | ANY filter must match |
+
+All filters support `negate: true` to invert the condition (e.g., NOT equals).
+
+---
+
+## Examples
+
+### 1. Key equals a string
+Find runs where an annotation key equals "environment":
+```json
+{
+  "annotation_filters": {
+    "filters": [
+      {"operator": "equals", "key": "environment"}
+    ]
+  }
+}
+```
+
+### 2. Key contains substring AND value in set
+Find runs where a key contains "env" AND a value is "prod" or "staging":
+```json
+{
+  "annotation_filters": {
+    "filters": [
+      {"operator": "contains", "key": "env"},
+      {"operator": "in_set", "values": ["prod", "staging"]}
+    ],
+    "operator": "and"
+  }
+}
+```
+
+### 3. Complex: (key contains OR value contains) AND key NOT contains
+Find runs where (key contains "env" OR any value contains "prod") AND key NOT contains "deprecated":
+```json
+{
+  "annotation_filters": {
+    "filters": [
+      {
+        "filters": [
+          {"operator": "contains", "key": "env"},
+          {"operator": "contains", "value": "prod"}
+        ],
+        "operator": "or"
+      },
+      {"operator": "contains", "key": "deprecated", "negate": true}
+    ],
+    "operator": "and"
+  }
+}
+```
+""",
+        **default_config,
+    )(inject_session_dependency(pipeline_run_service.search))
     router.get("/api/pipeline_runs/{id}", tags=["pipelineRuns"], **default_config)(
         inject_session_dependency(pipeline_run_service.get)
     )
diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py
index e8e0624..210e6db 100644
--- a/cloud_pipelines_backend/api_server_sql.py
+++ b/cloud_pipelines_backend/api_server_sql.py
@@ -1,10 +1,11 @@
 import base64
 import dataclasses
 import datetime
+import enum
 import json
 import logging
 import typing
-from typing import Any, Optional
+from typing import Any, Final, Optional, Union
 
 if typing.TYPE_CHECKING:
     from cloud_pipelines.orchestration.storage_providers import (
@@ -32,6 +33,104 @@ def _get_current_time() -> datetime.datetime:
 from .errors import ItemNotFoundError
 
 
+# ==== Annotation Filter Types for Search ====
+
+
+class GroupOperator(enum.StrEnum):
+    """Logical operators for combining filters in a group."""
+
+    AND = "and"
+    OR = "or"
+
+
+class KeyFilterOperator(enum.StrEnum):
+    """Operators for filtering by annotation key."""
+
+    CONTAINS = "contains"  # Key contains substring
+    EQUALS = "equals"  # Key equals exact string
+    EXISTS = "exists"  # At least one annotation exists (the key field is ignored)
+    IN_SET = "in_set"  # Key is in a set of values
+
+
+class ValueFilterOperator(enum.StrEnum):
+    """Operators for filtering by annotation value."""
+
+    CONTAINS = "contains"  # Value contains substring
+    EQUALS = "equals"  # Value equals exact string
+    IN_SET = "in_set"  # Value is in a set of values
+
+
+@dataclasses.dataclass(kw_only=True)
+class KeyFilter:
+    """Filter annotations by key patterns.
+
+    Examples:
+    - KeyFilter(operator=KeyFilterOperator.EXISTS)
+      → Find runs that have at least one annotation (EXISTS ignores `key`)
+    - KeyFilter(operator=KeyFilterOperator.CONTAINS, key="env", negate=True)
+      → Find runs that do NOT have any key containing "env"
+    - KeyFilter(operator=KeyFilterOperator.IN_SET, keys=["env", "team"])
+      → Find runs that have a key matching "env" or "team"
+    """
+
+    operator: KeyFilterOperator
+    key: str | None = None  # For EQUALS and CONTAINS operators (ignored by EXISTS)
+    keys: list[str] | None = None  # For IN_SET operator
+    negate: bool = False  # If True, negates the operation (NOT EXISTS, NOT IN, etc.)
+
+
+@dataclasses.dataclass(kw_only=True)
+class ValueFilter:
+    """Filter annotations by value patterns across ALL annotation values.
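+    A run matches when at least one of its annotations has a value satisfying
+    the condition, regardless of which key that annotation is stored under.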
+
+    Examples:
+    - ValueFilter(operator=ValueFilterOperator.EQUALS, value="production")
+      → Find runs where ANY annotation has value "production"
+    - ValueFilter(operator=ValueFilterOperator.CONTAINS, value="error")
+      → Find runs where ANY annotation value contains "error"
+    - ValueFilter(operator=ValueFilterOperator.IN_SET, values=["high", "critical"])
+      → Find runs where ANY annotation value is "high" or "critical"
+    """
+
+    operator: ValueFilterOperator
+    value: str | None = None  # For EQUALS, CONTAINS operators
+    values: list[str] | None = None  # For IN_SET operator
+    negate: bool = False  # If True, negates the operation
+
+
+# Type alias using Union to support forward reference
+AnnotationFilterType = Union[KeyFilter, ValueFilter, "FilterGroup"]
+
+
+@dataclasses.dataclass(kw_only=True)
+class FilterGroup:
+    """A group of filters combined with AND/OR logic.
+
+    Examples:
+    - FilterGroup(operator=GroupOperator.AND, filters=[...])
+      → All filters must match
+    - FilterGroup(operator=GroupOperator.OR, filters=[...])
+      → At least one filter must match
+    - FilterGroup(filters=[...])
+      → Defaults to AND logic when operator is not specified
+    """
+
+    # Union type for filter types that can appear in a group (recursive)
+    filters: list[AnnotationFilterType]
+    # Operator defaults to None, which is treated as AND logic
+    # TODO: document why defaulting to AND is more user-friendly
+    operator: GroupOperator | None = None
+
+
+@dataclasses.dataclass(kw_only=True)
+class SearchFilters:
+    """Top-level search filters for pipeline runs.
+
+    The annotation_filters field accepts a FilterGroup that can contain
+    nested groups and individual filters for complex query logic.
+    """
+
+    annotation_filters: FilterGroup | None = None
+
+
 # ==== PipelineJobService
 @dataclasses.dataclass(kw_only=True)
 class PipelineRunResponse:
@@ -63,15 +162,310 @@ class GetPipelineRunResponse(PipelineRunResponse):
 class ListPipelineJobsResponse:
     pipeline_runs: list[PipelineRunResponse]
     next_page_token: str | None = None
+    debug_where_clause: str | None = None  # Populated when debug_where_clause=True in search()
 
 
 import sqlalchemy as sql
 from sqlalchemy import orm
 
+# Pagination constants
+OFFSET_KEY: Final[str] = "offset"
+PAGE_SIZE: Final[int] = 10
+
+
+def _compile_where_clauses_to_string(
+    session: orm.Session,
+    where_clauses: list[sql.ColumnElement[bool]],
+) -> str:
+    """Compile WHERE clauses to a SQL string for debugging.
+
+    Uses the dialect from the session's engine (SQLite, MySQL, etc.)
+    and inlines literal values for readability.
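+
+    The exact text is dialect- and SQLAlchemy-version-dependent, so treat the
+    output purely as a debugging aid (the tests pin the SQLite form), e.g.:
+
+        pipeline_run_annotation."key" = 'environment'
+
+    The string is only inspected, never executed.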
+ """ + if not where_clauses: + return "(no where clauses)" + + # Combine all clauses with AND + combined = sql.and_(*where_clauses) if len(where_clauses) > 1 else where_clauses[0] + + # Get dialect from session's engine + dialect = session.bind.dialect if session.bind else None + + try: + compiled = combined.compile( + dialect=dialect, + compile_kwargs={"literal_binds": True}, + ) + return str(compiled) + except Exception as e: + # Fallback if literal_binds fails (e.g., for complex types) + try: + compiled = combined.compile(dialect=dialect) + return f"{compiled} [params: {compiled.params}]" + except Exception: + return f"(failed to compile: {e})" + + class PipelineRunsApiService_Sql: PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name" + def _query_pipeline_runs( + self, + *, + session: orm.Session, + where_clauses: list, + offset: int, + page_size: int, + ) -> list[bts.PipelineRun]: + """Query pipeline runs with pagination and filtering.""" + return list( + session.scalars( + sql.select(bts.PipelineRun) + .where(*where_clauses) + .order_by(bts.PipelineRun.created_at.desc()) + .offset(offset) + .limit(page_size) + ).all() + ) + + def _calculate_next_page_offset( + self, + *, + offset: int, + page_size: int, + ) -> int: + """Calculate the offset for the next page.""" + return offset + page_size + + def _get_next_page_token( + self, + *, + num_results: int, + page_size: int, + next_page_token_dict: dict[str, Any], + ) -> str | None: + """Get the next page token, or None if this is the last page.""" + if num_results < page_size: + return None + return _encode_page_token(next_page_token_dict) + + def _create_pipeline_run_response( + self, + session: orm.Session, + pipeline_run: bts.PipelineRun, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + ) -> PipelineRunResponse: + """Create a PipelineRunResponse with optional enrichment.""" + response = PipelineRunResponse.from_db(pipeline_run) + if include_pipeline_names: + pipeline_name = None + extra_data = pipeline_run.extra_data or {} + if self.PIPELINE_NAME_EXTRA_DATA_KEY in extra_data: + pipeline_name = extra_data[self.PIPELINE_NAME_EXTRA_DATA_KEY] + else: + execution_node = session.get( + bts.ExecutionNode, pipeline_run.root_execution_id + ) + if execution_node: + task_spec = structures.TaskSpec.from_json_dict( + execution_node.task_spec + ) + component_spec = task_spec.component_ref.spec + if component_spec: + pipeline_name = component_spec.name + response.pipeline_name = pipeline_name + if include_execution_stats: + execution_status_stats = self._calculate_execution_status_stats( + session=session, root_execution_id=pipeline_run.root_execution_id + ) + response.execution_status_stats = { + status.value: count + for status, count in execution_status_stats.items() + } + return response + + def _build_list_response( + self, + *, + session: orm.Session, + pipeline_runs: list[bts.PipelineRun], + next_page_token: str | None, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + ) -> ListPipelineJobsResponse: + """Build the ListPipelineJobsResponse from pipeline runs.""" + return ListPipelineJobsResponse( + pipeline_runs=[ + self._create_pipeline_run_response( + session=session, + pipeline_run=pipeline_run, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, + ) + for pipeline_run in pipeline_runs + ], + next_page_token=next_page_token, + ) + + def _build_annotation_filter_clause( + self, + *, + filter: AnnotationFilterType, + ) -> 
sql.ColumnElement[bool]: + """Build a SQLAlchemy clause from a single annotation filter.""" + if isinstance(filter, KeyFilter): + return self._build_key_filter_clause(filter=filter) + elif isinstance(filter, ValueFilter): + return self._build_value_filter_clause(filter=filter) + elif isinstance(filter, FilterGroup): + return self._build_filter_group_clause(group=filter) + else: + raise ValueError(f"Unknown filter type: {type(filter)}") + + def _build_key_filter_clause( + self, + *, + filter: KeyFilter, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy clause for a KeyFilter. + + KeyFilter checks for the existence or pattern of annotation keys. + + Design Decision - EXISTS vs JOIN: + We use EXISTS subqueries instead of JOINs because: + 1. Cleaner for complex boolean logic (AND/OR/NOT groups) + 2. Automatically handles duplicates (no need for DISTINCT) + 3. Easier to negate (~clause for NOT EXISTS) + + The query optimizer will use the (key, value) index on + PipelineRunAnnotation for efficient lookups regardless of + whether EXISTS or JOIN is used. + """ + # Base subquery to check annotation keys + subquery = sql.select(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id + ) + + if filter.operator == KeyFilterOperator.EXISTS: + # Key exists (regardless of value) + subquery = subquery.where(bts.PipelineRunAnnotation.key != None) + elif filter.operator == KeyFilterOperator.EQUALS: + # Key equals exact string + if not filter.key: + raise ValueError("EQUALS operator requires 'key' to be set") + subquery = subquery.where(bts.PipelineRunAnnotation.key == filter.key) + elif filter.operator == KeyFilterOperator.CONTAINS: + # Key contains substring + if not filter.key: + raise ValueError("CONTAINS operator requires 'key' to be set") + subquery = subquery.where( + bts.PipelineRunAnnotation.key.contains(filter.key) + ) + elif filter.operator == KeyFilterOperator.IN_SET: + # Key is in a set of values + if not filter.keys: + raise ValueError("IN_SET operator requires 'keys' to be set") + subquery = subquery.where(bts.PipelineRunAnnotation.key.in_(filter.keys)) + else: + raise ValueError(f"Unknown KeyFilterOperator: {filter.operator}") + + clause = subquery.exists() + + if filter.negate: + clause = ~clause + + return clause + + def _build_value_filter_clause( + self, + *, + filter: ValueFilter, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy clause for a ValueFilter. + + ValueFilter searches across ALL annotation values for matching patterns. + + See _build_key_filter_clause for design decision on EXISTS vs JOIN. 
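+
+        For illustration, a CONTAINS filter compiles to roughly the following
+        (the exact text depends on the dialect):
+
+            EXISTS (SELECT ... FROM pipeline_run_annotation
+                    WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id
+                    AND pipeline_run_annotation.value LIKE '%...%')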
+ """ + # Base subquery - searches across all annotation values + subquery = sql.select(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id, + ) + + if filter.operator == ValueFilterOperator.EQUALS: + # Value equals exact string + if filter.value is None: + raise ValueError("EQUALS operator requires 'value' to be set") + subquery = subquery.where(bts.PipelineRunAnnotation.value == filter.value) + elif filter.operator == ValueFilterOperator.CONTAINS: + # Value contains substring + if filter.value is None: + raise ValueError("CONTAINS operator requires 'value' to be set") + subquery = subquery.where( + bts.PipelineRunAnnotation.value.contains(filter.value) + ) + elif filter.operator == ValueFilterOperator.IN_SET: + # Value is in a set of values + if not filter.values: + raise ValueError("IN_SET operator requires 'values' to be set") + subquery = subquery.where( + bts.PipelineRunAnnotation.value.in_(filter.values) + ) + else: + raise ValueError(f"Unknown ValueFilterOperator: {filter.operator}") + + clause = subquery.exists() + + if filter.negate: + clause = ~clause + + return clause + + def _build_filter_group_clause( + self, + *, + group: FilterGroup, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy clause for a FilterGroup. + + Combines multiple filters with AND or OR logic. + """ + if not group.filters: + # Empty group matches everything + return sql.true() + + clauses = [ + self._build_annotation_filter_clause(filter=f) for f in group.filters + ] + + if group.operator == GroupOperator.OR: + return sql.or_(*clauses) + else: + # If operator is missing, defaults to AND + return sql.and_(*clauses) + + def _build_search_where_clauses( + self, + *, + filters: SearchFilters | None, + ) -> list[sql.ColumnElement[bool]]: + """Build where clauses from SearchFilters.""" + where_clauses: list[sql.ColumnElement[bool]] = [] + + if filters is None: + return where_clauses + + # Handle annotation filters + if filters.annotation_filters is not None: + annotation_clause = self._build_filter_group_clause( + group=filters.annotation_filters + ) + where_clauses.append(annotation_clause) + + return where_clauses + def create( self, session: orm.Session, @@ -156,22 +550,125 @@ def terminate( execution_node.extra_data["desired_state"] = "TERMINATED" session.commit() + def search( + self, + *, + session: orm.Session, + filters: SearchFilters | None = None, + page_token: str | None = None, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + debug_where_clause: bool = False, + ) -> ListPipelineJobsResponse: + """Search pipeline runs with advanced filtering capabilities. + + This is an enhanced version of `list()` that supports filtering by annotations. + + **Parameters:** + - **filters**: Search filters including annotation filters + - **page_token**: Pagination token from a previous search call + - **include_pipeline_names**: Whether to include pipeline names in responses + - **include_execution_stats**: Whether to include execution statistics + - **debug_where_clause**: If True, includes the compiled SQL WHERE clause in the response + + **Returns:** `ListPipelineJobsResponse` with matching pipeline runs. 
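+
+        Results are ordered by creation time (newest first) and returned in
+        fixed pages of PAGE_SIZE (currently 10) runs; pass the returned
+        `next_page_token` back in to fetch the following page.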
+ + **Example 1:** Find runs that have an 'environment' annotation key + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + KeyFilter( + operator=KeyFilterOperator.EQUALS, + key="environment" + ) + ] + ) + )) + ``` + + **Example 2:** Find runs where ANY annotation value equals 'production' + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + ValueFilter( + operator=ValueFilterOperator.EQUALS, + value="production" + ) + ] + ) + )) + ``` + + **Example 3:** Find runs where ANY annotation value contains 'error' + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + ValueFilter( + operator=ValueFilterOperator.CONTAINS, + value="error" + ) + ] + ) + )) + ``` + """ + page_token_dict = _decode_page_token(page_token) + offset = page_token_dict.get(OFFSET_KEY, 0) + + where_clauses = self._build_search_where_clauses(filters=filters) + + pipeline_runs = self._query_pipeline_runs( + session=session, + where_clauses=where_clauses, + offset=offset, + page_size=PAGE_SIZE, + ) + + next_page_offset = self._calculate_next_page_offset( + offset=offset, page_size=PAGE_SIZE + ) + next_page_token_dict = {OFFSET_KEY: next_page_offset} + next_page_token = self._get_next_page_token( + num_results=len(pipeline_runs), + page_size=PAGE_SIZE, + next_page_token_dict=next_page_token_dict, + ) + + response = self._build_list_response( + session=session, + pipeline_runs=pipeline_runs, + next_page_token=next_page_token, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, + ) + + if debug_where_clause: + response.debug_where_clause = _compile_where_clauses_to_string( + session=session, + where_clauses=where_clauses, + ) + + return response + # Note: This method must be last to not shadow the "list" type def list( self, *, session: orm.Session, page_token: str | None = None, - # page_size: int = 10, filter: str | None = None, current_user: str | None = None, include_pipeline_names: bool = False, include_execution_stats: bool = False, ) -> ListPipelineJobsResponse: page_token_dict = _decode_page_token(page_token) - OFFSET_KEY = "offset" offset = page_token_dict.get(OFFSET_KEY, 0) - page_size = 10 FILTER_KEY = "filter" if page_token: @@ -200,58 +697,30 @@ def list( where_clauses.append(bts.PipelineRun.created_by == None) else: raise NotImplementedError(f"Unsupported filter {filter}.") - pipeline_runs = list( - session.scalars( - sql.select(bts.PipelineRun) - .where(*where_clauses) - .order_by(bts.PipelineRun.created_at.desc()) - .offset(offset) - .limit(page_size) - ).all() + + pipeline_runs = self._query_pipeline_runs( + session=session, + where_clauses=where_clauses, + offset=offset, + page_size=PAGE_SIZE, + ) + + next_page_offset = self._calculate_next_page_offset( + offset=offset, page_size=PAGE_SIZE ) - next_page_offset = offset + page_size next_page_token_dict = {OFFSET_KEY: next_page_offset, FILTER_KEY: filter} - next_page_token = _encode_page_token(next_page_token_dict) - if len(pipeline_runs) < page_size: - next_page_token = None - - def create_pipeline_run_response( - pipeline_run: bts.PipelineRun, - ) -> PipelineRunResponse: - response = PipelineRunResponse.from_db(pipeline_run) - if include_pipeline_names: - pipeline_name = None - extra_data = pipeline_run.extra_data or {} - if self.PIPELINE_NAME_EXTRA_DATA_KEY in extra_data: - pipeline_name = extra_data[self.PIPELINE_NAME_EXTRA_DATA_KEY] - else: - 
execution_node = session.get( - bts.ExecutionNode, pipeline_run.root_execution_id - ) - if execution_node: - task_spec = structures.TaskSpec.from_json_dict( - execution_node.task_spec - ) - component_spec = task_spec.component_ref.spec - if component_spec: - pipeline_name = component_spec.name - response.pipeline_name = pipeline_name - if include_execution_stats: - execution_status_stats = self._calculate_execution_status_stats( - session=session, root_execution_id=pipeline_run.root_execution_id - ) - response.execution_status_stats = { - status.value: count - for status, count in execution_status_stats.items() - } - return response + next_page_token = self._get_next_page_token( + num_results=len(pipeline_runs), + page_size=PAGE_SIZE, + next_page_token_dict=next_page_token_dict, + ) - return ListPipelineJobsResponse( - pipeline_runs=[ - create_pipeline_run_response(pipeline_run) - for pipeline_run in pipeline_runs - ], + return self._build_list_response( + session=session, + pipeline_runs=pipeline_runs, next_page_token=next_page_token, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, ) def _calculate_execution_status_stats( diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index af16b3c..928112a 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -2,7 +2,7 @@ import datetime import enum import typing -from typing import Any +from typing import Any, Final import sqlalchemy as sql from sqlalchemy import orm @@ -465,6 +465,12 @@ class ContainerExecution(_TableBase): ), ) +PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_key_value" +) +PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_value" +) class PipelineRunAnnotation(_TableBase): __tablename__ = "pipeline_run_annotation" @@ -476,3 +482,19 @@ class PipelineRunAnnotation(_TableBase): pipeline_run: orm.Mapped[PipelineRun] = orm.relationship(repr=False, init=False) key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True) value: orm.Mapped[str | None] = orm.mapped_column(default=None) + + __table_args__ = ( + # Index for searching pipeline runs by annotation key/value + # Enables efficient queries like "find runs where key='environment' and value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME, + "key", + "value", + ), + # Index for searching pipeline runs by annotation value only (across all keys) + # Enables efficient queries like "find runs where any annotation value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME, + "value", + ), + ) diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 3d94ed1..e691433 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -77,3 +77,13 @@ def migrate_db(db_engine: sqlalchemy.Engine): for index in bts.ExecutionNode.__table__.indexes: if index.name == "ix_execution_node_container_execution_cache_key": index.create(db_engine, checkfirst=True) + + # TODO: I believe we should create an index for the Annotation key + # column and migrate existing data? 
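+    # (Note: a separate single-column index on the key may be redundant: the
+    # composite (key, value) index already has "key" as its leading column,
+    # and most databases can use it for key-only lookups.)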
+ # + # Migration for annotation filtering feature + for index in bts.PipelineRunAnnotation.__table__.indexes: + if index.name == bts.PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: + index.create(db_engine, checkfirst=True) + if index.name == bts.PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME: + index.create(db_engine, checkfirst=True) \ No newline at end of file diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py new file mode 100644 index 0000000..a6ddfc5 --- /dev/null +++ b/tests/test_pipeline_run_search.py @@ -0,0 +1,499 @@ +"""Tests for the pipeline run search functionality.""" + +from sqlalchemy import orm +import pytest + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import database_ops + + +def _initialize_db_and_get_session_factory(): + """Initialize an in-memory SQLite database and return a session factory.""" + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +class TestPipelineRunSearch: + """Tests for PipelineRunsApiService_Sql.search()""" + + def test_search_with_no_filters(self): + """Test search with filters=None returns all pipeline runs.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + response = service.search( + session=session, + filters=None, + debug_where_clause=True, + ) + + assert response.debug_where_clause == "(no where clauses)" + assert response.pipeline_runs == [] + + def test_search_with_key_exists_filter(self): + """Test search with KeyFilter EXISTS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" IS NOT NULL)' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_exists_no_group_operator_filter(self): + """Test search with KeyFilter EXISTS operator without specifying group operator. + + When operator is not specified in FilterGroup, it should default to AND logic. 
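+        (Presumably AND is the safer default: adding more filters can then
+        only narrow the result set, never widen it.)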
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + # Note: operator is not specified, should default to AND + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # Should produce the same result as test_search_with_key_exists_filter + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" IS NOT NULL)' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_contains_filter(self): + """Test search with KeyFilter CONTAINS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.CONTAINS, + key="env", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND (pipeline_run_annotation."key" LIKE \'%\' || \'env\' || \'%\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_equals_filter(self): + """Test search with KeyFilter EQUALS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EQUALS, + key="environment", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" = \'environment\')' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_equals_negate_filter(self): + """Test search with KeyFilter EQUALS operator with negate=True.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EQUALS, + key="environment", + negate=True, + ) + ], + ) + ) + + response = 
service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" = \'environment\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_in_set_filter(self): + """Test search with KeyFilter IN_SET operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.IN_SET, + keys=["environment", "team"], + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" IN (\'environment\', \'team\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_value_contains_filter(self): + """Test search with ValueFilter CONTAINS operator. + + Searches across ALL annotation values for substring match. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.CONTAINS, + value="prod", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND (pipeline_run_annotation.value LIKE \'%\' || \'prod\' || \'%\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_value_equals_filter(self): + """Test search with ValueFilter EQUALS operator. + + Searches across ALL annotation values for exact match. 
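+        A run is returned if any one of its annotations carries the exact
+        value, regardless of which key stores it.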
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="production", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation.value = \'production\')' + ) + assert response.debug_where_clause == expected + + def test_search_with_value_equals_negate_filter(self): + """Test search with ValueFilter EQUALS operator with negate=True. + + Searches across ALL annotation values for exact match, then negates. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="production", + negate=True, + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation.value = \'production\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_value_in_set_filter(self): + """Test search with ValueFilter IN_SET operator. + + Searches across ALL annotation values for set membership. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.IN_SET, + values=["backend", "frontend"], + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + 'EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation.value IN (\'backend\', \'frontend\'))' + ) + assert response.debug_where_clause == expected + + def test_search_with_complex_nested_filters(self): + """Test search with complex nested filter groups. 
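+
+        Also checks that SQLAlchemy flattens the nested ORs and relies on SQL
+        operator precedence (AND binds tighter than OR), so Group 2 needs no
+        explicit parentheses in the compiled WHERE clause.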
+ + Structure: + Root Group (OR): + ├── Group 1 (OR): + │ ├── KeyFilter(CONTAINS, key="env") + │ └── ValueFilter(EQUALS, value="admin", negate=True) + └── Group 2 (AND): + ├── KeyFilter(EXISTS, key="status", negate=True) + └── ValueFilter(IN_SET, values=["high", "critical"]) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.OR, + filters=[ + # Group 1 (OR) + api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.OR, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.CONTAINS, + key="env", + ), + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="admin", + negate=True, + ), + ], + ), + # Group 2 (AND) + api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS, + key="status", + negate=True, + ), + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.IN_SET, + values=["high", "critical"], + ), + ], + ), + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # Expected SQL structure: + # + # Root Group (OR): + # ├── Group 1 (OR): + # │ ├── KeyFilter(CONTAINS, key="env") + # │ └── ValueFilter(EQUALS, value="admin", negate=True) + # └── Group 2 (AND): + # ├── KeyFilter(EXISTS, key="status", negate=True) + # └── ValueFilter(IN_SET, values=["high", "critical"]) + # + expected = ( + # ===== Root Group (OR) ===== + # | + # |-- Group 1 (OR) ------------------------------------------ + # | | + # | |-- Filter 1: KeyFilter(CONTAINS, key="env") + '(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND (pipeline_run_annotation."key" LIKE \'%\' || \'env\' || \'%\'))) ' + # | | + # | OR + 'OR ' + # | | + # | |-- Filter 2: ValueFilter(EQUALS, value="admin", negate=True) + 'NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation.value = \'admin\')) ' + # | + # OR (Root level) + 'OR ' + # | + # |-- Group 2 (AND) ------------------------------------------ + # | | + # | |-- Filter 1: KeyFilter(EXISTS, key="status", negate=True) + 'NOT (EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation."key" IS NOT NULL)) ' + # | | + # | AND + 'AND ' + # | | + # | |-- Filter 2: ValueFilter(IN_SET, values=["high", "critical"]) + '(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, ' + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + 'FROM pipeline_run_annotation, pipeline_run \n' + 'WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id ' + 'AND pipeline_run_annotation.value IN (\'high\', \'critical\')))' + # ===== End Root Group ===== + ) + assert 
response.debug_where_clause == expected + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
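+
+
+# A minimal HTTP usage sketch for the new endpoint (hypothetical base URL;
+# the request body mirrors the JSON examples documented on the route):
+#
+#   import requests
+#
+#   response = requests.post(
+#       "http://localhost:8000/api/pipeline_runs/search/",
+#       json={
+#           "annotation_filters": {
+#               "filters": [{"operator": "equals", "key": "environment"}]
+#           }
+#       },
+#   )
+#   print(response.json()["pipeline_runs"])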