feat: ClickHouse sink for query history, aggregated metrics and session snapshots #36

Open
Sanikadze wants to merge 1 commit into open-gpdb:main from Sanikadze:feat/clickhouse-sink

Conversation

@Sanikadze

Contributor

Summary

Adds an optional ClickHouse sink to the master role that streams query history, aggregated metrics and session snapshots into a ClickHouse cluster for long-term, ad-hoc SQL analysis. The sink complements the existing JSON archiver (which keeps working unchanged) and the Prometheus/VictoriaMetrics path.

The sink is opt-in via clickhouse.enabled: true in yagpcc.yaml. When disabled the orchestrator is a no-op and yagpcc behaves exactly as before.

Motivation

The JSON archiver and Prometheus metrics cover the live-monitoring and short-term audit use-cases, but neither lets operators run ad-hoc SQL queries against historical query data (top-N slowest, by user/database/resgroup, plan diffs over time, etc.). ClickHouse is the natural fit for this workload — columnar storage, fast aggregation, TTL-based retention.

What it does

The sink persists three first-class tables (plus a bookkeeping table) under the yagpcc database in ClickHouse:

| Table | Purpose |
| --- | --- |
| yagpcc.query_events | One row per query status change (SUBMIT/START/DONE/ERROR/CANCELLING/CANCELED/END). Holds the full plan tree as JSON, per-segment metrics as Array(Tuple(...)) and aggregated SystemStat/Instrumentation totals. |
| yagpcc.aggregated_metrics | SummingMergeTree rollups keyed by (query_id, plan_id, user, database, resource_group, bucket_time). Mirrors the in-memory AggregatedStorage flush. |
| yagpcc.session_snapshots | Periodic snapshot of pg_stat_activity (default every 10s). |
| yagpcc._yagpcc_meta | Schema-version ledger. Used by both auto-migrate and verify-schema modes. |

The DDL lives in internal/sink/clickhouse/migrations/0001_init.up.sql and is embedded into the binary via //go:embed. The retention TTL is a text/template placeholder rendered from clickhouse.retention_days at startup.

Architecture

internal/sink/clickhouse/
├── client.go         NewClient + TLS config + Ping
├── writer.go         ClickhouseWriter orchestrator (lifecycle, Submit, FlushAggregates)
├── tables.go         QueryEventWriter, AggregatedWriter, SessionSnapshotWriter
├── mapping.go        proto/storage to CH row converters
├── buffer.go         thread-safe ring buffer (drop_oldest | block)
├── migrations.go     ParseMigrations, RenderTemplate, ApplyMigrations
├── schema.go         VerifySchema, DumpSchema, DumpMigration
├── metrics.go        Prometheus collectors + per-table hook factories
├── migrations/       0001_init.{up,down}.sql (embedded)
└── testdata/         fixtures for unit tests

Boundaries are designed to keep the sink isolated from yagpcc's hot-path storage:

  • mapping.go is the only file that imports proto and storage types.
  • The orchestrator exposes a small ClickhouseSink interface to internal/master/background.go (Submit + FlushAggregates) so the master can swap a fake in tests.
  • Per-table writers depend on a batchPreparer interface (a subset of driver.Conn) so unit tests use hand-rolled fakes instead of standing up a real server.

Data flow

yagp-hooks-collector --UDS--> yagpcc segments --gRPC pull--> master/background.go
                                                                  |
                              +-----------------------------------+
                              v                                   v
               ArchiveOrAggregate (existing)            AggregatedStorage cycle hook
                              |                                   |
                              v                                   v
                  ClickhouseWriter.Submit              ClickhouseWriter.FlushAggregates
                              |                                   |
                              v                                   v
              QueryEventWriter.Write              AggregatedWriter.FlushBuckets
              (filter + Buffer.Append)                            |
                              |                                   v
                              v                       INSERT yagpcc.aggregated_metrics
                   periodic Flush(ctx)
                              |
                              v
                INSERT yagpcc.query_events             SessionSnapshotWriter (own ticker)
                                                                  |
                                                                  v
                                                INSERT yagpcc.session_snapshots

archiver.go and statwriter.go keep working unchanged; the JSON archive becomes a manual disaster-recovery source rather than the only persistence path. The JSON archiver and the ClickHouse sink can be enabled together, individually, or not at all.

Configuration

Minimal master config:

role: master
# ... existing options ...

clickhouse:
  enabled: true
  addrs:
    - "ch01.example.com:9440"
  database: yagpcc
  user: yagpcc_writer
  # password is read from env YAGPCC_CH_PASSWORD (never put it in yaml)
  schema_management: auto       # auto | verify_only | dump_only
  retention_days: 30
  batch_size: 10000
  flush_interval: 10s
  buffer_max_rows: 100000
  on_buffer_overflow: drop_oldest  # drop_oldest | block
  async_insert: true
  min_duration_ms: 60000        # skip queries shorter than 60 s
  session_snapshot_interval_sec: 10
  tls:
    enabled: true
    ca_file: /etc/yagpcc/certs/ch_ca.crt

When enabled: false, the sink is constructed as a no-op and adds no overhead.

CLI flags

  • --dump-schema — print cumulative CH DDL to stdout and exit (no connection required)
  • --dump-migration --from=N --to=M — print SQL to migrate between schema versions
  • --migrate-only — connect, apply pending migrations, exit
  • --verify-schema — connect, verify schema version matches binary expectations, exit

Schema management

| Mode | Behaviour |
| --- | --- |
| auto (default) | Apply pending migrations on startup. Idempotent. |
| verify_only | Check _yagpcc_meta version, fatal on mismatch. |
| dump_only | Print DDL to stdout, exit. For CI integration. |

The bookkeeping table yagpcc._yagpcc_meta tracks version, applied_at, direction='up'|'down' for each migration.
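
The difference between auto and verify_only can be sketched as two small functions over the version recorded in the ledger. The `migration` type, both helper names and the `0002_example` entry are hypothetical and only illustrate the idea; the actual logic lives in migrations.go and schema.go.

```go
package main

import (
	"fmt"
	"sort"
)

// migration is a hypothetical descriptor of one embedded migration file.
type migration struct {
	Version int
	Name    string
}

// pendingMigrations returns the migrations newer than current, in ascending
// version order. With nothing pending it returns an empty slice, which is
// what makes repeated runs in auto mode idempotent.
func pendingMigrations(all []migration, current int) []migration {
	var out []migration
	for _, m := range all {
		if m.Version > current {
			out = append(out, m)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version })
	return out
}

// verifyVersion mirrors verify_only: it changes nothing and reports a mismatch.
func verifyVersion(current, expected int) error {
	if current != expected {
		return fmt.Errorf("schema version mismatch: database has %d, binary expects %d", current, expected)
	}
	return nil
}

func main() {
	all := []migration{
		{Version: 2, Name: "0002_example"}, // hypothetical future migration
		{Version: 1, Name: "0001_init"},
	}
	for _, m := range pendingMigrations(all, 0) {
		fmt.Println("apply", m.Name)
	}
	fmt.Println(verifyVersion(1, 2))
}
```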

Observability

New Prometheus metrics:

  • yagpcc_ch_inserts_total{table, status} — counter of INSERT batches
  • yagpcc_ch_dropped_rows_total{table, reason} — counter of dropped rows (buffer_overflow, filter)
  • yagpcc_ch_buffer_size{table} — gauge of current buffer occupancy
  • yagpcc_ch_flush_duration_seconds{table} — histogram of batch flush latency

Backpressure

The ring buffer offers two policies:

  • drop_oldest (default) — drop oldest rows when buffer is full, increment yagpcc_ch_dropped_rows_total{reason="buffer_overflow"}. Producer never blocks.
  • block — block the producer until the writer drains. NOT recommended in production: a stuck CH can stall the master.
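
A minimal sketch of the drop_oldest policy follows, assuming a fixed-capacity circular slice guarded by a mutex. The `ringBuffer` type and its methods are illustrative names, not the API of buffer.go, and the block policy is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// ringBuffer is a hypothetical bounded buffer that overwrites its oldest
// element when full, so Append never blocks the producer.
type ringBuffer[T any] struct {
	mu      sync.Mutex
	items   []T
	head    int // index of the oldest element
	size    int // number of stored elements
	dropped uint64
}

func newRingBuffer[T any](capacity int) *ringBuffer[T] {
	if capacity < 1 {
		capacity = 1
	}
	return &ringBuffer[T]{items: make([]T, capacity)}
}

// Append stores v and reports whether the oldest element was overwritten.
func (b *ringBuffer[T]) Append(v T) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.size == len(b.items) {
		b.items[b.head] = v // replace the oldest slot with the newest value
		b.head = (b.head + 1) % len(b.items)
		b.dropped++
		return true
	}
	b.items[(b.head+b.size)%len(b.items)] = v
	b.size++
	return false
}

// Drain returns the buffered elements oldest-first and empties the buffer.
func (b *ringBuffer[T]) Drain() []T {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := make([]T, 0, b.size)
	for i := 0; i < b.size; i++ {
		out = append(out, b.items[(b.head+i)%len(b.items)])
	}
	b.head, b.size = 0, 0
	return out
}

// Dropped returns how many elements were overwritten so far.
func (b *ringBuffer[T]) Dropped() uint64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.dropped
}

func main() {
	buf := newRingBuffer[int](3)
	for i := 1; i <= 5; i++ {
		buf.Append(i)
	}
	fmt.Println(buf.Drain(), buf.Dropped()) // [3 4 5] 2
}
```

In the sink, the value returned by a counter like `Dropped` is what would feed yagpcc_ch_dropped_rows_total{reason="buffer_overflow"}.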

Graceful shutdown

ctx.Done() triggers a final flush guarded by a 30s timeout so a stuck CH cannot block master shutdown. The flusher goroutine and the per-table writers are drained in order, then the connection is closed.

Testing

  • Unit tests — every component has a corresponding *_test.go with hand-rolled fakes for batchPreparer / driver.Conn. Coverage in internal/sink/clickhouse/ is high (writer, tables, migrations, schema, buffer, mapping, client, metrics all have dedicated suites).
  • Integration test: internal/sink/clickhouse/integration_test.go uses testcontainers-go to spin up a real ClickHouse container, apply migrations, write rows, verify reads. Gated by -tags=integration to keep go test ./... fast.
  • End-to-end test plan — see Test plan section below.

Files changed

| Category | Files |
| --- | --- |
| New package | internal/sink/clickhouse/ (11 .go + 11 _test.go + 2 .sql migrations) |
| New CLI flags | cmd/server/schema_cli.go + test |
| Config | internal/config/clickhouse.go + test, internal/config/config.go (validation) |
| Wiring | internal/master/background.go (CH sink interface, Submit/Flush), internal/storage/aggregated_storage.go (cycle hook) |
| App lifecycle | internal/app/app.go (master-only CH writer construction) |
| Docs | docs/clickhouse-sink-architecture.md, docs/vm-vs-ch.md, README, architecture |
| RAT exclude | .rat-excludes (embedded SQL migrations) |

46 files changed, ~10200 insertions.

Backwards compatibility

  • Default clickhouse.enabled: false keeps existing deployments unchanged.
  • JSON archiver path is untouched; both paths can coexist.
  • No new required config fields; all CH options have sensible defaults.
  • No proto changes; gRPC API is unaffected.

Test plan

  • make build succeeds
  • make unittest passes (excluding integration tag)
  • make lint passes (golangci-lint)
  • make package produces a deb that installs cleanly
  • Apache RAT audit workflow passes (license headers + .rat-excludes)
  • Deploy on dev stand with clickhouse.enabled: false — yagpcc behaves identically to current main
  • Deploy on dev stand with clickhouse.enabled: true, run load — yagpcc.query_events rows appear, yagpcc_ch_inserts_total{table="query_events"} grows
  • Verify graceful shutdown — systemctl stop yagpcc flushes buffers within 30s, no WARN clickhouse sink close error in logs
  • Verify --dump-schema outputs valid CH DDL
  • Verify --migrate-only applies migrations from a clean DB and exits with code 0
  • Verify --verify-schema exits with code 0 when versions match, code 1 when they don't

Known dependencies

  • github.com/ClickHouse/clickhouse-go/v2 (direct, native protocol client)
  • github.com/testcontainers/testcontainers-go (test-only)
  • Transitive: opentelemetry, segmentio/asm, shopspring/decimal — pulled in by clickhouse-go

Related

  • docs/clickhouse-sink-architecture.md — full architecture (this PR adds it)
  • docs/vm-vs-ch.md — when to use VictoriaMetrics vs ClickHouse (this PR adds it)

…on snapshots

Includes a small lifecycle fix in queryCompleted: archive on master
Completed even when not all segments have reported. Without this,
under load garbageCollect evicts queries from RunningQueriesStorage
before the segment-timeout fires, so they never reach archChan and
the CH sink stays empty for query_events / aggregated_metrics.

Closes open-gpdb#34

Copilot AI left a comment

Contributor

Pull request overview

This PR introduces an optional, master-only ClickHouse sink that persists query history events, aggregated metrics rollups, and session snapshots into a ClickHouse cluster for long-term SQL analytics, alongside the existing JSON archiver and Prometheus/VictoriaMetrics metrics path.

Changes:

  • Added new internal/sink/clickhouse/ package (client, buffering, mapping, writers, migrations/schema tooling, metrics, and tests) plus ClickHouse DDL migrations.
  • Wired the sink into the master lifecycle (app construction, background query archive path, aggregated-storage cycle hook) and added ClickHouse config + validation.
  • Added schema-management CLI commands (--dump-schema, --dump-migration, --migrate-only, --verify-schema), documentation updates, and a CI job for ClickHouse integration tests.

Reviewed changes

Copilot reviewed 42 out of 46 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| README.md | Documents the optional ClickHouse sink, required grants, config snippet, and schema CLI flags. |
| PROJECT_CONTEXT.md | Adds project overview/context including the new sink package and high-level architecture notes. |
| internal/storage/aggregated_storage.go | Adds cycle-hook support with deep-cloned snapshots to safely forward drained aggregated buckets to a sink. |
| internal/storage/aggregated_storage_test.go | Adds a regression test for deep-cloned snapshot behavior used by the cycle hook. |
| internal/sink/clickhouse/client.go | Adds ClickHouse connection creation, async insert settings, and TLS configuration. |
| internal/sink/clickhouse/client_test.go | Unit tests for TLS config validation and client/ping behavior. |
| internal/sink/clickhouse/buffer.go | Implements a concurrent ring buffer with drop_oldest and block overflow policies. |
| internal/sink/clickhouse/buffer_test.go | Unit tests for buffer behavior, overflow modes, Close/unblock semantics, and concurrency. |
| internal/sink/clickhouse/mapping.go | Maps proto/storage inputs into ClickHouse row layouts (query events, aggregates, sessions). |
| internal/sink/clickhouse/integration_test.go | Integration test (tagged) using testcontainers to apply migrations and exercise inserts/query patterns/TTL. |
| internal/sink/clickhouse/metrics.go | Adds Prometheus collectors and hook adapters for per-table sink observability. |
| internal/sink/clickhouse/metrics_test.go | Verifies metric registration and hook-to-metric wiring. |
| internal/sink/clickhouse/migrations.go | Embeds/parses/renders SQL migrations, applies migrations, and reads current schema version. |
| internal/sink/clickhouse/migrations_test.go | Unit tests for migration parsing/rendering/splitting. |
| internal/sink/clickhouse/migration_runner_test.go | Fake MigrationConn + tests for ApplyMigrations/GetCurrentVersion behavior. |
| internal/sink/clickhouse/schema.go | Implements VerifySchema/DumpSchema/DumpMigration helpers for schema management. |
| internal/sink/clickhouse/schema_test.go | Unit tests for VerifySchema and dump helpers. |
| internal/sink/clickhouse/migrations/0001_init.up.sql | Initial ClickHouse schema for _yagpcc_meta, query_events, aggregated_metrics, session_snapshots (templated TTL). |
| internal/sink/clickhouse/migrations/0001_init.down.sql | Down migration drops the created tables. |
| internal/sink/clickhouse/migrations/.gitkeep | Placeholder file for the migrations directory. |
| internal/sink/clickhouse/testdata/.gitkeep | Placeholder file for the testdata directory. |
| internal/master/background.go | Adds ClickhouseSink interface wiring, submission/aggregate-forwarding helpers, and snapshot conversion. |
| internal/master/archiver.go | Forwards finalized query records into the ClickHouse sink (when configured). |
| internal/master/clickhouse_test.go | Adds tests validating BackgroundStorage wiring to a fake ClickHouse sink. |
| internal/master/statwriter_test.go | Formatting-only adjustments in test fixtures (no behavior change). |
| internal/config/config.go | Adds Clickhouse config to the main Config, applies env overrides after load, validates CH config on master. |
| internal/config/clickhouse.go | Defines ClickHouse sink configuration schema, defaults, env override, and validation. |
| internal/config/clickhouse_test.go | Unit tests for defaults, validation rules, env overrides, and master-vs-segment validation behavior. |
| internal/app/app.go | Constructs and runs the ClickHouse writer for master-only when enabled; wires session snapshot provider and cycle hook. |
| cmd/server/schema_cli.go | Implements schema-management CLI commands (dump/migrate/verify) and dump_only startup short-circuit. |
| cmd/server/schema_cli_test.go | Unit tests for schema CLI behavior and error handling. |
| cmd/server/main.go | Adds schema CLI flag wiring and startup short-circuit logic before running the main app. |
| cmd/server/yagpcc_master.yaml | Adds an (opt-in) ClickHouse configuration template block for master deployments. |
| docs/clickhouse-sink-architecture.md | Adds detailed design/behavior docs for the ClickHouse sink and schema lifecycle/CLI/metrics. |
| docs/vm-vs-ch.md | Documents the intended data split between VictoriaMetrics and ClickHouse. |
| docs/architecture.md | Updates architecture diagram and narrative to include optional ClickHouse sink. |
| docs/architecture-ru.md | Same as above for the Russian architecture doc. |
| .rat-excludes | Excludes embedded SQL migrations from Apache RAT checks. |
| .github/workflows/test.yaml | Adds a ClickHouse integration-test job running go test -tags=integration .... |
| go.mod | Adds ClickHouse client + testcontainers and updates indirect dependencies accordingly. |

Comment thread cmd/server/main.go
}

for {
err := app.Run(ctxC, fmt.Sprintf("%s/%s", *configPathValue, configFile))
Comment on lines +258 to +261
`schema_management: dump_only` in the YAML achieves the same as
`--dump-schema` but goes through the normal startup path; once the dump is
printed the binary self-disables the sink and the master continues to run
without ClickHouse.
Comment on lines +153 to +155
- `dump_only` — prints the rendered DDL to stdout and self-disables. The
`--dump-only` mode is meant for offline review; the corresponding CLI
flags are described below.
Comment on lines +148 to +150
// RenderTemplate substitutes {{.Field}} placeholders in sql using params via
// text/template. Missing keys are rendered as <no value> by template defaults;
// callers should pass complete params.
origVal := aggStorage.aggQueries[AggKey{QueryID: 42, UserName: "u", DatabaseName: "d", StartTime: startI, EndTime: endI}]
aggStorage.mx.RUnlock()
require.NotNil(t, origVal)
CurrentTime = func() time.Time { return endQ.Add(2 * aggStorage.GetTruncInterval()) }