From d01656ad2291ce82ae346cfe040e17cac487c3e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Fur=20=C3=96zy=C4=B1lmazel?= Date: Sat, 3 Jan 2026 19:52:03 +0300 Subject: [PATCH] Add GitLab consumer group support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add cmd/gitlabconsumergroup for Kafka consumer group processing - Add Dockerfile.gitlab-consumer-group for container builds - Add GitHub Actions workflows for build and staging deployment - Update docker-compose.infra.consumerg.yml with gitlab-consumer-group - Add rake task for running gitlab consumer group locally 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- ...ld-push-cauldron-gitlab-group-consumer.yml | 33 ++++++ .../stg-cauldron-gitlab-group-consumer.yml | 33 ++++++ Dockerfile.gitlab-consumer-group | 31 ++++++ cmd/gitlabconsumergroup/main.go | 100 ++++++++++++++++++ docker-compose.infra.consumerg.yml | 18 ++++ scripts/local/rake/run.rake | 8 ++ 6 files changed, 223 insertions(+) create mode 100644 .github/workflows/build-push-cauldron-gitlab-group-consumer.yml create mode 100644 .github/workflows/stg-cauldron-gitlab-group-consumer.yml create mode 100644 Dockerfile.gitlab-consumer-group create mode 100644 cmd/gitlabconsumergroup/main.go diff --git a/.github/workflows/build-push-cauldron-gitlab-group-consumer.yml b/.github/workflows/build-push-cauldron-gitlab-group-consumer.yml new file mode 100644 index 0000000..d511663 --- /dev/null +++ b/.github/workflows/build-push-cauldron-gitlab-group-consumer.yml @@ -0,0 +1,33 @@ +name: GitLab Group Consumer (build-push) + +on: + workflow_dispatch: + +jobs: + build-image: + runs-on: ubuntu-24.04 + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push to GitHub Container Registry + uses: docker/build-push-action@v6 + with: + context: . + file: Dockerfile.gitlab-consumer-group + platforms: linux/amd64 + push: true + provenance: false + tags: ghcr.io/${{ github.repository }}/cauldron-gitlab-consumer-group:latest + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/stg-cauldron-gitlab-group-consumer.yml b/.github/workflows/stg-cauldron-gitlab-group-consumer.yml new file mode 100644 index 0000000..a019849 --- /dev/null +++ b/.github/workflows/stg-cauldron-gitlab-group-consumer.yml @@ -0,0 +1,33 @@ +name: Staging - GitLab Group Consumer (build-push) + +on: + workflow_dispatch: + +jobs: + build-image: + runs-on: ubuntu-24.04 + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push to GitHub Container Registry + uses: docker/build-push-action@v6 + with: + context: . + file: Dockerfile.gitlab-consumer-group + platforms: linux/amd64 + push: true + provenance: false + tags: ghcr.io/${{ github.repository }}/cauldron-stg-gitlab-consumer-group:latest + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/Dockerfile.gitlab-consumer-group b/Dockerfile.gitlab-consumer-group new file mode 100644 index 0000000..f73a637 --- /dev/null +++ b/Dockerfile.gitlab-consumer-group @@ -0,0 +1,31 @@ +FROM golang:1.25-alpine AS builder + +WORKDIR /build +COPY . . + +ARG GOOS +ARG GOARCH +RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -o consumergroup cmd/gitlabconsumergroup/main.go + +FROM alpine:latest AS certs +RUN apk add --update --no-cache ca-certificates + +FROM busybox:latest +ARG UID=10001 +RUN adduser \ + --disabled-password \ + --gecos "" \ + --home "/nonexistent" \ + --shell "/sbin/nologin" \ + --no-create-home \ + --uid "${UID}" \ + appuser +USER appuser +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=builder /build/consumergroup /consumergroup + +ENTRYPOINT ["/consumergroup"] + +LABEL org.opencontainers.image.authors="Uğur vigo Özyılmazel " +LABEL org.opencontainers.image.licenses="MIT" +LABEL org.opencontainers.image.source="https://github.com/devchain-network/cauldron" diff --git a/cmd/gitlabconsumergroup/main.go b/cmd/gitlabconsumergroup/main.go new file mode 100644 index 0000000..561d51a --- /dev/null +++ b/cmd/gitlabconsumergroup/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/IBM/sarama" + "github.com/devchain-network/cauldron/internal/kafkacp" + "github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumergroup" + "github.com/devchain-network/cauldron/internal/slogger" + "github.com/devchain-network/cauldron/internal/storage" + "github.com/devchain-network/cauldron/internal/storage/gitlabstorage" + "github.com/vigo/getenv" +) + +func storeMessage(strg storage.PingStorer) kafkaconsumergroup.ProcessMessageFunc { + return func(ctx context.Context, msg *sarama.ConsumerMessage) error { + if err := strg.MessageStore(ctx, msg); err != nil { + return fmt.Errorf("message store error: [%w]", err) + } + + return nil + } +} + +// Run runs kafka gitlab consumer group. +func Run() error { + logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel) + brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers) + kafkaTopic := getenv.String("KC_TOPIC_GITLAB", "") + kafkaConsumerGroup := getenv.String("KCG_NAME", "") + kafkaDialTimeout := getenv.Duration("KC_DIAL_TIMEOUT", kafkaconsumergroup.DefaultDialTimeout) + kafkaReadTimeout := getenv.Duration("KC_READ_TIMEOUT", kafkaconsumergroup.DefaultReadTimeout) + kafkaWriteTimeout := getenv.Duration("KC_WRITE_TIMEOUT", kafkaconsumergroup.DefaultWriteTimeout) + kafkaBackoff := getenv.Duration("KC_BACKOFF", kafkaconsumergroup.DefaultBackoff) + kafkaMaxRetries := getenv.Int("KC_MAX_RETRIES", kafkaconsumergroup.DefaultMaxRetries) + databaseURL := getenv.String("DATABASE_URL", "") + + if err := getenv.Parse(); err != nil { + return fmt.Errorf("environment variable parse error: [%w]", err) + } + + logger, err := slogger.New( + slogger.WithLogLevelName(*logLevel), + ) + if err != nil { + return fmt.Errorf("logger instantiate error: [%w]", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), storage.DefaultDBPingTimeout) + defer cancel() + + db, err := gitlabstorage.New( + ctx, + gitlabstorage.WithDatabaseDSN(*databaseURL), + gitlabstorage.WithLogger(logger), + ) + if err != nil { + return fmt.Errorf("gitlab storage instantiate error: [%w]", err) + } + + if err = db.Ping(ctx, storage.DefaultDBPingMaxRetries, storage.DefaultDBPingBackoff); err != nil { + return fmt.Errorf("gitlab storage ping error: [%w]", err) + } + defer func() { + logger.Info("gitlab storage - closing pgx pool") + db.Pool.Close() + }() + + kafkaGitLabConsumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(storeMessage(db)), + kafkaconsumergroup.WithKafkaBrokers(*brokersList), + kafkaconsumergroup.WithDialTimeout(*kafkaDialTimeout), + kafkaconsumergroup.WithReadTimeout(*kafkaReadTimeout), + kafkaconsumergroup.WithWriteTimeout(*kafkaWriteTimeout), + kafkaconsumergroup.WithBackoff(*kafkaBackoff), + kafkaconsumergroup.WithMaxRetries(*kafkaMaxRetries), + kafkaconsumergroup.WithTopic(*kafkaTopic), + kafkaconsumergroup.WithKafkaGroupName(*kafkaConsumerGroup), + ) + if err != nil { + return fmt.Errorf("gitlab kafka group consumer instantiate error: [%w]", err) + } + + defer func() { _ = kafkaGitLabConsumer.SaramaConsumerGroup.Close() }() + + if err = kafkaGitLabConsumer.StartConsume(); err != nil { + return fmt.Errorf("gitlab kafka group consumer start consume error: [%w]", err) + } + + return nil +} + +func main() { + if err := Run(); err != nil { + log.Fatal(err) + } +} diff --git a/docker-compose.infra.consumerg.yml b/docker-compose.infra.consumerg.yml index 6414ec2..7e58c11 100644 --- a/docker-compose.infra.consumerg.yml +++ b/docker-compose.infra.consumerg.yml @@ -91,6 +91,7 @@ services: - "8000:8000" environment: GITHUB_HMAC_SECRET: "${GITHUB_HMAC_SECRET}" + GITLAB_HMAC_SECRET: "${GITLAB_HMAC_SECRET}" KCP_BROKERS: "kafka:9092" depends_on: - kafka @@ -114,6 +115,23 @@ services: networks: - devchain-network + gitlab-consumer-group: + build: + context: . + dockerfile: Dockerfile.gitlab-consumer-group + environment: + KC_TOPIC_GITLAB: "gitlab" + KCG_NAME: "gitlab-group" + KCP_BROKERS: "kafka:9092" + DATABASE_URL: "${DATABASE_URL_INFRA}" + depends_on: + - kafka + - cauldron + - postgresql_db + - migrator + networks: + - devchain-network + volumes: kafka_data: driver: local diff --git a/scripts/local/rake/run.rake b/scripts/local/rake/run.rake index 25f89e3..1a10a9a 100644 --- a/scripts/local/rake/run.rake +++ b/scripts/local/rake/run.rake @@ -43,6 +43,14 @@ namespace :run do exit(0) end + desc 'run kafka gitlab consumer group' + task :consumer_group do + system %{ go run -race cmd/gitlabconsumergroup/main.go } + exit($CHILD_STATUS&.exitstatus || 1) unless ENV['RAKE_CONTINUE'] + rescue Interrupt + exit(0) + end + end end end