Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ podmonitors.monitoring.coreos.com,$\
apiservices.apiregistration.k8s.io,$\
horizontalpodautoscalers.autoscaling,$\
oidcpolicies.extensions.kuadrant.io,$\
planpolicies.extensions.kuadrant.io
planpolicies.extensions.kuadrant.io,$\
pipelinepolicies.extensions.kuadrant.io

clean: ## Clean all objects on cluster created by running this testsuite. Set the env variable USER to delete after someone else
@echo "Deleting objects for user: $(USER)"
Expand Down
140 changes: 140 additions & 0 deletions testsuite/kuadrant/extensions/pipeline_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Module containing classes related to PipelinePolicy"""

from typing import Dict, List, Optional

from testsuite.gateway import Referencable
from testsuite.kubernetes import modify
from testsuite.kubernetes.client import KubernetesClient
from testsuite.kuadrant.policy import Policy


class PipelinePolicy(Policy):
"""PipelinePolicy for defining declarative action pipelines (request/response actions) on routes"""

@classmethod
def create_instance(
cls,
cluster: KubernetesClient,
name: str,
target: Referencable,
labels: Dict[str, str] = None,
section_name: str = None,
):
"""Creates base instance"""
model: Dict = {
"apiVersion": "extensions.kuadrant.io/v1alpha1",
"kind": "PipelinePolicy",
"metadata": {"name": name, "namespace": cluster.project, "labels": labels},
"spec": {
"targetRef": target.reference,
},
}
if section_name:
model["spec"]["targetRef"]["sectionName"] = section_name

return cls(model, context=cluster.context)

@modify
def add_action_method(self, name: str, url: str, service: str, method: str, message_template: str):
"""Add a gRPC upstream action method definition"""
self.model.spec.setdefault("actionMethods", []).append(
{
"name": name,
"url": url,
"service": service,
"method": method,
"messageTemplate": message_template,
}
)

@modify
def add_request_grpc_method(self, method: str, var: Optional[str] = None, predicate: Optional[str] = None):
"""Add a grpc_method request action that calls an upstream"""
action: Dict = {"type": "grpc_method", "method": method}
if var:
action["var"] = var
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("request", []).append(action)

@modify
def add_request_deny(
self,
predicate: Optional[str] = None,
with_status: Optional[int] = None,
with_headers: Optional[str] = None,
with_body: Optional[str] = None,
):
"""Add a deny request action"""
action: Dict = {"type": "deny"}
if predicate:
action["predicate"] = predicate
if with_status:
action["withStatus"] = with_status
if with_headers:
action["withHeaders"] = with_headers
if with_body:
action["withBody"] = with_body
self.model.spec.setdefault("request", []).append(action)

@modify
def add_request_fail(self, log_message: str, predicate: Optional[str] = None):
"""Add a fail request action"""
action: Dict = {"type": "fail", "logMessage": log_message}
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("request", []).append(action)

@modify
def add_request_headers(self, headers: List[List[str]], predicate: Optional[str] = None):
"""Add an add_headers request action"""
action: Dict = {"type": "add_headers", "headersToAdd": str(headers)}
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("request", []).append(action)

@modify
def add_response_grpc_method(self, method: str, var: Optional[str] = None, predicate: Optional[str] = None):
"""Add a grpc_method response action that calls an upstream"""
action: Dict = {"type": "grpc_method", "method": method}
if var:
action["var"] = var
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("response", []).append(action)

@modify
def add_response_deny(
self,
predicate: Optional[str] = None,
with_status: Optional[int] = None,
with_headers: Optional[str] = None,
with_body: Optional[str] = None,
):
"""Add a deny response action"""
action: Dict = {"type": "deny"}
if predicate:
action["predicate"] = predicate
if with_status:
action["withStatus"] = with_status
if with_headers:
action["withHeaders"] = with_headers
if with_body:
action["withBody"] = with_body
self.model.spec.setdefault("response", []).append(action)

@modify
def add_response_fail(self, log_message: str, predicate: Optional[str] = None):
"""Add a fail response action"""
action: Dict = {"type": "fail", "logMessage": log_message}
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("response", []).append(action)

@modify
def add_response_headers(self, headers: List[List[str]], predicate: Optional[str] = None):
"""Add an add_headers response action"""
action: Dict = {"type": "add_headers", "headersToAdd": str(headers)}
if predicate:
action["predicate"] = predicate
self.model.spec.setdefault("response", []).append(action)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Shared fixtures for PipelinePolicy testing."""

import pytest

from testsuite.kuadrant.extensions.pipeline_policy import PipelinePolicy


@pytest.fixture(scope="module")
def pipeline_policy(cluster, blame, route):
"""PipelinePolicy targeting the test HTTPRoute"""
return PipelinePolicy.create_instance(cluster, blame("pipeline"), route)


@pytest.fixture(scope="module", autouse=True)
def commit(request, pipeline_policy):
"""Commit and wait for PipelinePolicy to be ready."""
for component in [pipeline_policy]:
request.addfinalizer(component.delete)
component.commit()
component.wait_for_ready()
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Tests for PipelinePolicy: action method calling a gRPC backend and conditional response headers."""

import pytest

from testsuite.kubernetes import Selector
from testsuite.kubernetes.deployment import Deployment
from testsuite.kubernetes.service import Service, ServicePort


@pytest.fixture(scope="module")
def threat_assessment_service(request, cluster, blame, module_label):
"""Deploys the ThreatAssessmentService gRPC backend"""
name = blame("threat")
match_labels = {"app": module_label, "deployment": name}

deployment = Deployment.create_instance(
cluster,
name,
container_name="threat-assessment",
image="quay.io/kuadrant/threat-assessment-service:latest",
ports={"grpc": 8080},
selector=Selector(matchLabels=match_labels),
labels={"app": module_label},
)
request.addfinalizer(deployment.delete)
deployment.commit()
deployment.wait_for_ready()

service = Service.create_instance(
cluster,
name,
selector=match_labels,
ports=[ServicePort(name="grpc", port=8080, targetPort="grpc")],
labels={"app": module_label},
)
request.addfinalizer(service.delete)
service.commit()
return service


THREAT_THRESHOLD = 50


@pytest.fixture(scope="module")
def pipeline_policy(pipeline_policy, threat_assessment_service): # pylint: disable=unused-argument
"""Configure PipelinePolicy with threat assessment gRPC action and conditional headers."""
svc_url = (
f"grpc://{threat_assessment_service.name()}.{threat_assessment_service.namespace()}.svc.cluster.local:8080"
)
pipeline_policy.add_action_method(
name="assess-threat",
url=svc_url,
service="threat.v1.ThreatAssessmentService",
method="AssessRequest",
message_template="threat.v1.ThreatRequest{uri: request.path, source_ip: source.address}",
)

pipeline_policy.add_request_grpc_method(
method="assess-threat",
var="threatResponse",
predicate='"x-assess-threat" in request.headers',
)
pipeline_policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403)
pipeline_policy.add_request_deny(
predicate=f"threatResponse.threat_level >= {THREAT_THRESHOLD}",
with_status=403,
)

pipeline_policy.add_response_headers(
[["x-threat-assessed", "true"]],
predicate='"x-assess-threat" in request.headers',
)
pipeline_policy.add_response_headers(
[["x-threat-assessed", "false"]],
predicate='!("x-assess-threat" in request.headers)',
)
pipeline_policy.add_response_headers([["x-threat-threshold", str(THREAT_THRESHOLD)]])

return pipeline_policy


def test_allowed_path(client):
"""Request to an allowed path returns 200 without threat assessment."""
response = client.get("/get")
assert response.status_code == 200

assert response.headers.get("x-threat-assessed") == "false"
assert response.headers.get("x-threat-threshold") == str(THREAT_THRESHOLD)


def test_blocked_path(client):
"""Request to /blocked is denied by the allow rule."""
response = client.get("/blocked")
assert response.status_code == 403


def test_threat_assessment_safe(client):
"""Request with x-assess-threat header to a safe path passes threat check."""
response = client.get("/get", headers={"x-assess-threat": "true"})
assert response.status_code == 200
assert response.headers.get("x-threat-assessed") == "true"
assert response.headers.get("x-threat-threshold") == str(THREAT_THRESHOLD)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Basic happy-path tests for PipelinePolicy: deny action and response headers."""

import pytest

pytestmark = [pytest.mark.kuadrant_only, pytest.mark.extensions]


@pytest.fixture(scope="module")
def pipeline_policy(pipeline_policy):
"""Configure PipelinePolicy with deny action and response headers."""
pipeline_policy.add_request_deny(predicate='request.url_path == "/blocked"', with_status=403)
pipeline_policy.add_response_headers([["x-pipeline-policy", "active"]])
return pipeline_policy


def test_allowed_path(client):
"""Request to an allowed path returns 200 with the custom response header."""
response = client.get("/get")
assert response.status_code == 200
assert response.headers.get("x-pipeline-policy") == "active"


def test_blocked_path(client):
"""Request to /blocked is denied by the deny action."""
response = client.get("/blocked")
assert response.status_code == 403
Loading
Loading