diff --git a/Makefile b/Makefile index 9b6dd98a7..330981637 100644 --- a/Makefile +++ b/Makefile @@ -154,7 +154,8 @@ podmonitors.monitoring.coreos.com,$\ apiservices.apiregistration.k8s.io,$\ horizontalpodautoscalers.autoscaling,$\ oidcpolicies.extensions.kuadrant.io,$\ -planpolicies.extensions.kuadrant.io +planpolicies.extensions.kuadrant.io,$\ +pipelinepolicies.extensions.kuadrant.io clean: ## Clean all objects on cluster created by running this testsuite. Set the env variable USER to delete after someone else @echo "Deleting objects for user: $(USER)" diff --git a/testsuite/kuadrant/extensions/pipeline_policy.py b/testsuite/kuadrant/extensions/pipeline_policy.py new file mode 100644 index 000000000..2149a7492 --- /dev/null +++ b/testsuite/kuadrant/extensions/pipeline_policy.py @@ -0,0 +1,140 @@ +"""Module containing classes related to PipelinePolicy""" + +from typing import Dict, List, Optional + +from testsuite.gateway import Referencable +from testsuite.kubernetes import modify +from testsuite.kubernetes.client import KubernetesClient +from testsuite.kuadrant.policy import Policy + + +class PipelinePolicy(Policy): + """PipelinePolicy for defining declarative action pipelines (request/response actions) on routes""" + + @classmethod + def create_instance( + cls, + cluster: KubernetesClient, + name: str, + target: Referencable, + labels: Dict[str, str] = None, + section_name: str = None, + ): + """Creates base instance""" + model: Dict = { + "apiVersion": "extensions.kuadrant.io/v1alpha1", + "kind": "PipelinePolicy", + "metadata": {"name": name, "namespace": cluster.project, "labels": labels}, + "spec": { + "targetRef": target.reference, + }, + } + if section_name: + model["spec"]["targetRef"]["sectionName"] = section_name + + return cls(model, context=cluster.context) + + @modify + def add_action_method(self, name: str, url: str, service: str, method: str, message_template: str): + """Add a gRPC upstream action method definition""" + self.model.spec.setdefault("actionMethods", []).append( + { + "name": name, + "url": url, + "service": service, + "method": method, + "messageTemplate": message_template, + } + ) + + @modify + def add_request_grpc_method(self, method: str, var: Optional[str] = None, predicate: Optional[str] = None): + """Add a grpc_method request action that calls an upstream""" + action: Dict = {"type": "grpc_method", "method": method} + if var: + action["var"] = var + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("request", []).append(action) + + @modify + def add_request_deny( + self, + predicate: Optional[str] = None, + with_status: Optional[int] = None, + with_headers: Optional[str] = None, + with_body: Optional[str] = None, + ): + """Add a deny request action""" + action: Dict = {"type": "deny"} + if predicate: + action["predicate"] = predicate + if with_status: + action["withStatus"] = with_status + if with_headers: + action["withHeaders"] = with_headers + if with_body: + action["withBody"] = with_body + self.model.spec.setdefault("request", []).append(action) + + @modify + def add_request_fail(self, log_message: str, predicate: Optional[str] = None): + """Add a fail request action""" + action: Dict = {"type": "fail", "logMessage": log_message} + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("request", []).append(action) + + @modify + def add_request_headers(self, headers: List[List[str]], predicate: Optional[str] = None): + """Add an add_headers request action""" + action: Dict = {"type": "add_headers", "headersToAdd": str(headers)} + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("request", []).append(action) + + @modify + def add_response_grpc_method(self, method: str, var: Optional[str] = None, predicate: Optional[str] = None): + """Add a grpc_method response action that calls an upstream""" + action: Dict = {"type": "grpc_method", "method": method} + if var: + action["var"] = var + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("response", []).append(action) + + @modify + def add_response_deny( + self, + predicate: Optional[str] = None, + with_status: Optional[int] = None, + with_headers: Optional[str] = None, + with_body: Optional[str] = None, + ): + """Add a deny response action""" + action: Dict = {"type": "deny"} + if predicate: + action["predicate"] = predicate + if with_status: + action["withStatus"] = with_status + if with_headers: + action["withHeaders"] = with_headers + if with_body: + action["withBody"] = with_body + self.model.spec.setdefault("response", []).append(action) + + @modify + def add_response_fail(self, log_message: str, predicate: Optional[str] = None): + """Add a fail response action""" + action: Dict = {"type": "fail", "logMessage": log_message} + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("response", []).append(action) + + @modify + def add_response_headers(self, headers: List[List[str]], predicate: Optional[str] = None): + """Add an add_headers response action""" + action: Dict = {"type": "add_headers", "headersToAdd": str(headers)} + if predicate: + action["predicate"] = predicate + self.model.spec.setdefault("response", []).append(action) diff --git a/testsuite/tests/singlecluster/extensions/pipeline_policy/__init__.py b/testsuite/tests/singlecluster/extensions/pipeline_policy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/testsuite/tests/singlecluster/extensions/pipeline_policy/conftest.py b/testsuite/tests/singlecluster/extensions/pipeline_policy/conftest.py new file mode 100644 index 000000000..12f08845d --- /dev/null +++ b/testsuite/tests/singlecluster/extensions/pipeline_policy/conftest.py @@ -0,0 +1,20 @@ +"""Shared fixtures for PipelinePolicy testing.""" + +import pytest + +from testsuite.kuadrant.extensions.pipeline_policy import PipelinePolicy + + +@pytest.fixture(scope="module") +def pipeline_policy(cluster, blame, route): + """PipelinePolicy targeting the test HTTPRoute""" + return PipelinePolicy.create_instance(cluster, blame("pipeline"), route) + + +@pytest.fixture(scope="module", autouse=True) +def commit(request, pipeline_policy): + """Commit and wait for PipelinePolicy to be ready.""" + for component in [pipeline_policy]: + request.addfinalizer(component.delete) + component.commit() + component.wait_for_ready() diff --git a/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_action_method.py b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_action_method.py new file mode 100644 index 000000000..e643b1ae9 --- /dev/null +++ b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_action_method.py @@ -0,0 +1,102 @@ +"""Tests for PipelinePolicy: action method calling a gRPC backend and conditional response headers.""" + +import pytest + +from testsuite.kubernetes import Selector +from testsuite.kubernetes.deployment import Deployment +from testsuite.kubernetes.service import Service, ServicePort + + +@pytest.fixture(scope="module") +def threat_assessment_service(request, cluster, blame, module_label): + """Deploys the ThreatAssessmentService gRPC backend""" + name = blame("threat") + match_labels = {"app": module_label, "deployment": name} + + deployment = Deployment.create_instance( + cluster, + name, + container_name="threat-assessment", + image="quay.io/kuadrant/threat-assessment-service:latest", + ports={"grpc": 8080}, + selector=Selector(matchLabels=match_labels), + labels={"app": module_label}, + ) + request.addfinalizer(deployment.delete) + deployment.commit() + deployment.wait_for_ready() + + service = Service.create_instance( + cluster, + name, + selector=match_labels, + ports=[ServicePort(name="grpc", port=8080, targetPort="grpc")], + labels={"app": module_label}, + ) + request.addfinalizer(service.delete) + service.commit() + return service + + +THREAT_THRESHOLD = 50 + + +@pytest.fixture(scope="module") +def pipeline_policy(pipeline_policy, threat_assessment_service): # pylint: disable=unused-argument + """Configure PipelinePolicy with threat assessment gRPC action and conditional headers.""" + svc_url = ( + f"grpc://{threat_assessment_service.name()}.{threat_assessment_service.namespace()}.svc.cluster.local:8080" + ) + pipeline_policy.add_action_method( + name="assess-threat", + url=svc_url, + service="threat.v1.ThreatAssessmentService", + method="AssessRequest", + message_template="threat.v1.ThreatRequest{uri: request.path, source_ip: source.address}", + ) + + pipeline_policy.add_request_grpc_method( + method="assess-threat", + var="threatResponse", + predicate='"x-assess-threat" in request.headers', + ) + pipeline_policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403) + pipeline_policy.add_request_deny( + predicate=f"threatResponse.threat_level >= {THREAT_THRESHOLD}", + with_status=403, + ) + + pipeline_policy.add_response_headers( + [["x-threat-assessed", "true"]], + predicate='"x-assess-threat" in request.headers', + ) + pipeline_policy.add_response_headers( + [["x-threat-assessed", "false"]], + predicate='!("x-assess-threat" in request.headers)', + ) + pipeline_policy.add_response_headers([["x-threat-threshold", str(THREAT_THRESHOLD)]]) + + return pipeline_policy + + +def test_allowed_path(client): + """Request to an allowed path returns 200 without threat assessment.""" + response = client.get("/get") + assert response.status_code == 200 + + assert response.headers.get("x-threat-assessed") == "false" + assert response.headers.get("x-threat-threshold") == str(THREAT_THRESHOLD) + + +def test_blocked_path(client): + """Request to /blocked is denied by the allow rule.""" + response = client.get("/blocked") + assert response.status_code == 403 + + +def test_threat_assessment_safe(client): + """Request with x-assess-threat header to a safe path passes threat check.""" + response = client.get("/get", headers={"x-assess-threat": "true"}) + assert response.status_code == 200 + assert response.headers.get("x-threat-assessed") == "true" + assert response.headers.get("x-threat-threshold") == str(THREAT_THRESHOLD) diff --git a/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_basic.py b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_basic.py new file mode 100644 index 000000000..3b02ef654 --- /dev/null +++ b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_basic.py @@ -0,0 +1,26 @@ +"""Basic happy-path tests for PipelinePolicy: deny action and response headers.""" + +import pytest + +pytestmark = [pytest.mark.kuadrant_only, pytest.mark.extensions] + + +@pytest.fixture(scope="module") +def pipeline_policy(pipeline_policy): + """Configure PipelinePolicy with deny action and response headers.""" + pipeline_policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403) + pipeline_policy.add_response_headers([["x-pipeline-policy", "active"]]) + return pipeline_policy + + +def test_allowed_path(client): + """Request to an allowed path returns 200 with the custom response header.""" + response = client.get("/get") + assert response.status_code == 200 + assert response.headers.get("x-pipeline-policy") == "active" + + +def test_blocked_path(client): + """Request to /blocked is denied by the deny action.""" + response = client.get("/blocked") + assert response.status_code == 403 diff --git a/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_error_status.py b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_error_status.py new file mode 100644 index 000000000..befe11990 --- /dev/null +++ b/testsuite/tests/singlecluster/extensions/pipeline_policy/test_pipeline_policy_error_status.py @@ -0,0 +1,125 @@ +"""Tests that PipelinePolicy surfaces error status conditions for invalid action method configurations.""" + +import pytest + +from testsuite.kubernetes import Selector +from testsuite.kubernetes.deployment import Deployment +from testsuite.kubernetes.service import Service, ServicePort +from testsuite.kuadrant.extensions.pipeline_policy import PipelinePolicy +from testsuite.kuadrant.policy import has_condition + +pytestmark = [pytest.mark.kuadrant_only, pytest.mark.extensions] + + +@pytest.fixture(scope="module", autouse=True) +def commit(request, authorization): + """Only commit authorization; each test creates its own policy with bad configuration.""" + request.addfinalizer(authorization.delete) + authorization.commit() + authorization.wait_for_ready() + + +@pytest.fixture(scope="module") +def threat_assessment_service(request, cluster, blame, module_label): + """Deploys the ThreatAssessmentService gRPC backend""" + name = blame("threat") + match_labels = {"app": module_label, "deployment": name} + + deployment = Deployment.create_instance( + cluster, + name, + container_name="threat-assessment", + image="quay.io/kuadrant/threat-assessment-service:latest", + ports={"grpc": 8080}, + selector=Selector(matchLabels=match_labels), + labels={"app": module_label}, + ) + request.addfinalizer(deployment.delete) + deployment.commit() + deployment.wait_for_ready() + + service = Service.create_instance( + cluster, + name, + selector=match_labels, + ports=[ServicePort(name="grpc", port=8080, targetPort="grpc")], + labels={"app": module_label}, + ) + request.addfinalizer(service.delete) + service.commit() + return service + + +def test_nonexistent_url(request, cluster, blame, route): + """PipelinePolicy reports error when action method URL points to a non-existent service.""" + policy = PipelinePolicy.create_instance(cluster, blame("bad-url"), route) + policy.add_action_method( + name="bad-method", + url="grpc://does-not-exist.default.svc.cluster.local:8080", + service="threat.v1.ThreatAssessmentService", + method="AssessRequest", + message_template="threat.v1.ThreatRequest{uri: request.path}", + ) + policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403) + policy.add_request_grpc_method(method="bad-method") + + request.addfinalizer(policy.delete) + policy.commit() + + # TODO: add expected message assertion + assert policy.wait_until( + has_condition("Accepted", "False", "Unknown"), + timelimit=60, + ), f"Policy did not reach expected error status, instead: {policy.refresh().model.status.conditions}" + + +def test_wrong_service_name(request, cluster, blame, route, threat_assessment_service): + """PipelinePolicy reports error when action method references a non-existent gRPC service name.""" + svc_url = ( + f"grpc://{threat_assessment_service.name()}" f".{threat_assessment_service.namespace()}.svc.cluster.local:8080" + ) + policy = PipelinePolicy.create_instance(cluster, blame("bad-svc"), route) + policy.add_action_method( + name="bad-service", + url=svc_url, + service="nonexistent.v1.FakeService", + method="DoSomething", + message_template="nonexistent.v1.FakeRequest{uri: request.path}", + ) + policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403) + policy.add_request_grpc_method(method="bad-service") + + request.addfinalizer(policy.delete) + policy.commit() + + # TODO: add expected message assertion + assert policy.wait_until( + has_condition("Accepted", "False", "Unknown"), + timelimit=60, + ), f"Policy did not reach expected error status, instead: {policy.refresh().model.status.conditions}" + + +def test_wrong_method_name(request, cluster, blame, route, threat_assessment_service): + """PipelinePolicy reports error when action method references a non-existent gRPC method.""" + svc_url = ( + f"grpc://{threat_assessment_service.name()}" f".{threat_assessment_service.namespace()}.svc.cluster.local:8080" + ) + policy = PipelinePolicy.create_instance(cluster, blame("bad-meth"), route) + policy.add_action_method( + name="wrong-method", + url=svc_url, + service="threat.v1.ThreatAssessmentService", + method="NonExistentMethod", + message_template="threat.v1.ThreatRequest{uri: request.path}", + ) + policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403) + policy.add_request_grpc_method(method="wrong-method") + + request.addfinalizer(policy.delete) + policy.commit() + + # TODO: add expected message assertion + assert policy.wait_until( + has_condition("Accepted", "False", "Unknown"), + timelimit=60, + ), f"Policy did not reach expected error status, instead: {policy.refresh().model.status.conditions}"