From 7d8752ec25f231451b4457e2fd4d66c81aa3aeae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Fur=20=C3=96zy=C4=B1lmazel?= Date: Sat, 3 Jan 2026 19:12:55 +0300 Subject: [PATCH 1/4] Add GitLab webhook support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement full GitLab webhook processing pipeline: - Add database migrations for gitlab and gitlab_user tables - Create gitlabwebhookhandler with plain text token validation - Create gitlabstorage with nullable field handling for varying payloads - Add gitlabconsumer service for Kafka message processing - Update webhookserver with /v1/webhook/gitlab endpoint - Add Dockerfile and docker-compose service for gitlab-consumer - Add GitHub Actions workflows for gitlab-consumer (build + staging) - Add rake task for running gitlab consumer locally - Update pre-commit config to use TekWizely/pre-commit-golang Supports all GitLab webhook event types including premium/ultimate events (user_add_to_group, project_create, subgroup_create) with fallback handling for object_kind vs event_name and nested vs flat user/project fields. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../build-push-cauldron-github-consumer.yml | 2 +- ...ld-push-cauldron-github-group-consumer.yml | 2 +- .../build-push-cauldron-gitlab-consumer.yml | 33 ++ .../build-push-cauldron-migrator.yml | 2 +- .../build-push-cauldron-webhookserver.yml | 2 +- .../stg-cauldron-github-consumer.yml | 2 +- .../stg-cauldron-gitlab-consumer.yml | 33 ++ .github/workflows/stg-cauldron-migrator.yml | 2 +- .../workflows/stg-cauldron-webhookserver.yml | 2 +- .pre-commit-config.yaml | 8 +- Dockerfile.gitlab-consumer | 31 ++ README.md | 24 +- cmd/gitlabconsumer/main.go | 101 ++++ cmd/webhookserver/main.go | 51 +- docker-compose.infra.yml | 18 + .../storage/gitlabstorage/gitlabstorage.go | 267 +++++++++ .../gitlabstorage/gitlabstorage_test.go | 518 ++++++++++++++++++ .../gitlabwebhookhandler.go | 269 +++++++++ .../gitlabwebhookhandler_test.go | 504 +++++++++++++++++ migrations/000007_gitlab.down.sql | 12 + migrations/000007_gitlab.up.sql | 34 ++ migrations/000008_gitlab_user.down.sql | 9 + migrations/000008_gitlab_user.up.sql | 28 + scripts/local/rake/run.rake | 12 + 24 files changed, 1949 insertions(+), 17 deletions(-) create mode 100644 .github/workflows/build-push-cauldron-gitlab-consumer.yml create mode 100644 .github/workflows/stg-cauldron-gitlab-consumer.yml create mode 100644 Dockerfile.gitlab-consumer create mode 100644 cmd/gitlabconsumer/main.go create mode 100644 internal/storage/gitlabstorage/gitlabstorage.go create mode 100644 internal/storage/gitlabstorage/gitlabstorage_test.go create mode 100644 internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler.go create mode 100644 internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go create mode 100644 migrations/000007_gitlab.down.sql create mode 100644 migrations/000007_gitlab.up.sql create mode 100644 migrations/000008_gitlab_user.down.sql create mode 100644 migrations/000008_gitlab_user.up.sql diff --git a/.github/workflows/build-push-cauldron-github-consumer.yml b/.github/workflows/build-push-cauldron-github-consumer.yml index 44ea432..f3388bb 100644 --- a/.github/workflows/build-push-cauldron-github-consumer.yml +++ b/.github/workflows/build-push-cauldron-github-consumer.yml @@ -1,4 +1,4 @@ -name: Build and push Cauldron GitHub Consumer +name: GitHub Consumer (build-push) on: workflow_dispatch: diff --git a/.github/workflows/build-push-cauldron-github-group-consumer.yml b/.github/workflows/build-push-cauldron-github-group-consumer.yml index 910c9f2..88d1680 100644 --- a/.github/workflows/build-push-cauldron-github-group-consumer.yml +++ b/.github/workflows/build-push-cauldron-github-group-consumer.yml @@ -1,4 +1,4 @@ -name: Build and push Cauldron GitHub Group Consumer +name: GitHub Group Consumer (build-push) on: workflow_dispatch: diff --git a/.github/workflows/build-push-cauldron-gitlab-consumer.yml b/.github/workflows/build-push-cauldron-gitlab-consumer.yml new file mode 100644 index 0000000..0411d7f --- /dev/null +++ b/.github/workflows/build-push-cauldron-gitlab-consumer.yml @@ -0,0 +1,33 @@ +name: GitLab Consumer (build-push) + +on: + workflow_dispatch: + +jobs: + build-image: + runs-on: ubuntu-24.04 + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push to GitHub Container Registry + uses: docker/build-push-action@v6 + with: + context: . + file: Dockerfile.gitlab-consumer + platforms: linux/amd64 + push: true + provenance: false + tags: ghcr.io/${{ github.repository }}/cauldron-gitlab-consumer:latest + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/build-push-cauldron-migrator.yml b/.github/workflows/build-push-cauldron-migrator.yml index f80149d..b9dffcf 100644 --- a/.github/workflows/build-push-cauldron-migrator.yml +++ b/.github/workflows/build-push-cauldron-migrator.yml @@ -1,4 +1,4 @@ -name: Build and push Cauldron Migrator +name: Migrator (build-push) on: workflow_dispatch: diff --git a/.github/workflows/build-push-cauldron-webhookserver.yml b/.github/workflows/build-push-cauldron-webhookserver.yml index 88aacd0..b50b986 100644 --- a/.github/workflows/build-push-cauldron-webhookserver.yml +++ b/.github/workflows/build-push-cauldron-webhookserver.yml @@ -1,4 +1,4 @@ -name: Build and push Cauldron Webhook Server +name: Cauldron (build-push) on: workflow_dispatch: diff --git a/.github/workflows/stg-cauldron-github-consumer.yml b/.github/workflows/stg-cauldron-github-consumer.yml index f6a326c..8311243 100644 --- a/.github/workflows/stg-cauldron-github-consumer.yml +++ b/.github/workflows/stg-cauldron-github-consumer.yml @@ -1,4 +1,4 @@ -name: Staging - Build and push Cauldron GitHub Consumer +name: Staging - GitHub Consumer (build-push) on: workflow_dispatch: diff --git a/.github/workflows/stg-cauldron-gitlab-consumer.yml b/.github/workflows/stg-cauldron-gitlab-consumer.yml new file mode 100644 index 0000000..6bdc851 --- /dev/null +++ b/.github/workflows/stg-cauldron-gitlab-consumer.yml @@ -0,0 +1,33 @@ +name: Staging - GitLab Consumer (build-push) + +on: + workflow_dispatch: + +jobs: + build-image: + runs-on: ubuntu-24.04 + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push to GitHub Container Registry + uses: docker/build-push-action@v6 + with: + context: . + file: Dockerfile.gitlab-consumer + platforms: linux/amd64 + push: true + provenance: false + tags: ghcr.io/${{ github.repository }}/cauldron-stg-gitlab-consumer:latest + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/stg-cauldron-migrator.yml b/.github/workflows/stg-cauldron-migrator.yml index ff2082b..c8a888b 100644 --- a/.github/workflows/stg-cauldron-migrator.yml +++ b/.github/workflows/stg-cauldron-migrator.yml @@ -1,4 +1,4 @@ -name: Staging - Build and push Cauldron Migrator +name: Staging - Migrator (build-push) on: workflow_dispatch: diff --git a/.github/workflows/stg-cauldron-webhookserver.yml b/.github/workflows/stg-cauldron-webhookserver.yml index 4528456..6545723 100644 --- a/.github/workflows/stg-cauldron-webhookserver.yml +++ b/.github/workflows/stg-cauldron-webhookserver.yml @@ -1,4 +1,4 @@ -name: Staging - Build and push Cauldron Webhook Server +name: Staging - Cauldron (build-push) on: workflow_dispatch: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index afee50d..e33c34e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,10 +1,10 @@ repos: - - repo: https://github.com/dnephin/pre-commit-golang - rev: v0.5.1 + - repo: https://github.com/TekWizely/pre-commit-golang + rev: v1.0.0-rc.1 hooks: - - id: golangci-lint + - id: golangci-lint-mod - id: go-mod-tidy - - id: go-unit-tests + - id: go-test-mod - repo: https://github.com/rubocop-hq/rubocop rev: v1.70.0 diff --git a/Dockerfile.gitlab-consumer b/Dockerfile.gitlab-consumer new file mode 100644 index 0000000..36c4dea --- /dev/null +++ b/Dockerfile.gitlab-consumer @@ -0,0 +1,31 @@ +FROM golang:1.25-alpine AS builder + +WORKDIR /build +COPY . . + +ARG GOOS +ARG GOARCH +RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -o consumer cmd/gitlabconsumer/main.go + +FROM alpine:latest AS certs +RUN apk add --update --no-cache ca-certificates + +FROM busybox:latest +ARG UID=10001 +RUN adduser \ + --disabled-password \ + --gecos "" \ + --home "/nonexistent" \ + --shell "/sbin/nologin" \ + --no-create-home \ + --uid "${UID}" \ + appuser +USER appuser +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=builder /build/consumer /consumer + +ENTRYPOINT ["/consumer"] + +LABEL org.opencontainers.image.authors="Uğur vigo Özyılmazel " +LABEL org.opencontainers.image.licenses="MIT" +LABEL org.opencontainers.image.source="https://github.com/devchain-network/cauldron" diff --git a/README.md b/README.md index 96828e5..eb64e1b 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,17 @@ ![Version](https://img.shields.io/badge/version-0.2.0-orange.svg) ![Go](https://img.shields.io/github/go-mod/go-version/devchain-network/cauldron) [![codecov](https://codecov.io/github/devchain-network/cauldron/graph/badge.svg?token=LAUHZBW12F)](https://codecov.io/github/devchain-network/cauldron) +[![Go Report Card](https://goreportcard.com/badge/github.com/devchain-network/cauldron)](https://goreportcard.com/report/github.com/devchain-network/cauldron) -[![Build and push Cauldron Webhook Server](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-webhookserver.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-webhookserver.yml) -[![Build and push Cauldron Migrator](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-migrator.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-migrator.yml) -[![Build and push Cauldron GitHub Consumer](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-consumer.yml) -[![Build and push Cauldron GitHub Group Consumer](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-group-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-group-consumer.yml) +[![Cauldron Migrator](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-migrator.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-migrator.yml) +[![Cauldron Webhook Server](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-webhookserver.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-webhookserver.yml) +[![Cauldron GitHub Consumer](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-consumer.yml) +[![Cauldron GitHub Group Consumer](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-group-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/build-push-cauldron-github-group-consumer.yml) -[![Go Report Card](https://goreportcard.com/badge/github.com/devchain-network/cauldron)](https://goreportcard.com/report/github.com/devchain-network/cauldron) +[![Staging - Cauldron Migrator](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-migrator.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-migrator.yml) +[![Staging - Cauldron Webhook Server](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-webhookserver.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-webhookserver.yml) +[![Staging - Cauldron GitHub Consumer](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-github-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-github-consumer.yml) +[![Staging - Cauldron GitLab Consumer](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-gitlab-consumer.yml/badge.svg)](https://github.com/devchain-network/cauldron/actions/workflows/stg-cauldron-gitlab-consumer.yml) # cauldron @@ -21,6 +25,16 @@ development. --- +## Supported Git Providers + +- [x] GitHub +- [x] GitLab (*staging*) +- [ ] Gitea +- [ ] Codeberg +- [ ] BitBucket + +--- + ## License This project is licensed under MIT. diff --git a/cmd/gitlabconsumer/main.go b/cmd/gitlabconsumer/main.go new file mode 100644 index 0000000..9a9e8d3 --- /dev/null +++ b/cmd/gitlabconsumer/main.go @@ -0,0 +1,101 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumer" + "github.com/devchain-network/cauldron/internal/slogger" + "github.com/devchain-network/cauldron/internal/storage" + "github.com/devchain-network/cauldron/internal/storage/gitlabstorage" + "github.com/vigo/getenv" +) + +func storeMessage(strg storage.PingStorer) kafkaconsumer.ProcessMessageFunc { + return func(ctx context.Context, msg *sarama.ConsumerMessage) error { + if err := strg.MessageStore(ctx, msg); err != nil { + return fmt.Errorf("message store error: [%w]", err) + } + + return nil + } +} + +// Run runs kafa gitlab consumer. +func Run() error { + logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) + brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) + + kafkaTopic := getenv.String("KC_TOPIC_GITLAB", "") + kafkaPartition := getenv.Int("KC_PARTITION", kafkaconsumer.DefaultPartition) + kafkaDialTimeout := getenv.Duration("KC_DIAL_TIMEOUT", kafkaconsumer.DefaultDialTimeout) + kafkaReadTimeout := getenv.Duration("KC_READ_TIMEOUT", kafkaconsumer.DefaultReadTimeout) + kafkaWriteTimeout := getenv.Duration("KC_WRITE_TIMEOUT", kafkaconsumer.DefaultWriteTimeout) + kafkaBackoff := getenv.Duration("KC_BACKOFF", kafkaconsumer.DefaultBackoff) + kafkaMaxRetries := getenv.Int("KC_MAX_RETRIES", kafkaconsumer.DefaultMaxRetries) + + databaseURL := getenv.String("DATABASE_URL", "") + if err := getenv.Parse(); err != nil { + return fmt.Errorf("environment variable parse error: [%w]", err) + } + + logger, err := slogger.New( + slogger.WithLogLevelName(*logLevel), + ) + if err != nil { + return fmt.Errorf("logger instantiate error: [%w]", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithDatabaseDSN(*databaseURL), + gitlabstorage.WithLogger(logger), + ) + if err != nil { + return fmt.Errorf("gitlab storage instantiate error: [%w]", err) + } + + if err = db.Ping(ctx, storage.DefaultDBPingMaxRetries, storage.DefaultDBPingBackoff); err != nil { + return fmt.Errorf("gitlab storage ping error: [%w]", err) + } + defer func() { + logger.Info("gitlab storage - closing pgx pool") + db.Pool.Close() + }() + + kafkaGitLabConsumer, err := kafkaconsumer.New( + kafkaconsumer.WithLogger(logger), + kafkaconsumer.WithProcessMessageFunc(storeMessage(db)), + kafkaconsumer.WithKafkaBrokers(*brokersList), + kafkaconsumer.WithDialTimeout(*kafkaDialTimeout), + kafkaconsumer.WithReadTimeout(*kafkaReadTimeout), + kafkaconsumer.WithWriteTimeout(*kafkaWriteTimeout), + kafkaconsumer.WithBackoff(*kafkaBackoff), + kafkaconsumer.WithMaxRetries(*kafkaMaxRetries), + kafkaconsumer.WithTopic(*kafkaTopic), + kafkaconsumer.WithPartition(*kafkaPartition), + ) + if err != nil { + return fmt.Errorf("gitlab kafka consumer instantiate error: [%w]", err) + } + + defer func() { _ = kafkaGitLabConsumer.SaramaConsumer.Close() }() + + if err = kafkaGitLabConsumer.Consume(); err != nil { + return fmt.Errorf("gitlab kafka consumer consume error: [%w]", err) + } + + return nil +} + +func main() { + if err := Run(); err != nil { + log.Fatal(err) + } +} diff --git a/cmd/webhookserver/main.go b/cmd/webhookserver/main.go index a6c7f42..1d2e2a4 100644 --- a/cmd/webhookserver/main.go +++ b/cmd/webhookserver/main.go @@ -14,6 +14,7 @@ import ( "github.com/devchain-network/cauldron/internal/kafkacp/kafkaproducer" "github.com/devchain-network/cauldron/internal/slogger" "github.com/devchain-network/cauldron/internal/transport/http/githubwebhookhandler" + "github.com/devchain-network/cauldron/internal/transport/http/gitlabwebhookhandler" "github.com/devchain-network/cauldron/internal/transport/http/healthcheckhandler" "github.com/devchain-network/cauldron/internal/webhookserver" "github.com/valyala/fasthttp" @@ -23,6 +24,7 @@ import ( // default values. const ( kpGitHubDefaultQueueSize = 100 + kpGitLabDefaultQueueSize = 100 ) // Run runs the webhookserver. @@ -34,6 +36,7 @@ func Run() error { serverIdleTimeout := getenv.Duration("SERVER_IDLE_TIMEOUT", webhookserver.ServerDefaultIdleTimeout) githubHMACSecret := getenv.String("GITHUB_HMAC_SECRET", "") + gitlabHMACSecret := getenv.String("GITLAB_HMAC_SECRET", "") brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) @@ -43,6 +46,7 @@ func Run() error { kafkaProducerBackoff := getenv.Duration("KP_BACKOFF", kafkaproducer.DefaultBackoff) kafkaProducerMaxRetries := getenv.Int("KP_MAX_RETRIES", kafkaproducer.DefaultMaxRetries) kafkaProducerGithubWebhookMessageQueueSize := getenv.Int("KP_GITHUB_MESSAGE_QUEUE_SIZE", kpGitHubDefaultQueueSize) + kafkaProducerGitlabWebhookMessageQueueSize := getenv.Int("KP_GITLAB_MESSAGE_QUEUE_SIZE", kpGitLabDefaultQueueSize) if err := getenv.Parse(); err != nil { return fmt.Errorf("environment variable parse error: [%w]", err) @@ -73,12 +77,14 @@ func Run() error { logger.Info("connected to kafka brokers", "addrs", *brokersList) githubWebhookMessageQueue := make(chan *sarama.ProducerMessage, *kafkaProducerGithubWebhookMessageQueueSize) + gitlabWebhookMessageQueue := make(chan *sarama.ProducerMessage, *kafkaProducerGitlabWebhookMessageQueueSize) numMessageWorkers := runtime.NumCPU() logger.Info( "number of message workers", "count", numMessageWorkers, "github webhook message queue size", *kafkaProducerGithubWebhookMessageQueueSize, + "gitlab webhook message queue size", *kafkaProducerGitlabWebhookMessageQueueSize, ) healthCheckHandler, err := healthcheckhandler.New( @@ -98,6 +104,16 @@ func Run() error { return fmt.Errorf("github webhook http handler instantiate error: [%w]", err) } + gitlabWebhookHandler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret(*gitlabHMACSecret), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(gitlabWebhookMessageQueue), + ) + if err != nil { + return fmt.Errorf("gitlab webhook http handler instantiate error: [%w]", err) + } + server, err := webhookserver.New( webhookserver.WithLogger(logger), webhookserver.WithListenAddr(*listenAddr), @@ -108,6 +124,7 @@ func Run() error { webhookserver.WithKafkaBrokers(*brokersList), webhookserver.WithHTTPHandler(fasthttp.MethodGet, "/healthz", healthCheckHandler.Handle), webhookserver.WithHTTPHandler(fasthttp.MethodPost, "/v1/webhook/github", githubWebhookHandler.Handle), + webhookserver.WithHTTPHandler(fasthttp.MethodPost, "/v1/webhook/gitlab", gitlabWebhookHandler.Handle), ) if err != nil { return fmt.Errorf("api server instantiate error: [%w]", err) @@ -117,12 +134,13 @@ func Run() error { var wg sync.WaitGroup + // GitHub message workers for i := range numMessageWorkers { wg.Add(1) go func() { defer func() { wg.Done() - logger.Info("terminating worker", "id", i) + logger.Info("terminating github worker", "id", i) }() func() { @@ -146,6 +164,36 @@ func Run() error { }() } + // GitLab message workers + for i := range numMessageWorkers { + wg.Add(1) + go func() { + defer func() { + wg.Done() + logger.Info("terminating gitlab worker", "id", i) + }() + + func() { + for msg := range gitlabWebhookMessageQueue { + kafkaProducer.Input() <- msg + + select { + case success := <-kafkaProducer.Successes(): + logger.Info( + "message sent", + "worker", i, + "topic", success.Topic, + "partition", success.Partition, + "offset", success.Offset, + ) + case err := <-kafkaProducer.Errors(): + logger.Error("message send error", "error", err) + } + } + }() + }() + } + wg.Add(1) go func() { defer wg.Done() @@ -157,6 +205,7 @@ func Run() error { logger.Error("webhookserver stop error: [%w]", "error", errStop) } close(githubWebhookMessageQueue) + close(gitlabWebhookMessageQueue) close(doneChannel) }() diff --git a/docker-compose.infra.yml b/docker-compose.infra.yml index 9f311f5..c2aea3e 100644 --- a/docker-compose.infra.yml +++ b/docker-compose.infra.yml @@ -91,6 +91,7 @@ services: - "8000:8000" environment: GITHUB_HMAC_SECRET: "${GITHUB_HMAC_SECRET}" + GITLAB_HMAC_SECRET: "${GITLAB_HMAC_SECRET}" KCP_BROKERS: "kafka:9092" depends_on: - kafka @@ -114,6 +115,23 @@ services: networks: - devchain-network + gitlab-consumer: + build: + context: . + dockerfile: Dockerfile.gitlab-consumer + environment: + KC_TOPIC_GITLAB: "gitlab" + KCG_NAME: "gitlab-group" + KCP_BROKERS: "kafka:9092" + DATABASE_URL: "${DATABASE_URL_INFRA}" + depends_on: + - kafka + - cauldron + - postgresql_db + - migrator + networks: + - devchain-network + volumes: kafka_data: driver: local diff --git a/internal/storage/gitlabstorage/gitlabstorage.go b/internal/storage/gitlabstorage/gitlabstorage.go new file mode 100644 index 0000000..8fc2fee --- /dev/null +++ b/internal/storage/gitlabstorage/gitlabstorage.go @@ -0,0 +1,267 @@ +package gitlabstorage + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "time" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/storage" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" +) + +var ( + _ storage.Pinger = (*GitLabStorage)(nil) // compile time proof + _ storage.MessageStorer = (*GitLabStorage)(nil) // compile time proof + _ storage.PingStorer = (*GitLabStorage)(nil) // compile time proof +) + +// queries. +const ( + GitLabStoreQuery = ` + INSERT INTO gitlab ( + event_uuid, + webhook_uuid, + object_kind, + project_id, + project_path, + user_username, + user_id, + kafka_offset, + kafka_partition, + payload + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)` +) + +// GitLab represents `gitlab` table model fields. +type GitLab struct { + Payload any + ObjectKind string + ProjectPath *string + UserUsername *string + EventUUID uuid.UUID + WebhookUUID uuid.UUID + ProjectID *int64 + UserID *int64 + KafkaOffset int64 + KafkaPartition int32 +} + +// GitLabStorage implements GitLabPingStorer interface. +type GitLabStorage struct { + Logger *slog.Logger + Pool storage.PGPooler + DatabaseDSN string +} + +func (GitLabStorage) prepareGitLabPayload(message *sarama.ConsumerMessage) (*GitLab, error) { + gitlabStorage := new(GitLab) + gitlabStorage.KafkaPartition = message.Partition + gitlabStorage.KafkaOffset = message.Offset + + messageKey := string(message.Key) + eventUUID, err := uuid.Parse(messageKey) + if err != nil { + return nil, fmt.Errorf( + "[gitlabstorage.prepareGitLabPayload] eventUUID error: ['%s' received, %w]", + messageKey, err, + ) + } + gitlabStorage.EventUUID = eventUUID + + for _, header := range message.Headers { + key := string(header.Key) + value := string(header.Value) + + switch key { + case "webhook-uuid": + webhookUUID, uuidErr := uuid.Parse(value) + if uuidErr != nil { + return nil, fmt.Errorf( + "[gitlabstorage.prepareGitLabPayload] webhookUUID error: ['%s' received, %w]", + value, uuidErr, + ) + } + gitlabStorage.WebhookUUID = webhookUUID + case "object-kind": + gitlabStorage.ObjectKind = value + case "project-id": + if value == "" || value == "0" { + break + } + projectID, parseErr := strconv.ParseInt(value, 10, 64) + if parseErr != nil { + return nil, fmt.Errorf( + "[gitlabstorage.prepareGitLabPayload] projectID error: ['%s' received, %w]", + value, parseErr, + ) + } + gitlabStorage.ProjectID = &projectID + case "project-path": + if value != "" { + gitlabStorage.ProjectPath = &value + } + case "user-username": + if value != "" { + gitlabStorage.UserUsername = &value + } + case "user-id": + if value == "" || value == "0" { + break + } + userID, parseErr := strconv.ParseInt(value, 10, 64) + if parseErr != nil { + return nil, fmt.Errorf( + "[gitlabstorage.prepareGitLabPayload] userID error: ['%s' received, %w]", + value, parseErr, + ) + } + gitlabStorage.UserID = &userID + } + } + + gitlabStorage.Payload = message.Value + + return gitlabStorage, nil +} + +// Ping pings database and makes sure db communication is ok. +func (s GitLabStorage) Ping(ctx context.Context, maxRetries uint8, backoff time.Duration) error { + var pingErr error + + for i := range maxRetries { + pingErr = s.Pool.Ping(ctx) + if pingErr == nil { + s.Logger.Info("successfully pinged the database") + + break + } + + s.Logger.Error( + "can not ping the database", + "error", pingErr, + "retry", fmt.Sprintf("%d/%d", i, maxRetries), + "backoff", backoff.String(), + ) + time.Sleep(backoff) + backoff *= 2 + } + + if pingErr != nil { + return fmt.Errorf("[gitlabstorage.Ping] error: [%w]", pingErr) + } + + return nil +} + +// MessageStore stores received kafka message to database. +func (s GitLabStorage) MessageStore(ctx context.Context, message *sarama.ConsumerMessage) error { + payload, err := s.prepareGitLabPayload(message) + if err != nil { + return fmt.Errorf("[gitlabstorage.MessageStore] payload error: [%w]", err) + } + + _, err = s.Pool.Exec( + ctx, + GitLabStoreQuery, + payload.EventUUID, + payload.WebhookUUID, + payload.ObjectKind, + payload.ProjectID, + payload.ProjectPath, + payload.UserUsername, + payload.UserID, + payload.KafkaOffset, + payload.KafkaPartition, + payload.Payload, + ) + if err != nil { + return fmt.Errorf("[gitlabstorage.MessageStore][Pool.Exec] error: [%w]", err) + } + + return nil +} + +func (s GitLabStorage) checkRequired() error { + if s.Logger == nil { + return fmt.Errorf( + "[gitlabstorage.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + + if s.DatabaseDSN == "" { + return fmt.Errorf( + "[gitlabstorage.checkRequired] DatabaseDSN error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) + } + + return nil +} + +// Option represents option function type. +type Option func(*GitLabStorage) error + +// WithLogger sets logger. +func WithLogger(l *slog.Logger) Option { + return func(s *GitLabStorage) error { + if l == nil { + return fmt.Errorf( + "[gitlabstorage.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + s.Logger = l + + return nil + } +} + +// WithDatabaseDSN sets database data source. +func WithDatabaseDSN(dsn string) Option { + return func(s *GitLabStorage) error { + if dsn == "" { + return fmt.Errorf( + "[gitlabstorage.WithDatabaseDSN] error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) + } + s.DatabaseDSN = dsn + + return nil + } +} + +// New instantiates new gitlab storage. +func New(ctx context.Context, options ...Option) (*GitLabStorage, error) { + gitlabStorage := new(GitLabStorage) + + for _, option := range options { + if err := option(gitlabStorage); err != nil { + return nil, err + } + } + + if err := gitlabStorage.checkRequired(); err != nil { + return nil, err + } + + config, err := pgxpool.ParseConfig(gitlabStorage.DatabaseDSN) + if err != nil { + return nil, fmt.Errorf("[gitlabstorage.New][pgxpool.ParseConfig] error: [%w]", err) + } + + pool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, fmt.Errorf("[gitlabstorage.New][pgxpool.NewWithConfig] error: [%w]", err) + } + + gitlabStorage.Pool = pool + + return gitlabStorage, nil +} diff --git a/internal/storage/gitlabstorage/gitlabstorage_test.go b/internal/storage/gitlabstorage/gitlabstorage_test.go new file mode 100644 index 0000000..9bcdd8d --- /dev/null +++ b/internal/storage/gitlabstorage/gitlabstorage_test.go @@ -0,0 +1,518 @@ +package gitlabstorage_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" + "github.com/devchain-network/cauldron/internal/storage" + "github.com/devchain-network/cauldron/internal/storage/gitlabstorage" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type MockPGPooler struct { + mock.Mock +} + +func (m *MockPGPooler) Ping(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func (m *MockPGPooler) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) { + args := m.Called(ctx, sql, arguments) + return args.Get(0).(pgconn.CommandTag), args.Error(1) +} + +func (m *MockPGPooler) Close() { + m.Called() +} + +func (m *MockPGPooler) Acquire(ctx context.Context) (*pgxpool.Conn, error) { + args := m.Called(ctx) + return args.Get(0).(*pgxpool.Conn), args.Error(1) +} + +func (m *MockPGPooler) AcquireAllIdle(ctx context.Context) ([]*pgxpool.Conn, error) { + args := m.Called(ctx) + return args.Get(0).([]*pgxpool.Conn), args.Error(1) +} + +func (m *MockPGPooler) AcquireFunc(ctx context.Context, f func(*pgxpool.Conn) error) error { + args := m.Called(ctx, f) + return args.Error(0) +} + +func TestNew_NoLogger(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, db) +} + +func TestNew_NilLogger(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(nil), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, db) +} + +func TestNew_NoDSN(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, db) +} + +func TestNew_EmptyDSN(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN(""), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, db) +} + +func TestNew_InvalidDSN(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN("foo:://bar"), + ) + + var parseErr *pgconn.ParseConfigError + assert.ErrorAs(t, err, &parseErr) + assert.Nil(t, db) +} + +func TestNew_Success(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) +} + +func TestPing_Fail(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + fakeRetries := 3 + mockPool := new(MockPGPooler) + mockPool.On("Ping", ctx).Return(errors.New("ping failed")).Times(fakeRetries) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + err = db.Ping(ctx, uint8(fakeRetries), time.Millisecond*10) + assert.Error(t, err) + mockPool.AssertExpectations(t) +} + +func TestPing_Success(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Ping", ctx).Return(nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + err = db.Ping(ctx, 3, time.Millisecond*10) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} + +func TestStore_Fail_EmptyMessage(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + message := &sarama.ConsumerMessage{} + + err = db.MessageStore(ctx, message) + assert.Error(t, err) +} + +func TestStore_Fail_Message_InvalidWebhookUUID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(`invalid-uuid`)}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.Error(t, err) +} + +func TestStore_Fail_Message_InvalidProjectID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("project-id"), Value: []byte(`invalid`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.Error(t, err) +} + +func TestStore_Fail_Message_InvalidUserID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("project-id"), Value: []byte(`123`)}, + {Key: []byte("project-path"), Value: []byte(`vigo/test`)}, + {Key: []byte("user-username"), Value: []byte(`vigo`)}, + {Key: []byte("user-id"), Value: []byte(`invalid`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.Error(t, err) +} + +func TestStore_Insert_Error(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, errors.New("insert error")) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("project-id"), Value: []byte(`123`)}, + {Key: []byte("project-path"), Value: []byte(`vigo/test`)}, + {Key: []byte("user-username"), Value: []byte(`vigo`)}, + {Key: []byte("user-id"), Value: []byte(`456`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.Error(t, err) + mockPool.AssertExpectations(t) +} + +func TestStore_Success(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("project-id"), Value: []byte(`123`)}, + {Key: []byte("project-path"), Value: []byte(`vigo/test`)}, + {Key: []byte("user-username"), Value: []byte(`vigo`)}, + {Key: []byte("user-id"), Value: []byte(`456`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} + +func TestStore_Success_WithNullableFields(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + // Milestone event - no user, no project + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"object_kind": "milestone"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`milestone`)}, + }, + } + + err = db.MessageStore(ctx, message) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} + +func TestStore_Success_SkipsEmptyProjectID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("project-id"), Value: []byte(``)}, // empty, should skip + {Key: []byte("project-id"), Value: []byte(`0`)}, // zero, should skip + }, + } + + err = db.MessageStore(ctx, message) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} diff --git a/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler.go b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler.go new file mode 100644 index 0000000..cb0de5e --- /dev/null +++ b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler.go @@ -0,0 +1,269 @@ +package gitlabwebhookhandler + +import ( + "fmt" + "log/slog" + "strconv" + + "github.com/IBM/sarama" + "github.com/buger/jsonparser" + "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/transport/http/httphandler" + "github.com/valyala/fasthttp" +) + +var _ GitLabWebhookHandler = (*Handler)(nil) // compile time proof + +const ( + missingHTTPHandlerHeaderText = "header" + missingHTTPHandlerErrorMessage = "missing http header" +) + +// GitLabWebhookHandler defines http handler behaviours. +type GitLabWebhookHandler interface { + httphandler.FastHTTPHandler +} + +// Handler represents http handler configuration and must satisfy GitLabWebhookHandler interface. +type Handler struct { + MessageQueue chan *sarama.ProducerMessage + Logger *slog.Logger + Topic kafkacp.KafkaTopicIdentifier + Secret string +} + +// Handle is a fasthttp handler function. +func (h Handler) Handle(ctx *fasthttp.RequestCtx) { + if len(ctx.PostBody()) == 0 { + h.Logger.Error("empty post body", "length", len(ctx.PostBody())) + ctx.SetStatusCode(fasthttp.StatusBadRequest) + + return + } + + // GitLab uses plain text token comparison (not HMAC) + gitlabToken := string(ctx.Request.Header.Peek("X-Gitlab-Token")) + if gitlabToken != h.Secret { + h.Logger.Error("invalid gitlab token") + ctx.SetStatusCode(fasthttp.StatusBadRequest) + + return + } + + gitlabEventUUID := ctx.Request.Header.Peek("X-Gitlab-Event-Uuid") + if len(gitlabEventUUID) == 0 { + h.Logger.Error(missingHTTPHandlerErrorMessage, missingHTTPHandlerHeaderText, "X-Gitlab-Event-Uuid") + ctx.SetStatusCode(fasthttp.StatusBadRequest) + + return + } + + gitlabWebhookUUID := ctx.Request.Header.Peek("X-Gitlab-Webhook-Uuid") + if len(gitlabWebhookUUID) == 0 { + h.Logger.Error(missingHTTPHandlerErrorMessage, missingHTTPHandlerHeaderText, "X-Gitlab-Webhook-Uuid") + ctx.SetStatusCode(fasthttp.StatusBadRequest) + + return + } + + // object_kind or event_name - one of them is required + objectKind, err := jsonparser.GetString(ctx.PostBody(), "object_kind") + if err != nil { + // fallback to event_name for premium/ultimate hooks (group, project, subgroup events) + objectKind, err = jsonparser.GetString(ctx.PostBody(), "event_name") + if err != nil { + h.Logger.Error("objectKind/eventName jsonparser.GetString error", "error", err) + ctx.SetStatusCode(fasthttp.StatusBadRequest) + + return + } + } + + // project.id - try nested first, fallback to top-level (for premium events like project_create) + projectID, err := jsonparser.GetInt(ctx.PostBody(), "project", "id") + if err != nil { + projectID, _ = jsonparser.GetInt(ctx.PostBody(), "project_id") + } + + // project.path_with_namespace - try nested first, fallback for premium events + projectPath, err := jsonparser.GetString(ctx.PostBody(), "project", "path_with_namespace") + if err != nil { + // try top-level path_with_namespace (project_create) + projectPath, err = jsonparser.GetString(ctx.PostBody(), "path_with_namespace") + if err != nil { + // try full_path (subgroup_create) + projectPath, err = jsonparser.GetString(ctx.PostBody(), "full_path") + if err != nil { + // try group_path (user_add_to_group) + projectPath, _ = jsonparser.GetString(ctx.PostBody(), "group_path") + } + } + } + + // user info - try nested first (user.id, user.username), fallback to flat (user_id, user_username) + userID, err := jsonparser.GetInt(ctx.PostBody(), "user", "id") + if err != nil { + userID, _ = jsonparser.GetInt(ctx.PostBody(), "user_id") + } + + userUsername, err := jsonparser.GetString(ctx.PostBody(), "user", "username") + if err != nil { + userUsername, _ = jsonparser.GetString(ctx.PostBody(), "user_username") + } + + h.Logger.Info("received gitlab webhook", + "objectKind", objectKind, + "userUsername", userUsername, + "userID", userID, + "projectPath", projectPath, + ) + + message := &sarama.ProducerMessage{ + Topic: h.Topic.String(), + Key: sarama.StringEncoder(gitlabEventUUID), + Value: sarama.ByteEncoder(ctx.PostBody()), + Headers: []sarama.RecordHeader{ + {Key: []byte("event-uuid"), Value: gitlabEventUUID}, + {Key: []byte("webhook-uuid"), Value: gitlabWebhookUUID}, + {Key: []byte("object-kind"), Value: []byte(objectKind)}, + {Key: []byte("project-id"), Value: []byte(strconv.FormatInt(projectID, 10))}, + {Key: []byte("project-path"), Value: []byte(projectPath)}, + {Key: []byte("user-username"), Value: []byte(userUsername)}, + {Key: []byte("user-id"), Value: []byte(strconv.FormatInt(userID, 10))}, + }, + } + + go func() { + select { + case h.MessageQueue <- message: + h.Logger.Info( + "kafka message queued for processing", + "key", string(gitlabEventUUID), + "topic", h.Topic, + ) + case <-ctx.Done(): + h.Logger.Info("received context Done") + default: + h.Logger.Error( + "kafka message queue full, dropping message", + "key", string(gitlabEventUUID), + "topic", h.Topic, + ) + } + }() + + h.Logger.Info("gitlab webhook received successfully") + ctx.SetStatusCode(fasthttp.StatusAccepted) +} + +func (h Handler) checkRequired() error { + if h.Logger == nil { + return fmt.Errorf( + "[gitlabwebhookhandler.checkRequired] Logger error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + if !h.Topic.Valid() { + return fmt.Errorf( + "[gitlabwebhookhandler.checkRequired] Topic error: [%w, '%s' received]", + cerrors.ErrInvalid, h.Topic, + ) + } + if h.Secret == "" { + return fmt.Errorf( + "[gitlabwebhookhandler.checkRequired] Secret error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) + } + if h.MessageQueue == nil { + return fmt.Errorf( + "[gitlabwebhookhandler.checkRequired] MessageQueue error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + + return nil +} + +// Option represents option function type. +type Option func(*Handler) error + +// WithLogger sets logger. +func WithLogger(l *slog.Logger) Option { + return func(h *Handler) error { + if l == nil { + return fmt.Errorf( + "[gitlabwebhookhandler.WithLogger] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + h.Logger = l + + return nil + } +} + +// WithTopic sets topic name to consume. +func WithTopic(s string) Option { + return func(h *Handler) error { + topic := kafkacp.KafkaTopicIdentifier(s) + if !topic.Valid() { + return fmt.Errorf( + "[gitlabwebhookhandler.WithTopic] error: [%w, '%s' received]", + cerrors.ErrInvalid, s, + ) + } + h.Topic = topic + + return nil + } +} + +// WithWebhookSecret sets gitlab webhook secret. +func WithWebhookSecret(s string) Option { + return func(h *Handler) error { + if s == "" { + return fmt.Errorf( + "[gitlabwebhookhandler.WithWebhookSecret] error: [%w, empty string received]", + cerrors.ErrValueRequired, + ) + } + + h.Secret = s + + return nil + } +} + +// WithProducerGitLabMessageQueue sets kafka producer message queue for gitlab webhooks. +func WithProducerGitLabMessageQueue(mq chan *sarama.ProducerMessage) Option { + return func(h *Handler) error { + if mq == nil { + return fmt.Errorf( + "[gitlabwebhookhandler.WithProducerGitLabMessageQueue] error: [%w, 'nil' received]", + cerrors.ErrValueRequired, + ) + } + h.MessageQueue = mq + + return nil + } +} + +// New instantiates new handler instance. +func New(options ...Option) (*Handler, error) { + handler := new(Handler) + + for _, option := range options { + if err := option(handler); err != nil { + return nil, err + } + } + + if err := handler.checkRequired(); err != nil { + return nil, err + } + + return handler, nil +} diff --git a/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go new file mode 100644 index 0000000..31eccc4 --- /dev/null +++ b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go @@ -0,0 +1,504 @@ +package gitlabwebhookhandler_test + +import ( + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/cerrors" + "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/slogger/mockslogger" + "github.com/devchain-network/cauldron/internal/transport/http/gitlabwebhookhandler" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/valyala/fasthttp" +) + +func TestNew_NoLogger(t *testing.T) { + handler, err := gitlabwebhookhandler.New() + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_NilLogger(t *testing.T) { + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(nil), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_NoTopic(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, handler) +} + +func TestNew_InvalidTopic(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic("foo"), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, handler) +} + +func TestNew_NoSecret(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_EmptySecret(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret(""), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_NoMessageQueue(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_NilMessageQueue(t *testing.T) { + logger := mockslogger.New() + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(nil), + ) + + assert.ErrorIs(t, err, cerrors.ErrValueRequired) + assert.Nil(t, handler) +} + +func TestNew_Success(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) +} + +func TestHandle_NoBody(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := &fasthttp.RequestCtx{} + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) +} + +func TestHandle_NoToken(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := &fasthttp.RequestCtx{} + ctx.Request.SetBodyString(`{"object_kind": "push"}`) + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) +} + +func TestHandle_InvalidToken(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := &fasthttp.RequestCtx{} + ctx.Request.SetBodyString(`{"object_kind": "push"}`) + ctx.Request.Header.Set("X-Gitlab-Token", "wrong-secret") + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) +} + +func newMockRequestCtx() *fasthttp.RequestCtx { + var ctx fasthttp.RequestCtx + ctx.Init(&fasthttp.Request{}, nil, nil) + + return &ctx +} + +func TestHandle_NoEventUUID(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(`{"object_kind": "push"}`) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + + done := make(chan bool) + go func() { + select { + case msg := <-messageQueue: + t.Errorf("unexpected message in queue: %v", msg) + case <-time.After(100 * time.Millisecond): + done <- true + } + }() + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) + <-done +} + +func TestHandle_NoWebhookUUID(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(`{"object_kind": "push"}`) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + + done := make(chan bool) + go func() { + select { + case msg := <-messageQueue: + t.Errorf("unexpected message in queue: %v", msg) + case <-time.After(100 * time.Millisecond): + done <- true + } + }() + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) + <-done +} + +func TestHandle_NoObjectKindOrEventName(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(`{"foo": "bar"}`) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + done := make(chan bool) + go func() { + select { + case msg := <-messageQueue: + t.Errorf("unexpected message in queue: %v", msg) + case <-time.After(100 * time.Millisecond): + done <- true + } + }() + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusBadRequest, ctx.Response.StatusCode()) + <-done +} + +func TestHandle_Success_ObjectKind(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + body := `{ + "object_kind": "merge_request", + "user": {"id": 123, "username": "vigo"}, + "project": {"id": 456, "path_with_namespace": "devchain/cauldron"} + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + assert.NotEmpty(t, <-messageQueue) +} + +func TestHandle_Success_EventName(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + // Premium event with event_name instead of object_kind + body := `{ + "event_name": "project_create", + "project_id": 22, + "path_with_namespace": "group1/project1" + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + assert.NotEmpty(t, <-messageQueue) +} + +func TestHandle_Success_PushEvent_FlatUser(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + // Push event has flat user fields + body := `{ + "object_kind": "push", + "user_id": 383362, + "user_username": "vigo", + "project_id": 77447212, + "project": {"id": 77447212, "path_with_namespace": "vigo/webhook-tests-repo"} + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + msg := <-messageQueue + assert.NotNil(t, msg) + assert.Equal(t, string(msg.Value.(sarama.ByteEncoder)), body) +} + +func TestHandle_Success_UserAddToGroup(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + // Premium event: user_add_to_group + body := `{ + "event_name": "user_add_to_group", + "user_username": "test_user", + "user_id": 64, + "group_id": 100, + "group_path": "webhook-test" + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + assert.NotEmpty(t, <-messageQueue) +} + +func TestHandle_Success_SubgroupCreate(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 10) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + // Premium event: subgroup_create + body := `{ + "event_name": "subgroup_create", + "group_id": 10, + "full_path": "group1/subgroup1" + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + assert.NotEmpty(t, <-messageQueue) +} + +func TestMessageQueue_Scenarios(t *testing.T) { + logger := mockslogger.New() + messageQueue := make(chan *sarama.ProducerMessage, 1) + + handler, err := gitlabwebhookhandler.New( + gitlabwebhookhandler.WithLogger(logger), + gitlabwebhookhandler.WithTopic(kafkacp.KafkaTopicIdentifierGitLab.String()), + gitlabwebhookhandler.WithWebhookSecret("my-secret"), + gitlabwebhookhandler.WithProducerGitLabMessageQueue(messageQueue), + ) + + assert.NoError(t, err) + assert.NotNil(t, handler) + + body := `{ + "object_kind": "push", + "user": {"id": 123, "username": "vigo"}, + "project": {"id": 456, "path_with_namespace": "devchain/cauldron"} + }` + + ctx := newMockRequestCtx() + ctx.Request.SetBodyString(body) + ctx.Request.Header.Set("X-Gitlab-Token", "my-secret") + ctx.Request.Header.Set("X-Gitlab-Event-Uuid", uuid.New().String()) + ctx.Request.Header.Set("X-Gitlab-Webhook-Uuid", uuid.New().String()) + + handler.Handle(ctx) + + go func() { + msg := <-messageQueue + assert.NotNil(t, msg) + assert.Equal(t, string(msg.Value.(sarama.ByteEncoder)), body) + }() + + assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) + assert.NotEmpty(t, <-messageQueue) +} diff --git a/migrations/000007_gitlab.down.sql b/migrations/000007_gitlab.down.sql new file mode 100644 index 0000000..5dea13f --- /dev/null +++ b/migrations/000007_gitlab.down.sql @@ -0,0 +1,12 @@ +BEGIN; + +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_payload"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_user_id"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_user_username"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_project_path"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_project_id"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_object_kind"; + +DROP TABLE IF EXISTS "cauldron"."gitlab"; + +COMMIT; diff --git a/migrations/000007_gitlab.up.sql b/migrations/000007_gitlab.up.sql new file mode 100644 index 0000000..8740723 --- /dev/null +++ b/migrations/000007_gitlab.up.sql @@ -0,0 +1,34 @@ +BEGIN; + +-- +-- Create "gitlab" table +-- + +CREATE TABLE "cauldron"."gitlab" ( + "id" SERIAL PRIMARY KEY, + "uid" UUID DEFAULT uuid_generate_v4(), + "created_at" TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'), + "event_uuid" UUID NOT NULL, + "webhook_uuid" UUID NOT NULL, + "object_kind" VARCHAR(64) NOT NULL, + "project_id" BIGINT, + "project_path" VARCHAR(255), + "user_username" VARCHAR(255), + "user_id" BIGINT, + "kafka_offset" BIGINT NOT NULL, + "kafka_partition" SMALLINT NOT NULL, + "payload" JSONB NOT NULL DEFAULT '{}'::jsonb +); + +-- +-- Create indexes +-- + +CREATE INDEX "idx_gitlab_object_kind" ON "cauldron"."gitlab" (object_kind); +CREATE INDEX "idx_gitlab_project_id" ON "cauldron"."gitlab" (project_id) WHERE project_id IS NOT NULL; +CREATE INDEX "idx_gitlab_project_path" ON "cauldron"."gitlab" (project_path) WHERE project_path IS NOT NULL; +CREATE INDEX "idx_gitlab_user_username" ON "cauldron"."gitlab" (user_username) WHERE user_username IS NOT NULL; +CREATE INDEX "idx_gitlab_user_id" ON "cauldron"."gitlab" (user_id) WHERE user_id IS NOT NULL; +CREATE INDEX "idx_gitlab_payload" ON "cauldron"."gitlab" USING gin (payload); + +COMMIT; diff --git a/migrations/000008_gitlab_user.down.sql b/migrations/000008_gitlab_user.down.sql new file mode 100644 index 0000000..6fb102c --- /dev/null +++ b/migrations/000008_gitlab_user.down.sql @@ -0,0 +1,9 @@ +BEGIN; + +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_user_meta_data"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_user_user_id"; +DROP INDEX IF EXISTS "cauldron"."idx_gitlab_user_user_username"; + +DROP TABLE IF EXISTS "cauldron"."gitlab_user"; + +COMMIT; diff --git a/migrations/000008_gitlab_user.up.sql b/migrations/000008_gitlab_user.up.sql new file mode 100644 index 0000000..51e0b6b --- /dev/null +++ b/migrations/000008_gitlab_user.up.sql @@ -0,0 +1,28 @@ +BEGIN; + +-- +-- Create "gitlab_user" table +-- + +CREATE TABLE "cauldron"."gitlab_user" ( + "id" SERIAL PRIMARY KEY, + "uid" UUID DEFAULT uuid_generate_v4(), + "created_at" TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'), + "app_user_id" BIGINT REFERENCES "cauldron"."app_user"(id) ON DELETE NO ACTION, + "user_username" VARCHAR(255) NOT NULL, + "user_id" BIGINT NOT NULL, + "name" VARCHAR(255), + "avatar_url" VARCHAR(255), + "email" VARCHAR(254), + "meta_data" JSONB NOT NULL DEFAULT '{}'::jsonb +); + +-- +-- Create indexes +-- + +CREATE INDEX "idx_gitlab_user_user_username" ON "cauldron"."gitlab_user" (user_username); +CREATE INDEX "idx_gitlab_user_user_id" ON "cauldron"."gitlab_user" (user_id); +CREATE INDEX "idx_gitlab_user_meta_data" ON "cauldron"."gitlab_user" USING gin (meta_data); + +COMMIT; diff --git a/scripts/local/rake/run.rake b/scripts/local/rake/run.rake index 74e03b5..25f89e3 100644 --- a/scripts/local/rake/run.rake +++ b/scripts/local/rake/run.rake @@ -32,5 +32,17 @@ namespace :run do end end + + namespace :gitlab do + + desc 'run kafka gitlab consumer' + task :consumer do + system %{ go run -race cmd/gitlabconsumer/main.go } + exit($CHILD_STATUS&.exitstatus || 1) unless ENV['RAKE_CONTINUE'] + rescue Interrupt + exit(0) + end + + end end end From 2e9bc4c2a61e9e057c0258f530ad2041dc9dc843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Fur=20=C3=96zy=C4=B1lmazel?= Date: Sat, 3 Jan 2026 19:16:44 +0300 Subject: [PATCH 2/4] Improve test coverage for GitLab components MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add TestMessageQueue_Full for handler queue full scenario - Add TestStore_Success_SkipsZeroProjectID for zero project_id - Add TestStore_Success_SkipsZeroUserID for zero user_id Coverage improved: - gitlabwebhookhandler: 97.7% → 98.9% - gitlabstorage: 97.6% → 98.8% 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../gitlabstorage/gitlabstorage_test.go | 87 +++++++++++++++++++ .../gitlabwebhookhandler_test.go | 13 ++- 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/internal/storage/gitlabstorage/gitlabstorage_test.go b/internal/storage/gitlabstorage/gitlabstorage_test.go index 9bcdd8d..577e9fd 100644 --- a/internal/storage/gitlabstorage/gitlabstorage_test.go +++ b/internal/storage/gitlabstorage/gitlabstorage_test.go @@ -508,6 +508,49 @@ func TestStore_Success_SkipsEmptyProjectID(t *testing.T) { {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, {Key: []byte("object-kind"), Value: []byte(`push`)}, {Key: []byte("project-id"), Value: []byte(``)}, // empty, should skip + }, + } + + err = db.MessageStore(ctx, message) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} + +func TestStore_Success_SkipsZeroProjectID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, {Key: []byte("project-id"), Value: []byte(`0`)}, // zero, should skip }, } @@ -516,3 +559,47 @@ func TestStore_Success_SkipsEmptyProjectID(t *testing.T) { assert.NoError(t, err) mockPool.AssertExpectations(t) } + +func TestStore_Success_SkipsZeroUserID(t *testing.T) { + logger := mockslogger.New() + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + mockPool := new(MockPGPooler) + mockPool.On("Exec", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(pgconn.CommandTag{}, nil) + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithLogger(logger), + gitlabstorage.WithDatabaseDSN( + "postgres://foo:bar@localhost:5432/fake?sslmode=disable", + ), + ) + + assert.NoError(t, err) + assert.NotNil(t, db) + db.Pool = mockPool + + uuidKey := uuid.New() + webhookUUID := uuid.New() + + message := &sarama.ConsumerMessage{ + Topic: "gitlab", + Key: []byte(uuidKey.String()), + Value: []byte(`{"hello": "world"}`), + Headers: []*sarama.RecordHeader{ + {Key: []byte("webhook-uuid"), Value: []byte(webhookUUID.String())}, + {Key: []byte("object-kind"), Value: []byte(`push`)}, + {Key: []byte("user-id"), Value: []byte(`0`)}, // zero, should skip + }, + } + + err = db.MessageStore(ctx, message) + assert.NoError(t, err) + mockPool.AssertExpectations(t) +} diff --git a/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go index 31eccc4..4f21478 100644 --- a/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go +++ b/internal/transport/http/gitlabwebhookhandler/gitlabwebhookhandler_test.go @@ -465,9 +465,10 @@ func TestHandle_Success_SubgroupCreate(t *testing.T) { assert.NotEmpty(t, <-messageQueue) } -func TestMessageQueue_Scenarios(t *testing.T) { +func TestMessageQueue_Full(t *testing.T) { logger := mockslogger.New() - messageQueue := make(chan *sarama.ProducerMessage, 1) + // Create a zero-capacity buffered channel that's already full + messageQueue := make(chan *sarama.ProducerMessage) handler, err := gitlabwebhookhandler.New( gitlabwebhookhandler.WithLogger(logger), @@ -493,12 +494,8 @@ func TestMessageQueue_Scenarios(t *testing.T) { handler.Handle(ctx) - go func() { - msg := <-messageQueue - assert.NotNil(t, msg) - assert.Equal(t, string(msg.Value.(sarama.ByteEncoder)), body) - }() + // Wait a bit for goroutine to execute default case (queue full) + time.Sleep(50 * time.Millisecond) assert.Equal(t, fasthttp.StatusAccepted, ctx.Response.StatusCode()) - assert.NotEmpty(t, <-messageQueue) } From e5909b84273c335355fc59e9818fbeacab4ae9a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Fur=20=C3=96zy=C4=B1lmazel?= Date: Sat, 3 Jan 2026 19:21:57 +0300 Subject: [PATCH 3/4] fix typo in comments --- cmd/githubconsumer/main.go | 2 +- cmd/githubconsumergroup/main.go | 2 +- cmd/gitlabconsumer/main.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/githubconsumer/main.go b/cmd/githubconsumer/main.go index b36ee63..4dd64bb 100644 --- a/cmd/githubconsumer/main.go +++ b/cmd/githubconsumer/main.go @@ -24,7 +24,7 @@ func storeMessage(strg storage.PingStorer) kafkaconsumer.ProcessMessageFunc { } } -// Run runs kafa github consumer. +// Run runs kafka github consumer. func Run() error { logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) diff --git a/cmd/githubconsumergroup/main.go b/cmd/githubconsumergroup/main.go index 792aa5e..1f84534 100644 --- a/cmd/githubconsumergroup/main.go +++ b/cmd/githubconsumergroup/main.go @@ -24,7 +24,7 @@ func storeMessage(strg storage.PingStorer) kafkaconsumergroup.ProcessMessageFunc } } -// Run runs kafa github consumer group. +// Run runs kafka github consumer group. func Run() error { logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) diff --git a/cmd/gitlabconsumer/main.go b/cmd/gitlabconsumer/main.go index 9a9e8d3..dfe0be9 100644 --- a/cmd/gitlabconsumer/main.go +++ b/cmd/gitlabconsumer/main.go @@ -24,7 +24,7 @@ func storeMessage(strg storage.PingStorer) kafkaconsumer.ProcessMessageFunc { } } -// Run runs kafa gitlab consumer. +// Run runs kafka gitlab consumer. func Run() error { logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) From 5695762baedf7eae5feb525993150bd2e89dfd94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Fur=20=C3=96zy=C4=B1lmazel?= Date: Sat, 3 Jan 2026 19:24:35 +0300 Subject: [PATCH 4/4] Add codecov config to ignore main.go files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Main entry point files don't have unit tests as they're integration points. Ignore them from patch coverage calculation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- codecov.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 codecov.yml diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..ab6f8ba --- /dev/null +++ b/codecov.yml @@ -0,0 +1,11 @@ +coverage: + status: + project: + default: + target: auto + patch: + default: + target: 80% + +ignore: + - "cmd/**/main.go"