From 0caea370640de320e2facad1ab137ea88f9c20eb Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 13:59:08 +0300 Subject: [PATCH 01/13] Refactor SLO workload to use environment variables - Migrate from CLI subcommands (table-run, topic-run) to environment-based configuration (WORKLOAD_NAME, YDB_ENDPOINT, etc.) - Simplify argument parsing: remove subcommand structure, add env var injection - Always run the metrics reporting job (remove otlp_endpoint checks); OTLP export is enabled when OTEL_EXPORTER_OTLP_METRICS_ENDPOINT or OTEL_EXPORTER_OTLP_ENDPOINT is set - Replace quantile-estimator with hdrhistogram - Update workflow matrix from include to sdk naming - Upgrade ydb-slo-action from commit hash to v2 tag - Add pyrightconfig.json for type checking - Update Dockerfile comments to reflect new env-var-based interface --- .github/workflows/slo-report.yml | 37 +---- .github/workflows/slo.yml | 153 ++++------------- pyproject.toml | 3 + tests/slo/Dockerfile | 18 +- tests/slo/requirements.txt | 2 +- tests/slo/src/__main__.py | 4 +- tests/slo/src/core/metrics.py | 220 +++++++++++++------------ tests/slo/src/jobs/async_topic_jobs.py | 4 - tests/slo/src/jobs/base.py | 3 - tests/slo/src/jobs/table_jobs.py | 4 +- tests/slo/src/options.py | 214 +++++++----------------- tests/slo/src/pyrightconfig.json | 4 + tests/slo/src/root_runner.py | 116 +++++-------- tests/slo/src/runners/table_runner.py | 2 +- tests/slo/src/runners/topic_runner.py | 4 +- 15 files changed, 268 insertions(+), 520 deletions(-) create mode 100644 tests/slo/src/pyrightconfig.json diff --git a/.github/workflows/slo-report.yml b/.github/workflows/slo-report.yml index 07231d3b..049f919e 100644 --- a/.github/workflows/slo-report.yml +++ b/.github/workflows/slo-report.yml @@ -16,42 +16,7 @@ jobs: pull-requests: write steps: - name: Publish YDB SLO Report - uses: ydb-platform/ydb-slo-action/report@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + uses: ydb-platform/ydb-slo-action/report@v2 with: github_token: ${{ secrets.GITHUB_TOKEN }} github_run_id: ${{ github.event.workflow_run.id }} - 
remove-slo-label: - if: always() && github.event.workflow_run.event == 'pull_request' - name: Remove SLO Label - needs: ydb-slo-action-report - runs-on: ubuntu-latest - permissions: - pull-requests: write - steps: - - name: Remove SLO label from PR - uses: actions/github-script@v7 - with: - script: | - const pullRequests = context.payload.workflow_run.pull_requests; - if (pullRequests && pullRequests.length > 0) { - for (const pr of pullRequests) { - try { - await github.rest.issues.removeLabel({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pr.number, - name: 'SLO' - }); - console.log(`Removed SLO label from PR #${pr.number}`); - } catch (error) { - if (error.status === 404) { - console.log(`SLO label not found on PR #${pr.number}, skipping`); - } else { - throw error; - } - } - } - } else { - console.log('No pull requests associated with this workflow run'); - } diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index fbbf66cc..465e173a 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -42,13 +42,11 @@ jobs: strategy: fail-fast: false matrix: - include: - - id: sync-table - prefix: table - workload: sync-table - - id: sync-query - prefix: table - workload: sync-query + sdk: + - name: sync-table + command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" + - name: sync-query + command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" concurrency: group: slo-${{ github.ref }}-${{ matrix.workload }} @@ -141,125 +139,32 @@ jobs: -t "ydb-app-baseline" \ "$GITHUB_WORKSPACE/baseline" - - name: Initialize YDB SLO - id: ydb_slo - uses: ydb-platform/ydb-slo-action/init@13c687b7d4b2879da79dd12932dee0ed2b65dd1c + - name: Run SLO Tests + uses: ydb-platform/ydb-slo-action/init@v2 + timeout-minutes: 30 with: - github_issue: ${{ github.event.pull_request.number || 
inputs.github_issue }} + github_issue: ${{ github.event.inputs.github_issue }} github_token: ${{ secrets.GITHUB_TOKEN }} - workload_name: ydb-python-${{ matrix.workload }} + workload_name: ${{ matrix.sdk.name }} + workload_duration: ${{ inputs.slo_workload_duration_seconds || '600' }} workload_current_ref: ${{ github.head_ref || github.ref_name }} + workload_current_image: ydb-app-current + workload_current_command: ${{ matrix.sdk.command }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - - - name: Prepare SLO Database - run: | - docker run --rm \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "WORKLOAD=${{ matrix.workload }}" \ - -e "REF=${{ github.head_ref || github.ref_name }}" \ - ydb-app-current \ - ${{ matrix.prefix }}-create grpc://ydb:2136 /Root/testdb - - - name: Run SLO Tests (current + baseline in parallel) - timeout-minutes: 15 - env: - WORKLOAD: ${{ matrix.workload }} - DURATION: ${{ inputs.slo_workload_duration_seconds || 600 }} - READ_RPS: ${{ inputs.slo_workload_read_max_rps || 1000 }} - WRITE_RPS: ${{ inputs.slo_workload_write_max_rps || 100 }} - CURRENT_REF: ${{ github.head_ref || github.ref_name }} - BASELINE_REF: ${{ steps.baseline.outputs.ref }} - run: | - ARGS="${{ matrix.prefix }}-run grpc://ydb:2136 /Root/testdb \ - --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics \ - --report-period 250 \ - --time ${DURATION} \ - --read-rps ${READ_RPS} \ - --write-rps ${WRITE_RPS} \ - --read-timeout 1000 \ - --write-timeout 1000" - - echo "Starting current workload (ref=${CURRENT_REF}, workload=${WORKLOAD})..." 
- docker run -d \ - --name ydb-app-current \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${CURRENT_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-current \ - $ARGS - - echo "Starting baseline workload (ref=${BASELINE_REF}, workload=${WORKLOAD})..." - docker run -d \ - --name ydb-app-baseline \ - --network ydb_ydb-net \ - --add-host "ydb:172.28.0.11" \ - --add-host "ydb:172.28.0.12" \ - --add-host "ydb:172.28.0.13" \ - --add-host "ydb:172.28.0.99" \ - -e "REF=${BASELINE_REF}" \ - -e "WORKLOAD=${WORKLOAD}" \ - ydb-app-baseline \ - $ARGS - - echo "" - echo "==================== INITIAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== INITIAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - echo "Waiting for workloads to complete (${DURATION}s)..." - sleep ${DURATION} - - echo "Stopping containers after ${DURATION}s..." 
- docker stop --timeout=30 ydb-app-current ydb-app-baseline 2>&1 || true - - # Force kill if still running - docker kill ydb-app-current ydb-app-baseline 2>&1 || true - - # Check exit codes - CURRENT_EXIT=$(docker inspect ydb-app-current --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - BASELINE_EXIT=$(docker inspect ydb-app-baseline --format='{{.State.ExitCode}}' 2>/dev/null || echo "1") - - echo "Current exit code: ${CURRENT_EXIT}" - echo "Baseline exit code: ${BASELINE_EXIT}" - - echo "" - echo "==================== FINAL CURRENT LOGS ====================" - docker logs -n 15 ydb-app-current 2>&1 || echo "No current container" - echo "" - echo "==================== FINAL BASELINE LOGS ====================" - docker logs -n 15 ydb-app-baseline 2>&1 || echo "No baseline container" - echo "" - - if [[ "${CURRENT_EXIT}" != "0" || "${BASELINE_EXIT}" != "0" ]]; then - echo "One or both workloads failed." - exit 0 - fi - - echo "SUCCESS: Workloads completed successfully" - - - if: always() - name: Store logs - run: | - docker logs ydb-app-current > current.log 2>&1 || echo "No current container" > current.log - docker logs ydb-app-baseline > baseline.log 2>&1 || echo "No baseline container" > baseline.log - - - if: always() - name: Upload logs - uses: actions/upload-artifact@v4 + workload_baseline_image: ydb-app-baseline + workload_baseline_command: ${{ matrix.sdk.command }} + + ydb-slo-action-report: + runs-on: ubuntu-latest + name: Publish YDB SLO Report + needs: ydb-slo-action + permissions: + checks: write + contents: read + pull-requests: write + steps: + - name: Publish YDB SLO Report + uses: ydb-platform/ydb-slo-action/report@v2 with: - name: ydb-python-${{ matrix.workload }}-logs - path: | - ./current.log - ./baseline.log - retention-days: 1 + github_token: ${{ secrets.GITHUB_TOKEN }} + github_run_id: ${{ github.run_id }} diff --git a/pyproject.toml b/pyproject.toml index 41e7ef6f..13ec71e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 
@@ +[tool.ty.environment] +extra-paths = ["tests/slo/src"] + [tool.black] line-length = 120 diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index 4f18a2b3..ecf1bca2 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -1,13 +1,21 @@ # syntax=docker/dockerfile:1 # This image packages the Python SLO workload runner. -# It expects to be run with arguments like: -# docker run --rm table-run --otlp-endpoint http://prometheus:9090/api/v1/otlp/v1/metrics ... +# +# Connection and workload identity are configured via environment variables: +# YDB_ENDPOINT grpc://ydb:2136 +# YDB_DATABASE /Root/testdb +# WORKLOAD_DURATION 600 +# WORKLOAD_NAME sync-query | sync-table | topic +# WORKLOAD_REF +# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT http://ydb-prometheus:9090/api/v1/otlp/v1/metrics +# +# Additional tuning flags (read/write RPS, timeouts, thread counts) are passed via +# the Docker CMD, e.g.: +# docker run --rm --env-file ... --read-rps 1000 --write-rps 100 # # Notes: # - OpenTelemetry 1.39.x requires Python >= 3.9. -# - The entrypoint is `python ./tests/slo/src`, i.e. it runs the `__main__.py` -# from that directory (same as `python tests/slo/src ...` in CI). FROM python:3.11-slim AS build @@ -36,3 +44,5 @@ COPY --from=build /opt/venv /opt/venv COPY --from=build /src/tests/slo/src /app/tests/slo/src ENTRYPOINT ["python", "./tests/slo/src"] + +CMD ["--read-rps", "1000", "--write-rps", "100"] diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt index cd5cdfe1..01f894a1 100644 --- a/tests/slo/requirements.txt +++ b/tests/slo/requirements.txt @@ -1,6 +1,6 @@ requests==2.33.0 aiolimiter==1.1.0 -quantile-estimator==0.1.2 +hdrhistogram # OpenTelemetry (OTLP/HTTP exporter) # NOTE: OpenTelemetry 1.39.1 requires Python >= 3.9. 
diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index dd1ae0b7..96eb186c 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -2,7 +2,7 @@ import logging from options import parse_options -from root_runner import run_from_args +from root_runner import run_all if __name__ == "__main__": @@ -12,4 +12,4 @@ log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(level=log_level, format="%(asctime)s %(levelname)-8s %(message)s") - run_from_args(args) + run_all(args) diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index bff90eda..cacd73ea 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import threading import time from abc import ABC, abstractmethod from collections.abc import Iterable @@ -12,8 +13,8 @@ OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" -REF = environ.get("REF", "main") -WORKLOAD = environ.get("WORKLOAD", "sync-query") +WORKLOAD_REF = environ.get("WORKLOAD_REF", environ.get("REF", "main")) +WORKLOAD_NAME = environ.get("WORKLOAD_NAME", environ.get("WORKLOAD", "sync-query")) logger = logging.getLogger(__name__) @@ -101,116 +102,119 @@ def push(self) -> None: class OtlpMetrics(BaseMetrics): """ - Canonical OpenTelemetry metrics implementation. - - This exports metrics via OTLP/HTTP to a Prometheus server with OTLP receiver enabled: - POST http(s)://:/api/v1/otlp/v1/metrics - - Naming notes: - - Metric names follow OpenTelemetry conventions (dot-separated namespaces, e.g. `sdk.operations.total`). - - Prometheus OTLP translation typically converts dots to underscores and may add suffixes like - `_total` for counters and `_bucket/_sum/_count` for histograms. + OpenTelemetry metrics implementation. + + Exports via OTLP/HTTP; the endpoint is configured through standard OTel env vars: + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (e.g. 
http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) + OTEL_EXPORTER_OTLP_ENDPOINT (fallback base URL) + OTEL_EXPORTER_OTLP_PROTOCOL (default: http/protobuf) + + Latency is tracked with an HDR histogram per (operation_type, operation_status) label + combination and published as three Gauge instruments: + sdk_operation_latency_p50_seconds + sdk_operation_latency_p95_seconds + sdk_operation_latency_p99_seconds """ - def __init__(self, otlp_metrics_endpoint: str): + # HDR histogram range: 1 µs … 60 s (in microseconds), 3 significant figures. + _HDR_MIN_US = 1 + _HDR_MAX_US = 60_000_000 + _HDR_SIG_FIGS = 3 + + def __init__(self): from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter, ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.metrics.view import ( - ExplicitBucketHistogramAggregation, - View, - ) from opentelemetry.sdk.resources import Resource - # Resource attributes: Prometheus maps service.name -> job, service.instance.id -> instance. 
resource = Resource.create( { - "service.name": f"workload-{WORKLOAD}", - "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{REF}-{WORKLOAD}"), - "ref": REF, + "service.name": f"workload-{WORKLOAD_NAME}", + "service.instance.id": environ.get( + "SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}" + ), + "ref": WORKLOAD_REF, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), - "workload": WORKLOAD, + "workload": WORKLOAD_NAME, "workload_version": "0.0.0", } ) - exporter = OTLPMetricExporter(endpoint=otlp_metrics_endpoint) - reader = PeriodicExportingMetricReader(exporter) # we force_flush() explicitly in push() - - latency_view = View( - instrument_name="sdk.operation.latency", - aggregation=ExplicitBucketHistogramAggregation( - boundaries=( - 0.001, - 0.002, - 0.003, - 0.004, - 0.005, - 0.0075, - 0.010, - 0.020, - 0.050, - 0.100, - 0.200, - 0.500, - 1.000, - ) - ), - ) + # Endpoint is read automatically from OTEL_EXPORTER_OTLP_METRICS_ENDPOINT / + # OTEL_EXPORTER_OTLP_ENDPOINT by the exporter; no need to pass it explicitly. 
+ exporter = OTLPMetricExporter() + reader = PeriodicExportingMetricReader(exporter) - self._provider = MeterProvider( - resource=resource, - metric_readers=[reader], - views=[latency_view], - ) + self._provider = MeterProvider(resource=resource, metric_readers=[reader]) self._meter = self._provider.get_meter("ydb-slo") - # Instruments (sync) + # Counters self._errors = self._meter.create_counter( name="sdk.errors.total", description="Total number of errors encountered, categorized by error type.", ) self._operations_total = self._meter.create_counter( name="sdk.operations.total", - description="Total number of operations, categorized by type attempted by the SDK.", + description="Total number of operations attempted by the SDK.", ) self._operations_success_total = self._meter.create_counter( name="sdk.operations.success.total", - description="Total number of successful operations, categorized by type.", + description="Total number of successful operations.", ) self._operations_failure_total = self._meter.create_counter( name="sdk.operations.failure.total", - description="Total number of failed operations, categorized by type.", + description="Total number of failed operations.", ) - self._latency = self._meter.create_histogram( - name="sdk.operation.latency", - unit="s", - description="Latency of operations performed by the SDK in seconds, categorized by type and status.", + self._retry_attempts_total = self._meter.create_counter( + name="sdk.retry.attempts.total", + description="Total number of retry attempts.", ) - self._pending = self._meter.create_up_down_counter( name="sdk.pending.operations", - description="Current number of pending operations, categorized by type.", + description="Current number of pending operations.", ) - self._retry_attempts_total = self._meter.create_counter( - name="sdk.retry.attempts.total", - description="Total number of retry attempts, categorized by ref and operation type.", + # Latency gauges (fed from HDR histograms via push()) + 
self._latency_p50 = self._meter.create_gauge( + name="sdk_operation_latency_p50_seconds", + unit="s", + description="P50 operation latency in seconds.", + ) + self._latency_p95 = self._meter.create_gauge( + name="sdk_operation_latency_p95_seconds", + unit="s", + description="P95 operation latency in seconds.", + ) + self._latency_p99 = self._meter.create_gauge( + name="sdk_operation_latency_p99_seconds", + unit="s", + description="P99 operation latency in seconds.", ) + # HDR histograms: key → (operation_type, operation_status) + self._hdr_lock = threading.Lock() + self._hdr: dict = {} + self.reset() + def _hdr_for(self, key: tuple): + """Return (creating if necessary) an HDR histogram for the given label key.""" + from hdrh.histogram import HdrHistogram + + hist = self._hdr.get(key) + if hist is None: + hist = HdrHistogram(self._HDR_MIN_US, self._HDR_MAX_US, self._HDR_SIG_FIGS) + self._hdr[key] = hist + return hist + def start(self, labels) -> float: labels_t = _normalize_labels(labels) self._pending.add( 1, - attributes={ - "ref": REF, - "operation_type": labels_t[0], - }, + attributes={"ref": WORKLOAD_REF, "operation_type": labels_t[0]}, ) return time.time() @@ -223,76 +227,74 @@ def stop( ) -> None: labels_t = _normalize_labels(labels) duration = time.time() - start_time + duration_us = max(self._HDR_MIN_US, int(duration * 1_000_000)) op_type = labels_t[0] - base_attrs = { - "ref": REF, - "operation_type": op_type, - } + op_status = OP_STATUS_SUCCESS if error is None else OP_STATUS_FAILURE + base_attrs = {"ref": WORKLOAD_REF, "operation_type": op_type} - # Update instruments self._retry_attempts_total.add(int(attempts), attributes=base_attrs) self._pending.add(-1, attributes=base_attrs) - - # Counters + latency self._operations_total.add(1, attributes=base_attrs) if error is not None: self._errors.add( 1, - attributes={ - **base_attrs, - "error_type": type(error).__name__, - }, + attributes={**base_attrs, "error_type": type(error).__name__}, ) 
self._operations_failure_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_FAILURE, - }, - ) - return - - self._operations_success_total.add(1, attributes=base_attrs) - self._latency.record( - duration, - attributes={ - **base_attrs, - "operation_status": OP_STATUS_SUCCESS, - }, - ) + else: + self._operations_success_total.add(1, attributes=base_attrs) + + with self._hdr_lock: + self._hdr_for((op_type, op_status)).record_value(duration_us) def push(self) -> None: - # Metrics job calls push() with the cadence of --report-period. - # force_flush() makes the exporter send immediately. + with self._hdr_lock: + snapshot = {k: v for k, v in self._hdr.items()} + + for (op_type, op_status), hist in snapshot.items(): + attrs = { + "ref": WORKLOAD_REF, + "operation_type": op_type, + "operation_status": op_status, + } + p50 = hist.get_value_at_percentile(50.0) / 1_000_000 + p95 = hist.get_value_at_percentile(95.0) / 1_000_000 + p99 = hist.get_value_at_percentile(99.0) / 1_000_000 + self._latency_p50.set(p50, attributes=attrs) + self._latency_p95.set(p95, attributes=attrs) + self._latency_p99.set(p99, attributes=attrs) + self._provider.force_flush() def reset(self) -> None: - # OpenTelemetry counters/histograms are cumulative and cannot be reset. - # Reset is implemented as an immediate push/flush. self.push() -def create_metrics(otlp_endpoint: Optional[str]) -> BaseMetrics: +def create_metrics() -> BaseMetrics: """ Factory used by SLO runners. - Metrics are enabled if either: - - OTLP_ENDPOINT env var is set, or - - `--otlp-endpoint` is provided (and non-empty) - - If endpoint is empty, metrics are disabled (DummyMetrics). + Metrics are enabled when OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (or the generic + OTEL_EXPORTER_OTLP_ENDPOINT) is set in the environment; otherwise a no-op + DummyMetrics is returned. 
""" - endpoint = (environ.get("OTLP_ENDPOINT") or (otlp_endpoint or "")).strip() + endpoint = ( + environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") + or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + or "" + ).strip() + if not endpoint: - logger.info("Creating dummy metrics (metrics disabled)") + logger.info("OTLP endpoint not configured — metrics disabled") return DummyMetrics() - logger.info("Creating OTLP metrics exporter to Prometheus: %s", endpoint) + logger.info("Creating OTLP metrics exporter (endpoint from env)") try: - return OtlpMetrics(endpoint) + return OtlpMetrics() except Exception: - logger.exception("Failed to initialize OTLP metrics exporter; falling back to DummyMetrics") + logger.exception( + "Failed to initialize OTLP metrics — falling back to DummyMetrics" + ) return DummyMetrics() diff --git a/tests/slo/src/jobs/async_topic_jobs.py b/tests/slo/src/jobs/async_topic_jobs.py index 92ae9fa4..03f10f30 100644 --- a/tests/slo/src/jobs/async_topic_jobs.py +++ b/tests/slo/src/jobs/async_topic_jobs.py @@ -120,10 +120,6 @@ async def _run_topic_reads(self, limiter: AsyncLimiter): logger.info("Stop async topic read workload") def _run_metric_job(self): - # Metrics are enabled only if an OTLP endpoint is provided (CLI: --otlp-endpoint). 
- if not getattr(self.args, "otlp_endpoint", None): - return [] - task = asyncio.create_task( self._async_metric_sender(self.args.time), name="slo_metrics_sender", diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 32246fa7..4bf1a529 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -54,9 +54,6 @@ def run_tests(self): pass def _run_metric_job(self): - if not getattr(self.args, "otlp_endpoint", None): - return [] - report_period_ms = max(1, int(self.args.report_period)) limiter = SyncRateLimiter(min_interval_s=report_period_ms / 1000.0) diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py index 2452c7b7..8e4cf6bd 100644 --- a/tests/slo/src/jobs/table_jobs.py +++ b/tests/slo/src/jobs/table_jobs.py @@ -92,9 +92,9 @@ def __init__(self, driver, args, metrics, table_name, max_id): self.table_name = table_name self.max_id = max_id - from core.metrics import WORKLOAD + from core.metrics import WORKLOAD_NAME - self.workload_type = WORKLOAD + self.workload_type = WORKLOAD_NAME def run_tests(self): if self.workload_type == "sync-table": diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 7082d086..11094eb7 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -1,182 +1,86 @@ import argparse +import os -def add_common_options(parser): - parser.add_argument("endpoint", help="YDB endpoint") - parser.add_argument("db", help="YDB database name") - parser.add_argument("-t", "--table-name", default="key_value", help="Table name") - parser.add_argument("--debug", action="store_true", help="Enable debug logging") - parser.add_argument("--async", action="store_true", help="Use async mode for operations") - +def parse_options(): + """ + Parse CLI arguments (passed via Docker CMD section). + Connection, duration, and workload identity are configured via environment variables: + YDB_ENDPOINT — YDB endpoint (e.g. grpc://ydb:2136) + YDB_DATABASE — YDB database path (e.g. 
/Root/testdb) + WORKLOAD_DURATION — total run duration in seconds (default: 600) + """ + parser = argparse.ArgumentParser( + description="YDB Python SLO workload", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) -def make_table_create_parser(subparsers): - table_create_parser = subparsers.add_parser("table-create", help="Create tables and fill with initial content") - add_common_options(table_create_parser) + parser.add_argument("--debug", action="store_true", help="Enable debug logging") - table_create_parser.add_argument( - "-p-min", - "--min-partitions-count", - default=6, - type=int, - help="Minimum amount of partitions in table", - ) - table_create_parser.add_argument( - "-p-max", - "--max-partitions-count", - default=1000, - type=int, - help="Maximum amount of partitions in table", + # Table params + parser.add_argument("--table-name", default="key_value", help="Table name") + parser.add_argument("--min-partitions-count", default=6, type=int) + parser.add_argument("--max-partitions-count", default=1000, type=int) + parser.add_argument( + "--partition-size", default=100, type=int, help="Partition size [mb]" ) - table_create_parser.add_argument("-p-size", "--partition-size", default=100, type=int, help="Partition size [mb]") - table_create_parser.add_argument( - "-c", + parser.add_argument( "--initial-data-count", default=1000, type=int, - help="Total number of records to generate", + help="Number of rows to pre-fill", ) - - table_create_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument( + "--batch-size", default=100, type=int, help="Rows per insert batch" ) - - table_create_parser.add_argument( - "--batch-size", - default=100, - type=int, - help="Number of new records in each create request", + parser.add_argument( + "--threads", default=10, type=int, help="Threads for initial data fill" ) - table_create_parser.add_argument("--threads", default=10, type=int, 
help="Number of threads to use") - -def make_table_run_parser(subparsers): - table_run_parser = subparsers.add_parser("table-run", help="Run table SLO workload") - add_common_options(table_run_parser) - - table_run_parser.add_argument("--read-rps", default=100, type=int, help="Read request rps") - table_run_parser.add_argument( - "--read-timeout", - default=10000, - type=int, - help="Read requests execution timeout [ms]", + # Run params + parser.add_argument("--read-rps", default=100, type=int, help="Read RPS limit") + parser.add_argument( + "--read-timeout", default=10000, type=int, help="Read timeout [ms]" ) - - table_run_parser.add_argument("--write-rps", default=10, type=int, help="Write request rps") - table_run_parser.add_argument( - "--write-timeout", - default=20000, - type=int, - help="Write requests execution timeout [ms]", + parser.add_argument("--write-rps", default=10, type=int, help="Write RPS limit") + parser.add_argument( + "--write-timeout", default=20000, type=int, help="Write timeout [ms]" ) - - table_run_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - table_run_parser.add_argument( - "--shutdown-time", - default=10, - type=int, - help="Graceful shutdown time in seconds", + parser.add_argument( + "--read-threads", default=8, type=int, help="Read worker threads" ) - - table_run_parser.add_argument( - "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). 
Empty to disable.", + parser.add_argument( + "--write-threads", default=4, type=int, help="Write worker threads" ) - table_run_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - table_run_parser.add_argument("--read-threads", default=8, type=int, help="Number of threads to use for write") - table_run_parser.add_argument("--write-threads", default=4, type=int, help="Number of threads to use for read") - - -def make_table_cleanup_parser(subparsers): - table_cleanup_parser = subparsers.add_parser("table-cleanup", help="Drop tables") - add_common_options(table_cleanup_parser) - - -def make_topic_create_parser(subparsers): - topic_create_parser = subparsers.add_parser("topic-create", help="Create topic with consumer") - add_common_options(topic_create_parser) - - topic_create_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_create_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_create_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - - -def make_topic_run_parser(subparsers): - topic_parser = subparsers.add_parser("topic-run", help="Run topic SLO workload") - add_common_options(topic_parser) - - topic_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - topic_parser.add_argument("--consumer", default="slo_consumer", type=str, help="Topic consumer name") - topic_parser.add_argument("--partitions-count", default=1, type=int, help="Partition count") - topic_parser.add_argument("--read-rps", default=100, type=int, help="Topic read request rps") - topic_parser.add_argument("--read-timeout", default=5000, type=int, help="Topic read timeout [ms]") - topic_parser.add_argument("--write-rps", default=100, type=int, help="Topic write request rps") - topic_parser.add_argument("--write-timeout", default=5000, type=int, help="Topic write timeout [ms]") - 
topic_parser.add_argument( - "--read-threads", - default=1, - type=int, - help="Number of threads for topic reading", + parser.add_argument( + "--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]" ) - topic_parser.add_argument( - "--write-threads", - default=1, - type=int, - help="Number of threads for topic writing", + parser.add_argument( + "--report-period", default=1000, type=int, help="Metrics push period [ms]" ) - topic_parser.add_argument("--message-size", default=100, type=int, help="Topic message size in bytes") - topic_parser.add_argument("--time", default=10, type=int, help="Time to run in seconds") - topic_parser.add_argument( - "--shutdown-time", - default=10, - type=int, - help="Graceful shutdown time in seconds", + # Topic params (used when WORKLOAD_NAME contains 'topic') + parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") + parser.add_argument( + "--topic-consumer", default="slo_consumer", help="Topic consumer name" ) - topic_parser.add_argument( - "--otlp-endpoint", - default="http://localhost:9090/api/v1/otlp/v1/metrics", - type=str, - help="Full Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics). 
Empty to disable.", + parser.add_argument( + "--topic-partitions", default=1, type=int, help="Topic partition count" ) - topic_parser.add_argument("--report-period", default=1000, type=int, help="Prometheus push period in [ms]") - - -def make_topic_cleanup_parser(subparsers): - topic_cleanup_parser = subparsers.add_parser("topic-cleanup", help="Drop topic") - add_common_options(topic_cleanup_parser) - - topic_cleanup_parser.add_argument("--path", default="/local/slo_topic", type=str, help="Topic path") - - -def get_root_parser(): - parser = argparse.ArgumentParser( - formatter_class=argparse.RawDescriptionHelpFormatter, - description="YDB Python SLO application", - ) - - subparsers = parser.add_subparsers( - title="subcommands", - dest="subcommand", - help="List of subcommands", + parser.add_argument( + "--message-size", default=100, type=int, help="Topic message size [bytes]" ) - make_table_create_parser(subparsers) - make_table_run_parser(subparsers) - make_table_cleanup_parser(subparsers) + args = parser.parse_args() - make_topic_create_parser(subparsers) - make_topic_run_parser(subparsers) - make_topic_cleanup_parser(subparsers) + # Inject env-var-driven config as attributes so the rest of the code can use args.* uniformly + args.endpoint = os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136") + args.db = os.environ.get("YDB_DATABASE", "/local") + args.time = int(os.environ.get("WORKLOAD_DURATION", "600")) - return parser + # Aliases used by topic runner + args.path = args.topic_path + args.consumer = args.topic_consumer + args.partitions_count = args.topic_partitions - -def parse_options(): - parser = get_root_parser() - return parser.parse_args() + return args diff --git a/tests/slo/src/pyrightconfig.json b/tests/slo/src/pyrightconfig.json new file mode 100644 index 00000000..e2cad99c --- /dev/null +++ b/tests/slo/src/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "pythonVersion": "3.9", + "extraPaths": ["."] +} diff --git a/tests/slo/src/root_runner.py 
b/tests/slo/src/root_runner.py index 20589c14..73d92bb3 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,94 +1,56 @@ import asyncio -import ydb -import ydb.aio import logging -from typing import Dict -from runners.topic_runner import TopicRunner +from core.metrics import WORKLOAD_NAME from runners.table_runner import TableRunner -from runners.base import BaseRunner - -logger = logging.getLogger(__name__) - - -class SLORunner: - def __init__(self): - self.runners: Dict[str, type(BaseRunner)] = {} - - def register_runner(self, prefix: str, runner_cls: type(BaseRunner)): - self.runners[prefix] = runner_cls +from runners.topic_runner import TopicRunner - def run_command(self, args): - subcommand_parts = args.subcommand.split("-", 1) - if len(subcommand_parts) < 2: - raise ValueError(f"Invalid subcommand format: {args.subcommand}. Expected 'prefix-command'") +import ydb +import ydb.aio - prefix, command = subcommand_parts - if prefix not in self.runners: - raise ValueError(f"Unknown prefix: {prefix}. Available: {list(self.runners.keys())}") +logger = logging.getLogger(__name__) - runner_instance = self.runners[prefix]() +_RUNNERS = { + "sync-table": TableRunner, + "sync-query": TableRunner, + "topic": TopicRunner, +} - # Check if async mode is requested and command is 'run' - if getattr(args, "async", False) and command == "run": - asyncio.run(self._run_async_command(args, runner_instance, command)) - else: - self._run_sync_command(args, runner_instance, command) - def _run_sync_command(self, args, runner_instance, command): - """Run command in synchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, +def _get_runner(): + runner_cls = _RUNNERS.get(WORKLOAD_NAME) + if runner_cls is None: + raise ValueError( + f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. 
Known: {list(_RUNNERS)}" ) + return runner_cls() - with ydb.Driver(driver_config) as driver: - driver.wait(timeout=300) - try: - runner_instance.set_driver(driver) - if command == "create": - runner_instance.create(args) - elif command == "run": - runner_instance.run(args) - elif command == "cleanup": - runner_instance.cleanup(args) - else: - raise RuntimeError(f"Unknown command {command} for prefix {runner_instance.prefix}") - except BaseException: - logger.exception("Something went wrong") - raise - finally: - driver.stop(timeout=getattr(args, "shutdown_time", 10)) - async def _run_async_command(self, args, runner_instance, command): - """Run command in asynchronous mode""" - driver_config = ydb.DriverConfig( - args.endpoint, - database=args.db, - grpc_keep_alive_timeout=5000, - ) +def run_all(args): + """Create infrastructure, run the workload, then clean up — all in one go.""" + runner = _get_runner() - async with ydb.aio.Driver(driver_config) as driver: - await driver.wait(timeout=300) - try: - runner_instance.set_driver(driver) - if command == "run": - await runner_instance.run_async(args) - else: - raise RuntimeError(f"Async mode only supports 'run' command, got '{command}'") - except BaseException: - logger.exception("Something went wrong in async mode") - raise + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, + ) + with ydb.Driver(driver_config) as driver: + driver.wait(timeout=300) + runner.set_driver(driver) -def create_runner() -> SLORunner: - runner = SLORunner() - runner.register_runner("table", TableRunner) - runner.register_runner("topic", TopicRunner) - return runner + try: + logger.info("[%s] Creating resources", WORKLOAD_NAME) + runner.create(args) + logger.info("[%s] Running workload for %d s", WORKLOAD_NAME, args.time) + runner.run(args) + finally: + logger.info("[%s] Cleaning up resources", WORKLOAD_NAME) + try: + runner.cleanup(args) + except Exception: + logger.exception("Cleanup failed 
— ignoring") -def run_from_args(args): - runner = create_runner() - runner.run_command(args) + driver.stop(timeout=args.shutdown_time) diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py index fb6f00dc..f5643912 100644 --- a/tests/slo/src/runners/table_runner.py +++ b/tests/slo/src/runners/table_runner.py @@ -88,7 +88,7 @@ def transaction(session: ydb.table.Session): self.logger.info("Table creation completed") def run(self, args): - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting table SLO tests") diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py index c9a8bdaa..e8de9a93 100644 --- a/tests/slo/src/runners/topic_runner.py +++ b/tests/slo/src/runners/topic_runner.py @@ -70,7 +70,7 @@ def create(self, args): def run(self, args): assert self.driver is not None, "Driver is not initialized. Call set_driver() before run()." - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting topic SLO tests") @@ -85,7 +85,7 @@ def run(self, args): async def run_async(self, args): """Async version of topic SLO tests using ydb.aio.Driver""" assert self.driver is not None, "Driver is not initialized. Call set_driver() before run_async()." - metrics = create_metrics(args.otlp_endpoint) + metrics = create_metrics() self.logger.info("Starting async topic SLO tests") From 520747975f74ee749b61d0a7378c8896196532ed Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:12:55 +0300 Subject: [PATCH 02/13] Configure Ruff and reformat code to 120 chars Add Ruff configuration to match Black's line length, then reformat code accordingly. Remove unnecessary blank line in __main__.py. 
--- pyproject.toml | 3 +++ tests/slo/src/__main__.py | 1 - tests/slo/src/core/metrics.py | 12 +++------ tests/slo/src/options.py | 48 +++++++++-------------------------- tests/slo/src/root_runner.py | 4 +-- 5 files changed, 19 insertions(+), 49 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 13ec71e7..0bda93e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,9 @@ [tool.ty.environment] extra-paths = ["tests/slo/src"] +[tool.ruff] +line-length = 120 + [tool.black] line-length = 120 diff --git a/tests/slo/src/__main__.py b/tests/slo/src/__main__.py index 96eb186c..eb5504e0 100644 --- a/tests/slo/src/__main__.py +++ b/tests/slo/src/__main__.py @@ -4,7 +4,6 @@ from options import parse_options from root_runner import run_all - if __name__ == "__main__": args = parse_options() gc.disable() diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index cacd73ea..3a10b342 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -132,9 +132,7 @@ def __init__(self): resource = Resource.create( { "service.name": f"workload-{WORKLOAD_NAME}", - "service.instance.id": environ.get( - "SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}" - ), + "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}"), "ref": WORKLOAD_REF, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), @@ -281,9 +279,7 @@ def create_metrics() -> BaseMetrics: DummyMetrics is returned. 
""" endpoint = ( - environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") - or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") - or "" + environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") or "" ).strip() if not endpoint: @@ -294,7 +290,5 @@ def create_metrics() -> BaseMetrics: try: return OtlpMetrics() except Exception: - logger.exception( - "Failed to initialize OTLP metrics — falling back to DummyMetrics" - ) + logger.exception("Failed to initialize OTLP metrics — falling back to DummyMetrics") return DummyMetrics() diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 11094eb7..61a9e6ab 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -21,55 +21,31 @@ def parse_options(): parser.add_argument("--table-name", default="key_value", help="Table name") parser.add_argument("--min-partitions-count", default=6, type=int) parser.add_argument("--max-partitions-count", default=1000, type=int) - parser.add_argument( - "--partition-size", default=100, type=int, help="Partition size [mb]" - ) + parser.add_argument("--partition-size", default=100, type=int, help="Partition size [mb]") parser.add_argument( "--initial-data-count", default=1000, type=int, help="Number of rows to pre-fill", ) - parser.add_argument( - "--batch-size", default=100, type=int, help="Rows per insert batch" - ) - parser.add_argument( - "--threads", default=10, type=int, help="Threads for initial data fill" - ) + parser.add_argument("--batch-size", default=100, type=int, help="Rows per insert batch") + parser.add_argument("--threads", default=10, type=int, help="Threads for initial data fill") # Run params parser.add_argument("--read-rps", default=100, type=int, help="Read RPS limit") - parser.add_argument( - "--read-timeout", default=10000, type=int, help="Read timeout [ms]" - ) + parser.add_argument("--read-timeout", default=10000, type=int, help="Read timeout [ms]") parser.add_argument("--write-rps", default=10, type=int, 
help="Write RPS limit") - parser.add_argument( - "--write-timeout", default=20000, type=int, help="Write timeout [ms]" - ) - parser.add_argument( - "--read-threads", default=8, type=int, help="Read worker threads" - ) - parser.add_argument( - "--write-threads", default=4, type=int, help="Write worker threads" - ) - parser.add_argument( - "--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]" - ) - parser.add_argument( - "--report-period", default=1000, type=int, help="Metrics push period [ms]" - ) + parser.add_argument("--write-timeout", default=20000, type=int, help="Write timeout [ms]") + parser.add_argument("--read-threads", default=8, type=int, help="Read worker threads") + parser.add_argument("--write-threads", default=4, type=int, help="Write worker threads") + parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]") + parser.add_argument("--report-period", default=1000, type=int, help="Metrics push period [ms]") # Topic params (used when WORKLOAD_NAME contains 'topic') parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") - parser.add_argument( - "--topic-consumer", default="slo_consumer", help="Topic consumer name" - ) - parser.add_argument( - "--topic-partitions", default=1, type=int, help="Topic partition count" - ) - parser.add_argument( - "--message-size", default=100, type=int, help="Topic message size [bytes]" - ) + parser.add_argument("--topic-consumer", default="slo_consumer", help="Topic consumer name") + parser.add_argument("--topic-partitions", default=1, type=int, help="Topic partition count") + parser.add_argument("--message-size", default=100, type=int, help="Topic message size [bytes]") args = parser.parse_args() diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 73d92bb3..2270383d 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -20,9 +20,7 @@ def _get_runner(): runner_cls = 
_RUNNERS.get(WORKLOAD_NAME) if runner_cls is None: - raise ValueError( - f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}" - ) + raise ValueError(f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}") return runner_cls() From baacda34c517b0e2d2154c3f0c36feeebef26387 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:14:29 +0300 Subject: [PATCH 03/13] Use SDK name in SLO workflow concurrency group Replace matrix.workload with matrix.sdk.name to properly identify concurrent jobs by SDK variant. --- .github/workflows/slo.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 465e173a..fb8e8455 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -49,7 +49,7 @@ jobs: command: "--read-rps ${{ inputs.slo_workload_read_max_rps || '1000' }} --write-rps ${{ inputs.slo_workload_write_max_rps || '100' }}" concurrency: - group: slo-${{ github.ref }}-${{ matrix.workload }} + group: slo-${{ github.ref }}-${{ matrix.sdk.name }} cancel-in-progress: true steps: From a3013cd75941bd3bcc525081e4c12b8bec35ddda Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:23:46 +0300 Subject: [PATCH 04/13] Pin hdrhistogram and improve argument parsing Add type hint to _run_metric_job return type. Move environment variable fallback logic from post-parse injection into argument parser definition using nargs="?" and default values. Remove unused asyncio import. 
Pin hdrhistogram and improve argument parsing - Pin hdrhistogram to v0.10.3 for reproducibility - Move YDB and workload config to proper argparse arguments with environment variable fallbacks instead of post-parsing assignment - Add type hint to _run_metric_job return type - Remove unused asyncio import --- tests/slo/requirements.txt | 2 +- tests/slo/src/jobs/base.py | 3 ++- tests/slo/src/options.py | 26 +++++++++++++++++++++----- tests/slo/src/root_runner.py | 1 - 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/tests/slo/requirements.txt b/tests/slo/requirements.txt index 01f894a1..d9021cc5 100644 --- a/tests/slo/requirements.txt +++ b/tests/slo/requirements.txt @@ -1,6 +1,6 @@ requests==2.33.0 aiolimiter==1.1.0 -hdrhistogram +hdrhistogram==0.10.3 # OpenTelemetry (OTLP/HTTP exporter) # NOTE: OpenTelemetry 1.39.1 requires Python >= 3.9. diff --git a/tests/slo/src/jobs/base.py b/tests/slo/src/jobs/base.py index 4bf1a529..9cc06598 100644 --- a/tests/slo/src/jobs/base.py +++ b/tests/slo/src/jobs/base.py @@ -2,6 +2,7 @@ import threading import time from abc import ABC, abstractmethod +from typing import Any import ydb @@ -53,7 +54,7 @@ def __init__(self, driver, args, metrics): def run_tests(self): pass - def _run_metric_job(self): + def _run_metric_job(self) -> list[Any]: report_period_ms = max(1, int(self.args.report_period)) limiter = SyncRateLimiter(min_interval_s=report_period_ms / 1000.0) diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index 61a9e6ab..883cfbf6 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -15,6 +15,27 @@ def parse_options(): formatter_class=argparse.RawDescriptionHelpFormatter, ) + # Positional args with env var fallback — pass explicitly for local runs, + # or rely on env vars (YDB_ENDPOINT / YDB_DATABASE / WORKLOAD_DURATION) in Docker/CI. 
+ parser.add_argument( + "endpoint", + nargs="?", + default=os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136"), + help="YDB endpoint (default: $YDB_ENDPOINT)", + ) + parser.add_argument( + "db", + nargs="?", + default=os.environ.get("YDB_DATABASE", "/local"), + help="YDB database (default: $YDB_DATABASE)", + ) + parser.add_argument( + "--time", + default=int(os.environ.get("WORKLOAD_DURATION", "600")), + type=int, + help="Workload duration in seconds (default: $WORKLOAD_DURATION)", + ) + parser.add_argument("--debug", action="store_true", help="Enable debug logging") # Table params @@ -49,11 +70,6 @@ def parse_options(): args = parser.parse_args() - # Inject env-var-driven config as attributes so the rest of the code can use args.* uniformly - args.endpoint = os.environ.get("YDB_ENDPOINT", "grpc://localhost:2136") - args.db = os.environ.get("YDB_DATABASE", "/local") - args.time = int(os.environ.get("WORKLOAD_DURATION", "600")) - # Aliases used by topic runner args.path = args.topic_path args.consumer = args.topic_consumer diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 2270383d..c3d75fb2 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,4 +1,3 @@ -import asyncio import logging from core.metrics import WORKLOAD_NAME From 27bdeab0016a2c786ad890c5523244f085861ad8 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Thu, 2 Apr 2026 14:50:23 +0300 Subject: [PATCH 05/13] Update SLO workflow baseline image to use current version. 
--- .github/workflows/slo.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index fb8e8455..e8b68a79 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -151,7 +151,7 @@ jobs: workload_current_image: ydb-app-current workload_current_command: ${{ matrix.sdk.command }} workload_baseline_ref: ${{ steps.baseline.outputs.ref }} - workload_baseline_image: ydb-app-baseline + workload_baseline_image: ydb-app-current workload_baseline_command: ${{ matrix.sdk.command }} ydb-slo-action-report: From a7754b189457689b16d1418a04c9eca2da0c2766 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 08:55:58 +0300 Subject: [PATCH 06/13] Upgrade checkout action to v6 and remove redundant Docker version check --- .github/workflows/slo.yml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index e8b68a79..fd7ec605 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -82,7 +82,7 @@ jobs: docker compose version - name: Checkout current version - uses: actions/checkout@v5 + uses: actions/checkout@v6 with: path: current fetch-depth: 0 @@ -116,16 +116,10 @@ jobs: echo "ref=$BASELINE_REF" >> $GITHUB_OUTPUT - name: Checkout baseline version - uses: actions/checkout@v5 + uses: actions/checkout@v6 with: ref: ${{ steps.baseline.outputs.sha }} path: baseline - fetch-depth: 1 - - - name: Show Docker versions - run: | - docker --version - docker compose version - name: Build workload images (current + baseline) run: | From a96e08e38972bc72ba4dad43874421b78cf4eda3 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:07:30 +0300 Subject: [PATCH 07/13] Refactor SLO metrics to accept config via arguments Remove global environment variable reads from metrics module and pass workload_name, workload_ref, and otlp_endpoint as explicit --- tests/slo/src/core/metrics.py | 57 
+++++++++++---------------- tests/slo/src/jobs/table_jobs.py | 4 +- tests/slo/src/options.py | 29 +++++++++----- tests/slo/src/root_runner.py | 16 ++++---- tests/slo/src/runners/table_runner.py | 2 +- tests/slo/src/runners/topic_runner.py | 4 +- 6 files changed, 55 insertions(+), 57 deletions(-) diff --git a/tests/slo/src/core/metrics.py b/tests/slo/src/core/metrics.py index 3a10b342..fa151c2d 100644 --- a/tests/slo/src/core/metrics.py +++ b/tests/slo/src/core/metrics.py @@ -13,9 +13,6 @@ OP_TYPE_READ, OP_TYPE_WRITE = "read", "write" OP_STATUS_SUCCESS, OP_STATUS_FAILURE = "success", "err" -WORKLOAD_REF = environ.get("WORKLOAD_REF", environ.get("REF", "main")) -WORKLOAD_NAME = environ.get("WORKLOAD_NAME", environ.get("WORKLOAD", "sync-query")) - logger = logging.getLogger(__name__) @@ -121,7 +118,7 @@ class OtlpMetrics(BaseMetrics): _HDR_MAX_US = 60_000_000 _HDR_SIG_FIGS = 3 - def __init__(self): + def __init__(self, workload_name: str, workload_ref: str): from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter, ) @@ -129,14 +126,17 @@ def __init__(self): from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource + self._workload_name = workload_name + self._workload_ref = workload_ref + resource = Resource.create( { - "service.name": f"workload-{WORKLOAD_NAME}", - "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{WORKLOAD_REF}-{WORKLOAD_NAME}"), - "ref": WORKLOAD_REF, + "service.name": f"workload-{workload_name}", + "service.instance.id": environ.get("SLO_INSTANCE_ID", f"{workload_ref}-{workload_name}"), + "ref": workload_ref, "sdk": "ydb-python-sdk", "sdk_version": version("ydb"), - "workload": WORKLOAD_NAME, + "workload": workload_name, "workload_version": "0.0.0", } ) @@ -149,22 +149,14 @@ def __init__(self): self._provider = MeterProvider(resource=resource, metric_readers=[reader]) self._meter = self._provider.get_meter("ydb-slo") - # Counters + # Counters — 
names/labels match metrics.yaml in ydb-slo-action self._errors = self._meter.create_counter( name="sdk.errors.total", description="Total number of errors encountered, categorized by error type.", ) self._operations_total = self._meter.create_counter( name="sdk.operations.total", - description="Total number of operations attempted by the SDK.", - ) - self._operations_success_total = self._meter.create_counter( - name="sdk.operations.success.total", - description="Total number of successful operations.", - ) - self._operations_failure_total = self._meter.create_counter( - name="sdk.operations.failure.total", - description="Total number of failed operations.", + description="Total number of operations, labeled by operation_status (success/err).", ) self._retry_attempts_total = self._meter.create_counter( name="sdk.retry.attempts.total", @@ -212,7 +204,7 @@ def start(self, labels) -> float: labels_t = _normalize_labels(labels) self._pending.add( 1, - attributes={"ref": WORKLOAD_REF, "operation_type": labels_t[0]}, + attributes={"ref": self._workload_ref, "operation_type": labels_t[0]}, ) return time.time() @@ -229,20 +221,17 @@ def stop( op_type = labels_t[0] op_status = OP_STATUS_SUCCESS if error is None else OP_STATUS_FAILURE - base_attrs = {"ref": WORKLOAD_REF, "operation_type": op_type} + base_attrs = {"ref": self._workload_ref, "operation_type": op_type} self._retry_attempts_total.add(int(attempts), attributes=base_attrs) self._pending.add(-1, attributes=base_attrs) - self._operations_total.add(1, attributes=base_attrs) + self._operations_total.add(1, attributes={**base_attrs, "operation_status": op_status}) if error is not None: self._errors.add( 1, attributes={**base_attrs, "error_type": type(error).__name__}, ) - self._operations_failure_total.add(1, attributes=base_attrs) - else: - self._operations_success_total.add(1, attributes=base_attrs) with self._hdr_lock: self._hdr_for((op_type, op_status)).record_value(duration_us) @@ -253,7 +242,7 @@ def push(self) -> 
None: for (op_type, op_status), hist in snapshot.items(): attrs = { - "ref": WORKLOAD_REF, + "ref": self._workload_ref, "operation_type": op_type, "operation_status": op_status, } @@ -270,25 +259,25 @@ def reset(self) -> None: self.push() -def create_metrics() -> BaseMetrics: +def create_metrics(args) -> BaseMetrics: """ Factory used by SLO runners. - Metrics are enabled when OTEL_EXPORTER_OTLP_METRICS_ENDPOINT (or the generic - OTEL_EXPORTER_OTLP_ENDPOINT) is set in the environment; otherwise a no-op - DummyMetrics is returned. + Uses args.otlp_endpoint, args.workload_name, args.workload_ref from parsed CLI arguments. + If the endpoint is empty, returns a no-op DummyMetrics. """ - endpoint = ( - environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") or "" - ).strip() + endpoint = (args.otlp_endpoint or "").strip() if not endpoint: logger.info("OTLP endpoint not configured — metrics disabled") return DummyMetrics() - logger.info("Creating OTLP metrics exporter (endpoint from env)") + environ.setdefault("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") + environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint + + logger.info("Creating OTLP metrics exporter (endpoint: %s)", endpoint) try: - return OtlpMetrics() + return OtlpMetrics(args.workload_name, args.workload_ref) except Exception: logger.exception("Failed to initialize OTLP metrics — falling back to DummyMetrics") return DummyMetrics() diff --git a/tests/slo/src/jobs/table_jobs.py b/tests/slo/src/jobs/table_jobs.py index 8e4cf6bd..144fd686 100644 --- a/tests/slo/src/jobs/table_jobs.py +++ b/tests/slo/src/jobs/table_jobs.py @@ -92,9 +92,7 @@ def __init__(self, driver, args, metrics, table_name, max_id): self.table_name = table_name self.max_id = max_id - from core.metrics import WORKLOAD_NAME - - self.workload_type = WORKLOAD_NAME + self.workload_type = args.workload_name def run_tests(self): if self.workload_type == "sync-table": diff --git a/tests/slo/src/options.py 
b/tests/slo/src/options.py index 883cfbf6..c9f340c6 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -4,19 +4,15 @@ def parse_options(): """ - Parse CLI arguments (passed via Docker CMD section). - Connection, duration, and workload identity are configured via environment variables: - YDB_ENDPOINT — YDB endpoint (e.g. grpc://ydb:2136) - YDB_DATABASE — YDB database path (e.g. /Root/testdb) - WORKLOAD_DURATION — total run duration in seconds (default: 600) + Parse CLI arguments. + + Every flag supports a fallback chain: CLI arg > environment variable > hardcoded default. """ parser = argparse.ArgumentParser( description="YDB Python SLO workload", formatter_class=argparse.RawDescriptionHelpFormatter, ) - # Positional args with env var fallback — pass explicitly for local runs, - # or rely on env vars (YDB_ENDPOINT / YDB_DATABASE / WORKLOAD_DURATION) in Docker/CI. parser.add_argument( "endpoint", nargs="?", @@ -33,7 +29,22 @@ def parse_options(): "--time", default=int(os.environ.get("WORKLOAD_DURATION", "600")), type=int, - help="Workload duration in seconds (default: $WORKLOAD_DURATION)", + help="Workload duration in seconds (default: $WORKLOAD_DURATION or 600)", + ) + parser.add_argument( + "--workload-name", + default=os.environ.get("WORKLOAD_NAME", os.environ.get("WORKLOAD", "sync-query")), + help="Workload type: sync-table, sync-query, topic (default: $WORKLOAD_NAME or sync-query)", + ) + parser.add_argument( + "--workload-ref", + default=os.environ.get("WORKLOAD_REF", os.environ.get("REF", "main")), + help="Reference label for metrics (default: $WORKLOAD_REF or main)", + ) + parser.add_argument( + "--otlp-endpoint", + default=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:9090/api/v1/otlp"), + help="OTLP endpoint (default: $OTEL_EXPORTER_OTLP_ENDPOINT or http://localhost:9090/api/v1/otlp)", ) parser.add_argument("--debug", action="store_true", help="Enable debug logging") @@ -62,7 +73,7 @@ def parse_options(): 
parser.add_argument("--shutdown-time", default=10, type=int, help="Graceful shutdown time [s]") parser.add_argument("--report-period", default=1000, type=int, help="Metrics push period [ms]") - # Topic params (used when WORKLOAD_NAME contains 'topic') + # Topic params (used when --workload-name is 'topic') parser.add_argument("--topic-path", default="/local/slo_topic", help="Topic path") parser.add_argument("--topic-consumer", default="slo_consumer", help="Topic consumer name") parser.add_argument("--topic-partitions", default=1, type=int, help="Topic partition count") diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index c3d75fb2..7a8e2060 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,6 +1,5 @@ import logging -from core.metrics import WORKLOAD_NAME from runners.table_runner import TableRunner from runners.topic_runner import TopicRunner @@ -16,16 +15,17 @@ } -def _get_runner(): - runner_cls = _RUNNERS.get(WORKLOAD_NAME) +def _get_runner(workload_name: str): + runner_cls = _RUNNERS.get(workload_name) if runner_cls is None: - raise ValueError(f"Unknown WORKLOAD_NAME: {WORKLOAD_NAME!r}. Known: {list(_RUNNERS)}") + raise ValueError(f"Unknown workload_name: {workload_name!r}. 
Known: {list(_RUNNERS)}") return runner_cls() def run_all(args): """Create infrastructure, run the workload, then clean up — all in one go.""" - runner = _get_runner() + workload_name = args.workload_name + runner = _get_runner(workload_name) driver_config = ydb.DriverConfig( args.endpoint, @@ -38,13 +38,13 @@ def run_all(args): runner.set_driver(driver) try: - logger.info("[%s] Creating resources", WORKLOAD_NAME) + logger.info("[%s] Creating resources", workload_name) runner.create(args) - logger.info("[%s] Running workload for %d s", WORKLOAD_NAME, args.time) + logger.info("[%s] Running workload for %d s", workload_name, args.time) runner.run(args) finally: - logger.info("[%s] Cleaning up resources", WORKLOAD_NAME) + logger.info("[%s] Cleaning up resources", workload_name) try: runner.cleanup(args) except Exception: diff --git a/tests/slo/src/runners/table_runner.py b/tests/slo/src/runners/table_runner.py index f5643912..628c4802 100644 --- a/tests/slo/src/runners/table_runner.py +++ b/tests/slo/src/runners/table_runner.py @@ -88,7 +88,7 @@ def transaction(session: ydb.table.Session): self.logger.info("Table creation completed") def run(self, args): - metrics = create_metrics() + metrics = create_metrics(args) self.logger.info("Starting table SLO tests") diff --git a/tests/slo/src/runners/topic_runner.py b/tests/slo/src/runners/topic_runner.py index e8de9a93..a2800b42 100644 --- a/tests/slo/src/runners/topic_runner.py +++ b/tests/slo/src/runners/topic_runner.py @@ -70,7 +70,7 @@ def create(self, args): def run(self, args): assert self.driver is not None, "Driver is not initialized. Call set_driver() before run()." - metrics = create_metrics() + metrics = create_metrics(args) self.logger.info("Starting topic SLO tests") @@ -85,7 +85,7 @@ def run(self, args): async def run_async(self, args): """Async version of topic SLO tests using ydb.aio.Driver""" assert self.driver is not None, "Driver is not initialized. Call set_driver() before run_async()." 
- metrics = create_metrics() + metrics = create_metrics(args) self.logger.info("Starting async topic SLO tests") From d40af46a8e63910c313aba1cda38bae7b64f59d1 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:21:37 +0300 Subject: [PATCH 08/13] Simplify SLO workload Dockerfile and documentation Remove multi-stage build and venv isolation. Inline all dependencies into a single stage for simpler maintenance. Update README with clearer CLI argument documentation, environment variable fallback chains, and reorganized configuration tables. Clarify workload invocation and metric collection. --- tests/slo/Dockerfile | 48 +++---- tests/slo/README.md | 324 +++++++++++++------------------------------ 2 files changed, 114 insertions(+), 258 deletions(-) diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index ecf1bca2..7c1990d5 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -2,46 +2,40 @@ # This image packages the Python SLO workload runner. # -# Connection and workload identity are configured via environment variables: -# YDB_ENDPOINT grpc://ydb:2136 -# YDB_DATABASE /Root/testdb -# WORKLOAD_DURATION 600 -# WORKLOAD_NAME sync-query | sync-table | topic -# WORKLOAD_REF -# OTEL_EXPORTER_OTLP_METRICS_ENDPOINT http://ydb-prometheus:9090/api/v1/otlp/v1/metrics +# All settings are CLI arguments with env-var fallback (CLI arg > env var > default): +# endpoint $YDB_ENDPOINT grpc://ydb:2136 +# db $YDB_DATABASE /Root/testdb +# --workload-name $WORKLOAD_NAME sync-query +# --workload-ref $WORKLOAD_REF / $REF main +# --otlp-endpoint $OTEL_EXPORTER_OTLP_ENDPOINT http://ydb-prometheus:9090/api/v1/otlp +# --time $WORKLOAD_DURATION 600 # -# Additional tuning flags (read/write RPS, timeouts, thread counts) are passed via -# the Docker CMD, e.g.: -# docker run --rm --env-file ... 
--read-rps 1000 --write-rps 100 +# Example: +# docker run --rm grpc://ydb:2136 /Root/testdb --workload-name topic --time 120 # # Notes: # - OpenTelemetry 1.39.x requires Python >= 3.9. -FROM python:3.11-slim AS build +FROM python:3.11-slim ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 -WORKDIR /src -COPY . /src - -# Install runtime deps into an isolated venv so we can copy it into the final stage. -RUN python -m venv /opt/venv \ - && /opt/venv/bin/python -m pip install --no-cache-dir --upgrade pip \ - && /opt/venv/bin/pip install --no-cache-dir . \ - && /opt/venv/bin/pip install --no-cache-dir -r tests/slo/requirements.txt - +RUN apt-get update && apt-get install -y --no-install-recommends gcc libc6-dev && rm -rf /var/lib/apt/lists/* -FROM python:3.11-slim +WORKDIR /src -ENV PYTHONDONTWRITEBYTECODE=1 \ - PYTHONUNBUFFERED=1 \ - PATH="/opt/venv/bin:${PATH}" +# 1. YDB SDK +COPY setup.py pyproject.toml README.md requirements.txt ./ +COPY ydb/ ydb/ +RUN pip install --no-cache-dir . -WORKDIR /app +# 2. SLO deps +COPY tests/slo/requirements.txt tests/slo/requirements.txt +RUN pip install --no-cache-dir -r tests/slo/requirements.txt -COPY --from=build /opt/venv /opt/venv -COPY --from=build /src/tests/slo/src /app/tests/slo/src +# 3. 
Workload source +COPY tests/slo/src /src/tests/slo/src ENTRYPOINT ["python", "./tests/slo/src"] diff --git a/tests/slo/README.md b/tests/slo/README.md index c7b3e6b8..94b144f5 100644 --- a/tests/slo/README.md +++ b/tests/slo/README.md @@ -3,240 +3,111 @@ SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network (that is possible situations for distributed DBs with hundreds of nodes) -### Workload types: +## Workload types -There are two workload types: +- **sync-query** — tests table operations via Query API (read/write) +- **sync-table** — tests table operations via Table API (read/write) +- **topic** — tests topic operations (publish/consume) -- **Table SLO** - tests table operations (read/write) -- **Topic SLO** - tests topic operations (publish/consume) +## Quick start -### Implementations: +The runner script handles everything: clones infra configs, builds the workload image, +starts YDB cluster + Prometheus via docker compose, runs the workload, and tears down on exit. -- `sync` -- `async` (now unimplemented) +```sh +cd tests/slo -### Usage: +# Run topic workload (default) +WORKLOAD_NAME=topic ./slo_runner.sh -Each workload type has 3 commands: - -**Table commands:** -- `table-create` - creates table in database -- `table-cleanup` - drops table in database -- `table-run` - runs table workload (read and write to table with set RPS) - -**Topic commands:** -- `topic-create` - creates topic with consumer in database -- `topic-cleanup` - drops topic in database -- `topic-run` - runs topic workload (publish and consume messages with set RPS) - -### Infra (Docker Compose) - -SLO workload is designed to run **inside the Docker Compose network** so it can reach YDB/Prometheus by service DNS names without publishing ports to localhost. 
- -Infra compose configs are maintained in a separate repo: -- https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy - -Expected setup: -- Start infra using `deploy/compose.yml` from `ydb-slo-action` -- Infra network name should be `ydb_cluster` -- Workload container attaches to that network - -Example infra start (from the `ydb-slo-action` repo root): -- `docker compose -f deploy/compose.yml --profile telemetry up -d --build` - -### Runner script (`tests/slo/slo_runner.sh`) - -This repo contains a simple maintainer convenience runner that: -1) builds the workload image -2) runs a basic SLO workload inside `ydb_cluster` - -It is intentionally minimal (not a complete interface for all workload options). For full control, use the commands in `tests/slo/src/` directly. - -Example usage (infra must already be running): -- `NETWORK_NAME=ydb_cluster ./tests/slo/slo_runner.sh` - -Defaults used by the runner (override via env vars): -- `NETWORK_NAME=ydb_cluster` -- `YDB_ENDPOINT=grpc://ydb-storage-1:2136` (also commonly works as `grpc://storage-1:2136`) -- `YDB_DATABASE=/Root/testdb` -- `OTLP_ENDPOINT=http://prometheus:9090/api/v1/otlp/v1/metrics` - -### Run examples with all arguments: - -You can also configure the OTLP endpoint via environment variable: -- `OTLP_ENDPOINT=http://ydb-prometheus:9090/api/v1/otlp/v1/metrics` (full OTLP metrics endpoint) - -**Table examples:** - -table-create: -`python tests/slo/src/ table-create localhost:2136 /local -t tableName ---min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000 ---write-timeout 10000` - -table-cleanup: -`python tests/slo/src/ table-cleanup localhost:2136 /local -t tableName` - -table-run: -`python tests/slo/src/ table-run localhost:2136 /local -t tableName ---otlp-endpoint http://ydb-prometheus:9090/api/v1/otlp/v1/metrics ---report-period 250 ---read-rps 1000 --read-timeout 10000 ---write-rps 100 --write-timeout 10000 ---time 600 --shutdown-time 30` - -**Topic examples:** - 
-topic-create: -`python tests/slo/src/ topic-create localhost:2136 /local ---topic-path /local/slo_topic --topic-consumer slo_consumer` - -topic-cleanup: -`python tests/slo/src/ topic-cleanup localhost:2136 /local --topic-path /local/slo_topic` - -topic-run: -`python tests/slo/src/ topic-run localhost:2136 /local ---topic-path /local/slo_topic --topic-consumer slo_consumer ---otlp-endpoint http://ydb-prometheus:9090/api/v1/otlp/v1/metrics ---report-period 250 ---topic-write-rps 50 --topic-read-rps 100 ---topic-write-timeout 5000 --topic-read-timeout 3000 ---time 600 --shutdown-time 30` - -## Arguments for commands: - -### table-create -`python tests/slo/src/ table-create [options]` - -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - -t --table-name table name to create - - -p-min --min-partitions-count minimum amount of partitions in table - -p-max --max-partitions-count maximum amount of partitions in table - -p-size --partition-size partition size in mb - - -c --initial-data-count amount of initially created rows - - --write-timeout write timeout milliseconds - - --batch-size amount of new records in each create request - --threads number of threads to use - -``` - -### table-cleanup -`python tests/slo/src/ table-cleanup [options]` - -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - -t --table-name table name to create -``` - -### table-run -`python tests/slo/src/ table-run [options]` - -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - -t --table-name table name to create - - --otlp-endpoint Prometheus OTLP metrics endpoint (e.g. 
http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) - --report-period metrics export period in milliseconds - - --read-rps read RPS - --read-timeout read timeout milliseconds - - --write-rps write RPS - --write-timeout write timeout milliseconds - - --time run time in seconds - --shutdown-time graceful shutdown time in seconds - - --read-threads number of threads to use for write requests - --write-threads number of threads to use for read requests +# Run table workload +WORKLOAD_NAME=sync-query ./slo_runner.sh ``` -### topic-create -`python tests/slo/src/ topic-create [options]` +Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - --topic-path topic path to create - --topic-consumer consumer name - --topic-min-partitions minimum active partitions - --topic-max-partitions maximum active partitions - --topic-retention-hours retention period in hours -``` +### Configuration -### topic-cleanup -`python tests/slo/src/ topic-cleanup [options]` +Override defaults via environment variables: -``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to +| Variable | Default | Description | +|----------|---------|-------------| +| `WORKLOAD_NAME` | `topic` | Workload type: `sync-query`, `sync-table`, `topic` | +| `RUN_TIME_SEC` | `120` | Workload run time in seconds | +| `WRITE_RPS` | `1` | Write RPS | +| `READ_THREADS` | `0` | Read worker threads | +| `WRITE_THREADS` | `1` | Write worker threads | +| `MESSAGE_SIZE` | `100` | Topic message size in bytes | +| `REPORT_PERIOD_MS` | `1000` | Metrics flush period in ms | +| `DEBUG` | `0` | Set to `1` to enable debug logging | +| `WORKLOAD_IMAGE` | `ydb-python-slo:local` | Docker image name for the workload | -Options: - --topic-path topic path to drop -``` +## CLI arguments -### topic-run -`python tests/slo/src/ topic-run [options]` +The workload runs as a single command that 
creates resources, runs the workload, and cleans up. +Settings that list an environment variable in the tables below resolve as **CLI arg > environment variable > hardcoded default**; flags without one resolve as **CLI arg > hardcoded default**. ``` -Arguments: - endpoint YDB endpoint to connect to - db YDB database to connect to - -Options: - --topic-path topic path - --topic-consumer consumer name - - --otlp-endpoint Prometheus OTLP metrics endpoint (e.g. http://ydb-prometheus:9090/api/v1/otlp/v1/metrics) - --report-period metrics export period in milliseconds - - --topic-read-rps read RPS for topics - --topic-read-timeout read timeout milliseconds for topics - --topic-write-rps write RPS for topics - --topic-write-timeout write timeout milliseconds for topics - - --topic-message-size message size in bytes - --topic-read-threads number of threads to use for read requests - --topic-write-threads number of threads to use for write requests - - --time run time in seconds - --shutdown-time graceful shutdown time in seconds +python tests/slo/src [endpoint] [db] [options] ``` -## Authentication - -Workload using [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication. 
+### Connection & identity + +| Argument | Env var | Default | Description | +|----------|---------|---------|-------------| +| `endpoint` (positional) | `YDB_ENDPOINT` | `grpc://ydb:2136` | YDB endpoint | +| `db` (positional) | `YDB_DATABASE` | `/Root/testdb` | YDB database path | +| `--workload-name` | `WORKLOAD_NAME` | `sync-query` | Workload type | +| `--workload-ref` | `WORKLOAD_REF` / `REF` | `main` | Reference label for metrics | +| `--otlp-endpoint` | `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://ydb-prometheus:9090/api/v1/otlp` | OTLP endpoint | +| `--time` | `WORKLOAD_DURATION` | `600` | Workload duration in seconds | +| `--debug` | — | `false` | Enable debug logging | + +### Run parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--read-rps` | `100` | Read RPS limit | +| `--read-timeout` | `10000` | Read timeout in ms | +| `--write-rps` | `10` | Write RPS limit | +| `--write-timeout` | `20000` | Write timeout in ms | +| `--read-threads` | `8` | Read worker threads | +| `--write-threads` | `4` | Write worker threads | +| `--shutdown-time` | `10` | Graceful shutdown time in seconds | +| `--report-period` | `1000` | Metrics push period in ms | + +### Table parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--table-name` | `key_value` | Table name | +| `--min-partitions-count` | `6` | Minimum partition count | +| `--max-partitions-count` | `1000` | Maximum partition count | +| `--partition-size` | `100` | Partition size in MB | +| `--initial-data-count` | `1000` | Rows to pre-fill | +| `--batch-size` | `100` | Rows per insert batch | +| `--threads` | `10` | Threads for initial data fill | + +### Topic parameters + +| Argument | Default | Description | +|----------|---------|-------------| +| `--topic-path` | `/Root/testdb/slo_topic` | Topic path | +| `--topic-consumer` | `slo_consumer` | Consumer name | +| `--topic-partitions` | `1` | Topic partition count | +| `--message-size` | 
`100` | Message size in bytes | ## What's inside ### Table workload -When running `table-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. -- `readJob` reads rows from the table one by one with random identifiers generated by writeJob -- `writeJob` generates and inserts rows -- `metricsJob` periodically sends metrics to Prometheus +Creates three jobs: `readJob`, `writeJob`, `metricsJob`. -Table have these fields: +- `readJob` — reads rows from the table with random identifiers +- `writeJob` — generates and inserts rows +- `metricsJob` — periodically sends metrics to Prometheus + +Table schema: - `object_id Uint64` - `object_hash Uint64 Digest::NumericHash(id)` - `payload_str UTF8` @@ -246,28 +117,19 @@ Table have these fields: Primary key: `("object_hash", "object_id")` ### Topic workload -When running `topic-run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`. -- `readJob` reads messages from topic using TopicReader and commits offsets -- `writeJob` generates and publishes messages to topic using TopicWriter -- `metricsJob` periodically sends metrics to Prometheus +Creates three jobs: `readJob`, `writeJob`, `metricsJob`. -Messages contain: -- Sequential message ID -- Thread identifier -- Configurable payload size (padded with 'x' characters) +- `readJob` — reads messages from topic using TopicReader and commits offsets +- `writeJob` — generates and publishes messages using TopicWriter +- `metricsJob` — periodically sends metrics to Prometheus ## Collected metrics -- `oks` - amount of OK requests -- `not_oks` - amount of not OK requests -- `inflight` - amount of requests in flight -- `latency` - summary of latencies in ms -- `attempts` - summary of amount for request - -Metrics are collected for both table operations (`read`, `write`) and topic operations (`read`, `write`). - -> Note: with Prometheus OTLP receiver (no Pushgateway) counters/histograms are cumulative and cannot be reset to `0`. 
-> If you need clean separation between runs, use distinct `REF`/`WORKLOAD` (and/or `SLO_INSTANCE_ID`) so each run writes into separate time series. -## Look at metrics in grafana -You can get dashboard used in that test [here](https://github.com/ydb-platform/slo-tests/blob/main/k8s/helms/grafana.yaml#L69) - you will need to import json into grafana. +- `sdk_operations_total` — total operations (labeled by `operation_status`: success/err) +- `sdk_errors_total` — errors by type +- `sdk_pending_operations` — in-flight operations +- `sdk_retry_attempts_total` — retry attempts +- `sdk_operation_latency_p50_seconds` — P50 latency +- `sdk_operation_latency_p95_seconds` — P95 latency +- `sdk_operation_latency_p99_seconds` — P99 latency From 7580d7053b5ae64c7b157d30989dff0173914f76 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:22:25 +0300 Subject: [PATCH 09/13] Refactor SLO runner to use docker compose from infra repo The script now automatically fetches and manages the YDB cluster, Prometheus, and workload via docker compose instead of requiring manual infra setup. The workload configuration is passed as env vars to the compose file. --- tests/slo/slo_runner.sh | 156 +++++++++++++++++++++------------------- 1 file changed, 82 insertions(+), 74 deletions(-) diff --git a/tests/slo/slo_runner.sh b/tests/slo/slo_runner.sh index 9742ca2f..87d89fbb 100755 --- a/tests/slo/slo_runner.sh +++ b/tests/slo/slo_runner.sh @@ -4,20 +4,18 @@ set -euo pipefail # Local SLO runner. 
# # This script: -# 1) checks that the infra docker network exists (default: ydb_cluster) +# 1) clones / updates ydb-slo-action deploy configs # 2) builds the workload image -# 3) runs topic-create + topic-run inside that network +# 3) starts everything via docker compose (YDB + Prometheus + workload) +# 4) tears down on exit # -# Why it runs the workload as a container: -# - infra compose does not necessarily publish YDB/Prometheus ports to localhost -# - attaching to the compose network makes service discovery reliable (DNS) +# The workload runs as the "workload-current" service from the compose file, +# configured via WORKLOAD_CURRENT_IMAGE and WORKLOAD_CURRENT_COMMAND env vars. +# +# Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy # # Configuration (env vars): -# NETWORK_NAME : docker network to attach workload container to (default: ydb_cluster) -# YDB_ENDPOINT : grpc endpoint inside the network (default: grpc://ydb-storage-1:2136) -# YDB_DATABASE : database (default: /Root/testdb) -# TOPIC_PATH : topic path (default: /Root/testdb/slo_topic) -# OTLP_ENDPOINT : Prometheus OTLP receiver URL (default: http://prometheus:9090/api/v1/otlp/v1/metrics) +# WORKLOAD_NAME : workload type (default: topic) # RUN_TIME_SEC : workload run time seconds (default: 120) # WRITE_RPS : topic write rps (default: 1) # READ_THREADS : topic read threads (default: 0) @@ -29,12 +27,11 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" -NETWORK_NAME="${NETWORK_NAME:-ydb_cluster}" - -YDB_DATABASE="${YDB_DATABASE:-/Root/testdb}" -TOPIC_PATH="${TOPIC_PATH:-${YDB_DATABASE}/slo_topic}" +INFRA_DIR="${SCRIPT_DIR}/.infra" +INFRA_REPO="https://github.com/ydb-platform/ydb-slo-action.git" +INFRA_BRANCH="v2" -OTLP_ENDPOINT="${OTLP_ENDPOINT:-http://prometheus:9090/api/v1/otlp/v1/metrics}" +WORKLOAD_NAME="${WORKLOAD_NAME:-topic}" RUN_TIME_SEC="${RUN_TIME_SEC:-120}" WRITE_RPS="${WRITE_RPS:-1}" READ_THREADS="${READ_THREADS:-0}" @@ -45,74 +42,85 @@ DEBUG="${DEBUG:-0}" WORKLOAD_IMAGE="${WORKLOAD_IMAGE:-ydb-python-slo:local}" -# Infra configuration -# -# Infra is expected to be started separately (see https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy). -# This runner only attaches the workload container to the existing docker network "${NETWORK_NAME}". -YDB_ENDPOINT="${YDB_ENDPOINT:-grpc://ydb-storage-1:2136}" +# --------------------------------------------------------------------------- +# Infra management (ydb-slo-action/v2/deploy) +# --------------------------------------------------------------------------- -ensure_network() { - if docker network inspect "${NETWORK_NAME}" >/dev/null 2>&1; then - return 0 +fetch_infra() { + if [[ -d "${INFRA_DIR}/.git" ]]; then + echo "[slo_runner] updating infra configs..." + git -C "${INFRA_DIR}" fetch origin "${INFRA_BRANCH}" --depth 1 --quiet + git -C "${INFRA_DIR}" -c advice.detachedHead=false checkout FETCH_HEAD --quiet + else + echo "[slo_runner] cloning infra configs (${INFRA_REPO} @ ${INFRA_BRANCH}, sparse: deploy/)..." + git clone --no-checkout --depth 1 --branch "${INFRA_BRANCH}" --filter=blob:none "${INFRA_REPO}" "${INFRA_DIR}" + git -C "${INFRA_DIR}" sparse-checkout init --cone + git -C "${INFRA_DIR}" sparse-checkout set deploy + git -C "${INFRA_DIR}" checkout "${INFRA_BRANCH}" fi - - echo "[slo_runner] docker network '${NETWORK_NAME}' not found." >&2 - echo "[slo_runner] Start infra and ensure it creates/uses this network, then re-run." 
>&2 - echo "[slo_runner] Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/main/deploy" >&2 - exit 2 } -workload_run() { - # Runs workload as a container attached to the infra compose network. - # Usage: - # workload_run - docker run --rm --network "${NETWORK_NAME}" "${WORKLOAD_IMAGE}" "$@" -} +COMPOSE_FILE="${INFRA_DIR}/deploy/compose.yml" build_workload_image() { - docker build -f "${REPO_ROOT}/tests/slo/Dockerfile" -t "${WORKLOAD_IMAGE}" "${REPO_ROOT}" + echo "[slo_runner] building workload image: ${WORKLOAD_IMAGE} ..." + docker build --platform linux/amd64 -f "${REPO_ROOT}/tests/slo/Dockerfile" -t "${WORKLOAD_IMAGE}" "${REPO_ROOT}" +} + +# --------------------------------------------------------------------------- +# Build workload command +# --------------------------------------------------------------------------- + +build_workload_command() { + local cmd=( + --workload-name "${WORKLOAD_NAME}" + --report-period "${REPORT_PERIOD_MS}" + --read-threads "${READ_THREADS}" + --write-threads "${WRITE_THREADS}" + --write-rps "${WRITE_RPS}" + --message-size "${MESSAGE_SIZE}" + --time "${RUN_TIME_SEC}" + ) + if [[ "${DEBUG}" == "1" ]]; then + cmd+=(--debug) + fi + echo "${cmd[*]}" } -echo "[slo_runner] repo root: ${REPO_ROOT}" -echo "[slo_runner] compose network: ${NETWORK_NAME}" -echo "[slo_runner] ydb endpoint: ${YDB_ENDPOINT}" -echo "[slo_runner] ydb db: ${YDB_DATABASE}" -echo "[slo_runner] topic path: ${TOPIC_PATH}" -echo "[slo_runner] otlp endpoint: ${OTLP_ENDPOINT}" -echo "[slo_runner] checking docker network: ${NETWORK_NAME}..." -ensure_network -echo "[slo_runner] building workload image: ${WORKLOAD_IMAGE} ..." +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +fetch_infra build_workload_image -echo "[slo_runner] topic-create..." 
-topic_create_args=( - topic-create - "${YDB_ENDPOINT}" - "${YDB_DATABASE}" - --path "${TOPIC_PATH}" -) -if [[ "${DEBUG}" == "1" ]]; then - topic_create_args+=(--debug) -fi -workload_run "${topic_create_args[@]}" - -echo "[slo_runner] topic-run..." -topic_run_args=( - topic-run - "${YDB_ENDPOINT}" - "${YDB_DATABASE}" - --path "${TOPIC_PATH}" - --otlp-endpoint "${OTLP_ENDPOINT}" - --report-period "${REPORT_PERIOD_MS}" - --read-threads "${READ_THREADS}" - --write-threads "${WRITE_THREADS}" - --write-rps "${WRITE_RPS}" - --message-size "${MESSAGE_SIZE}" - --time "${RUN_TIME_SEC}" -) -if [[ "${DEBUG}" == "1" ]]; then - topic_run_args+=(--debug) +echo "[slo_runner] starting infra + workload..." +echo "[slo_runner] workload image: ${WORKLOAD_IMAGE}" +echo "[slo_runner] workload name: ${WORKLOAD_NAME}" +echo "[slo_runner] run time: ${RUN_TIME_SEC}s" + +export WORKLOAD_CURRENT_IMAGE="${WORKLOAD_IMAGE}" +export WORKLOAD_CURRENT_COMMAND="$(build_workload_command)" +export WORKLOAD_NAME +export WORKLOAD_DURATION="${RUN_TIME_SEC}" + +COMPOSE="docker compose -f ${COMPOSE_FILE} --profile telemetry --profile workload-current" +trap '${COMPOSE} down' EXIT + +echo "[slo_runner] starting infra..." +${COMPOSE} up -d --wait + +prom_port=$(docker port ydb-prometheus 9090 2>/dev/null | head -1 || true) +if [[ -n "${prom_port}" ]]; then + echo "[slo_runner] prometheus: http://${prom_port}" fi -workload_run "${topic_run_args[@]}" -echo "[slo_runner] done" +echo "[slo_runner] waiting for workload to finish..." +${COMPOSE} logs -f workload-current & +LOGS_PID=$! 
+ +exit_code=$(docker wait ydb-workload-current) + +kill "${LOGS_PID}" 2>/dev/null || true +echo "[slo_runner] workload exited with code ${exit_code}" +exit "${exit_code}" From 29aada51a428ce5dd332ccca0672a86a23435157 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:22:39 +0300 Subject: [PATCH 10/13] Add .gitignore for SLO test infrastructure directory --- tests/slo/.gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 tests/slo/.gitignore diff --git a/tests/slo/.gitignore b/tests/slo/.gitignore new file mode 100644 index 00000000..1ce13a4f --- /dev/null +++ b/tests/slo/.gitignore @@ -0,0 +1 @@ +.infra/ From bf8b9f838660fbf449b6ba83fb1ec861084689f5 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:31:18 +0300 Subject: [PATCH 11/13] Expand SLO testing documentation with local run examples Add comprehensive guide for running SLO workloads locally against a custom YDB instance, including Docker Compose setup, CLI argument examples, and environment variable usage. Update section heading to clarify Docker Compose approach. --- tests/slo/README.md | 73 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/tests/slo/README.md b/tests/slo/README.md index 94b144f5..d028a819 100644 --- a/tests/slo/README.md +++ b/tests/slo/README.md @@ -9,9 +9,9 @@ SLO is the type of test where app based on ydb-sdk is tested against falling YDB - **sync-table** — tests table operations via Table API (read/write) - **topic** — tests topic operations (publish/consume) -## Quick start +## Quick start (Docker Compose) -The runner script handles everything: clones infra configs, builds the workload image, +The runner script handles everything: clones [ydb-slo-action](https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy) infra configs, builds the workload image, starts YDB cluster + Prometheus via docker compose, runs the workload, and tears down on exit. 
```sh @@ -24,7 +24,74 @@ WORKLOAD_NAME=topic ./slo_runner.sh WORKLOAD_NAME=sync-query ./slo_runner.sh ``` -Infra configs: https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy +## Local run (against your own YDB) + +Start the playground cluster and run the workload directly with Python. +All examples run from `tests/slo/` directory with activated venv. + +```sh +# Start playground YDB cluster +docker compose -f playground/configs/compose.yaml up -d + +# Activate venv +source ../../.venv/bin/activate +``` + +### Using CLI arguments + +```sh +# Topic workload — write only, 60 sec, debug logging +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic \ + --topic-path /Root/testdb/slo_topic \ + --otlp-endpoint "" \ + --write-rps 1 --write-threads 1 --read-threads 0 \ + --time 60 --debug + +# Topic workload — read + write +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic \ + --otlp-endpoint "" \ + --write-rps 5 --write-threads 2 --read-threads 2 --read-rps 10 \ + --time 120 + +# Table workload (sync-query) — default RPS +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query \ + --otlp-endpoint "" \ + --time 60 --debug + +# Table workload — high load +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query \ + --otlp-endpoint "" \ + --read-rps 500 --write-rps 100 --read-threads 8 --write-threads 4 \ + --time 300 +``` + +### Using environment variables + +```sh +# All settings via env vars +YDB_ENDPOINT=grpc://localhost:2136 \ +YDB_DATABASE=/Root/testdb \ +WORKLOAD_NAME=topic \ +WORKLOAD_DURATION=60 \ +OTEL_EXPORTER_OTLP_ENDPOINT="" \ + python ./src --debug + +# Mix: connection via env, tuning via args +YDB_ENDPOINT=grpc://localhost:2136 \ +YDB_DATABASE=/Root/testdb \ + python ./src --workload-name sync-query --otlp-endpoint "" \ + --read-rps 200 --write-rps 50 --time 120 +``` + +### Tear down + +```sh +docker compose -f playground/configs/compose.yaml down +``` ### 
Configuration From 3481b1af878a595c36d8b31e1b105f3f3cafa3d2 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 11:32:14 +0300 Subject: [PATCH 12/13] Restructure SLO testing documentation Add Docker Compose full-stack approach and clarify local testing against custom YDB instance. Update success criteria to be workload-agnostic. --- AGENTS.md | 51 +++++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index b347890e..2b6f1c98 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -90,44 +90,47 @@ To regenerate protobuf stubs: see `Makefile` and `generate-protobuf.Dockerfile`. --- -## Topic Chaos Testing (SLO) +## SLO Testing -Run this only for changes that affect topic reader/writer reconnection logic. +Run this for changes that affect topic/table reader/writer reconnection logic. -**1. Start YDB with chaos** (kills a DB node every ~20 seconds): -```sh -docker compose -f tests/slo/playground/configs/compose.yaml up -d -``` +### Docker Compose (full stack) -**2. Wait until YDB is healthy:** -```sh -docker ps --format "table {{.Names}}\t{{.Status}}" | grep ydb -``` +Uses [ydb-slo-action](https://github.com/ydb-platform/ydb-slo-action/tree/v2/deploy) infra (YDB cluster + Prometheus + workload in one command). -**3. Create a test topic** (from `tests/slo/` directory): +From `tests/slo/` directory: ```sh -source .venv/bin/activate -python ./src topic-create grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --debug +WORKLOAD_NAME=topic ./slo_runner.sh +WORKLOAD_NAME=sync-query ./slo_runner.sh ``` -**4. Test writer** (60 sec): +Override defaults via env vars: `RUN_TIME_SEC`, `WRITE_RPS`, `READ_THREADS`, `WRITE_THREADS`, `MESSAGE_SIZE`, `DEBUG=1`. + +### Local run (against your own YDB) + +**1. 
Start playground cluster:** ```sh -python ./src topic-run grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --otlp-endpoint "" \ - --read-threads 0 --write-rps 1 --time 60 --debug +docker compose -f tests/slo/playground/configs/compose.yaml up -d ``` -**5. Test reader** (60 sec): +**2. Run workload** (from `tests/slo/` directory): ```sh -python ./src topic-run grpc://localhost:2135 /Root/testdb \ - --path /Root/testdb/slo_topic --otlp-endpoint "" \ - --read-rps 1 --write-threads 0 --time 60 --debug +source ../../.venv/bin/activate + +# Topic workload (60 sec) +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name topic --otlp-endpoint "" --time 60 --debug + +# Table workload (60 sec) +python ./src grpc://localhost:2136 /Root/testdb \ + --workload-name sync-query --otlp-endpoint "" --time 60 --debug ``` -**6. Tear down:** +**3. Tear down:** ```sh docker compose -f tests/slo/playground/configs/compose.yaml down ``` -**Success criteria:** writer and reader reconnect automatically during node restarts with no fatal errors. +Full list of CLI arguments and environment variables: see `tests/slo/README.md` or run `python tests/slo/src --help`. + +**Success criteria:** workload reconnects automatically during node restarts with no fatal errors. 
From c073769e0c56975ff242e124da873a7f070178e3 Mon Sep 17 00:00:00 2001 From: Vladislav Polyakov Date: Mon, 13 Apr 2026 13:56:18 +0300 Subject: [PATCH 13/13] Add async mode support to SLO workload runner --- tests/slo/src/options.py | 1 + tests/slo/src/root_runner.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/tests/slo/src/options.py b/tests/slo/src/options.py index c9f340c6..0db988e0 100644 --- a/tests/slo/src/options.py +++ b/tests/slo/src/options.py @@ -48,6 +48,7 @@ def parse_options(): ) parser.add_argument("--debug", action="store_true", help="Enable debug logging") + parser.add_argument("--async", dest="async_mode", action="store_true", help="Run workload in async mode") # Table params parser.add_argument("--table-name", default="key_value", help="Table name") diff --git a/tests/slo/src/root_runner.py b/tests/slo/src/root_runner.py index 7a8e2060..4e280b53 100644 --- a/tests/slo/src/root_runner.py +++ b/tests/slo/src/root_runner.py @@ -1,3 +1,4 @@ +import asyncio import logging from runners.table_runner import TableRunner @@ -24,6 +25,13 @@ def _get_runner(workload_name: str): def run_all(args): """Create infrastructure, run the workload, then clean up — all in one go.""" + if args.async_mode: + asyncio.run(_run_all_async(args)) + else: + _run_all_sync(args) + + +def _run_all_sync(args): workload_name = args.workload_name runner = _get_runner(workload_name) @@ -51,3 +59,31 @@ def run_all(args): logger.exception("Cleanup failed — ignoring") driver.stop(timeout=args.shutdown_time) + + +async def _run_all_async(args): + workload_name = args.workload_name + runner = _get_runner(workload_name) + + driver_config = ydb.DriverConfig( + args.endpoint, + database=args.db, + grpc_keep_alive_timeout=5000, + ) + + async with ydb.aio.Driver(driver_config) as driver: + await driver.wait(timeout=300) + runner.set_driver(driver) + + try: + logger.info("[%s][async] Creating resources", workload_name) + runner.create(args) + + 
logger.info("[%s][async] Running workload for %d s", workload_name, args.time) + await runner.run_async(args) + finally: + logger.info("[%s][async] Cleaning up resources", workload_name) + try: + runner.cleanup(args) + except Exception: + logger.exception("Cleanup failed — ignoring")