diff --git a/.github/workflows/rds-postgres-debezium-msk-connector.yml b/.github/workflows/rds-postgres-debezium-msk-connector.yml new file mode 100644 index 0000000..c05a0d3 --- /dev/null +++ b/.github/workflows/rds-postgres-debezium-msk-connector.yml @@ -0,0 +1,23 @@ +name: rds-postgres/debezium-msk-connector +on: + pull_request: + branches: + - main + paths: + - rds-postgres/debezium-msk-connector/** + types: + - closed + - opened + - reopened + - synchronize +jobs: + terraform: + uses: ./.github/workflows/terraform.yml + concurrency: ${{ github.workflow }} + with: + module: rds-postgres/debezium-msk-connector + permissions: + id-token: write + contents: write + checks: write + pull-requests: write diff --git a/.gitignore b/.gitignore index 3b7eca1..bcffb36 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ .terraformrc .validate python +.plugin-build/ +postgres-msk-debezium-plugin-*.zip diff --git a/README.md b/README.md index 4e1e2ef..95a8cdd 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Modules: * [Primary Instance](./rds-postgres/primary-instance/README.md) * [Replica Instance](./rds-postgres/replica/README.md) * [Parameter Group](./rds-postgres/parameter-group/README.md) + * [Debezium MSK Connector](./rds-postgres/debezium-msk-connector/README.md) * [Admin Login](./rds-postgres/admin-login/README.md) * [User Login](./rds-postgres/rds-postgres-login/README.md) * [CloudWatch Alarms](./rds-postgres/cloudwatch-alarms/README.md) diff --git a/rds-postgres/debezium-msk-connector/README.md b/rds-postgres/debezium-msk-connector/README.md new file mode 100644 index 0000000..a6d1344 --- /dev/null +++ b/rds-postgres/debezium-msk-connector/README.md @@ -0,0 +1,237 @@ +# Debezium MSK Connector + +Provision an Amazon MSK Connect Debezium PostgreSQL connector, including its execution role, worker configuration, and CloudWatch log group. + +## Usage + +The module expects three pieces to already exist in the target AWS account: + +1. 
An Amazon MSK cluster with IAM authentication enabled for clients. +2. An MSK Connect custom plugin containing the Debezium PostgreSQL connector and the AWS config providers. +3. A Secrets Manager secret containing PostgreSQL connection details and an SSM parameter containing the MSK IAM bootstrap brokers string. + +Available Debezium PostgreSQL connector versions can be found in the upstream Debezium release pages and connector installation docs: + +- Debezium releases overview: https://debezium.io/releases/ +- Debezium PostgreSQL connector docs: https://debezium.io/documentation/reference/stable/connectors/postgresql.html +- Debezium installation docs for connector archives: https://debezium.io/documentation/reference/stable/install.html + +## Build And Upload The Plugin + +This is a required initial preparation step. The module cannot be applied until the MSK Connect custom plugin ZIP has been built, uploaded to S3, and registered as an MSK Connect custom plugin in AWS. + +An interactive helper script is included at [`scripts/build_upload_plugin.py`](./scripts/build_upload_plugin.py). It automates the build-and-upload part of that preparation: validating the requested Debezium version, downloading the required Debezium and AWS config provider artifacts, packaging them into a single MSK Connect plugin ZIP, and uploading that ZIP to S3. It does not register the uploaded ZIP as an MSK Connect custom plugin; that step must still be completed in AWS before the module is applied. + +Use this script as part of the initial setup for any environment that needs this module. + +Prerequisites for the script: + +- Python 3 +- AWS CLI +- AWS credentials, or an AWS profile, or an assumable IAM role with access to the target S3 bucket + +The script uses only the Python standard library, so there are no extra Python package dependencies to install. + +Default values used by the script: + +- Debezium PostgreSQL connector version: `3.5.0` +- AWS MSK config provider version: `0.4.0` + +The script will: + +1. Request the Debezium PostgreSQL connector version and validate that the archive exists. +2. 
Show where to find available AWS MSK config provider versions, then request and validate the selected version. +3. Request an authentication mode: current AWS credentials, AWS profile, or IAM role. +4. Request the AWS region, S3 bucket, optional S3 prefix, output ZIP filename, and suggested MSK Connect plugin name. +5. Download and extract the Debezium PostgreSQL connector plugin archive. +6. Download and extract the AWS MSK config provider archive. +7. Package both into a single ZIP file. +8. Upload the ZIP to the requested S3 location. + +Run it with `./scripts/build_upload_plugin.py` or `python3 scripts/build_upload_plugin.py`. + +At the end, the script prints: + +- the uploaded `s3://` object path +- the suggested `custom_plugin_name` value to use in this Terraform module + +The packaged ZIP uploaded by the script is the artifact you must register as the MSK Connect custom plugin in AWS before applying this module. + +A typical module call looks like this: + +```hcl +module "debezium_msk_connector" { + source = "../debezium-msk-connector" + + cluster_name = "example" + custom_plugin_name = "postgresql-msk-debezium-connector-3-5-0" + bootstrap_brokers_sasl_iam = aws_msk_cluster.this.bootstrap_brokers_sasl_iam + kafka_iam_broker_endpoint_parameter_name = "/example/msk/bootstrap-brokers-sasl-iam" + database_credentials_secret_name = "example-postgres-debezium" + table_include_list = "public.table_a,public.table_b,public.table_c" + + security_group_ids = [aws_security_group.msk_connect.id] + subnet_ids = module.vpc.private_subnets + + tags = { + Service = "example" + ManagedBy = "terraform" + Environment = "production" + } +} +``` + +## Required Secret Shape + +The secret referenced by `database_credentials_secret_name` should expose these keys: + +```hcl +{ + host = "postgres.cluster-abcdefghijkl.us-east-1.rds.amazonaws.com" + port = "5432" + dbname = "application" + username = "debezium" + password = "replace-me" +} +``` + +## Common Overrides + +You can override 
naming defaults, scaling, logging retention, and selected Debezium connector settings. + +```hcl +module "debezium_msk_connector" { + source = "../debezium-msk-connector" + + cluster_name = "example" + custom_plugin_name = "postgresql-msk-debezium-connector-3-5-0" + bootstrap_brokers_sasl_iam = aws_msk_cluster.this.bootstrap_brokers_sasl_iam + kafka_iam_broker_endpoint_parameter_name = "/example/msk/bootstrap-brokers-sasl-iam" + database_credentials_secret_name = "example-postgres-debezium" + table_include_list = "public.table_a,public.table_b" + security_group_ids = [aws_security_group.msk_connect.id] + subnet_ids = module.vpc.private_subnets + + connector_name = "example-cdc" + worker_configuration_name = "example-cdc-workers" + log_group_name = "/aws/mskconnect/example-cdc" + schema_history_topic = "schemahistory.example_cdc" + mcu_count = 2 + min_worker_count = 1 + max_worker_count = 4 + tasks_max = 1 + cloudwatch_log_retention_in_days = 14 + + connector_configuration_overrides = { + "snapshot.mode" = "initial" + "decimal.handling.mode" = "string" + "time.precision.mode" = "connect" + "tombstones.on.delete" = "false" + "include.schema.changes" = "true" + } +} +``` + +## Custom Worker Configuration + +If you need to replace the default worker properties entirely, provide `worker_configuration_properties_file_content`. 
+ +```hcl +module "debezium_msk_connector" { + source = "../debezium-msk-connector" + + cluster_name = "example" + custom_plugin_name = "postgresql-msk-debezium-connector-3-5-0" + bootstrap_brokers_sasl_iam = aws_msk_cluster.this.bootstrap_brokers_sasl_iam + kafka_iam_broker_endpoint_parameter_name = "/example/msk/bootstrap-brokers-sasl-iam" + database_credentials_secret_name = "example-postgres-debezium" + table_include_list = "public.table_a" + security_group_ids = [aws_security_group.msk_connect.id] + subnet_ids = module.vpc.private_subnets + + worker_configuration_properties_file_content = <<-EOT + key.converter=org.apache.kafka.connect.storage.StringConverter + value.converter=org.apache.kafka.connect.storage.StringConverter + config.providers=secretsmanager,ssm + config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider + config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider + config.providers.secretsmanager.param.region=us-east-1 + config.providers.ssm.param.region=us-east-1 + EOT +} +``` + +## Notes + +- `table_include_list` must use fully qualified PostgreSQL table names such as `public.table_a`. +- `tasks_max` should generally remain `1` for PostgreSQL Debezium connectors. +- `security_group_ids` and `subnet_ids` must allow connector workers to reach both the MSK brokers and the PostgreSQL instance. +- `connector_configuration_overrides` is merged on top of the module defaults, so conflicting keys there will replace the defaults. 
+ + +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | >= 1.6.2 | +| [aws](#requirement\_aws) | ~> 6.0 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | ~> 6.0 | + +## Resources + +| Name | Type | +|------|------| +| [aws_cloudwatch_log_group.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource | +| [aws_iam_role.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource | +| [aws_iam_role_policy.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy) | resource | +| [aws_mskconnect_connector.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_connector) | resource | +| [aws_mskconnect_worker_configuration.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/mskconnect_worker_configuration) | resource | +| [aws_caller_identity.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source | +| [aws_iam_policy_document.msk_connect_assume_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | +| [aws_iam_policy_document.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | +| [aws_mskconnect_custom_plugin.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/mskconnect_custom_plugin) | data source | +| [aws_region.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [bootstrap\_brokers\_sasl\_iam](#input\_bootstrap\_brokers\_sasl\_iam) | SASL/IAM bootstrap broker connection 
string for the target Amazon MSK cluster. | `string` | n/a | yes | +| [cloudwatch\_log\_retention\_in\_days](#input\_cloudwatch\_log\_retention\_in\_days) | Number of days to retain connector worker logs in CloudWatch. | `number` | `30` | no | +| [cluster\_name](#input\_cluster\_name) | Base name used to derive connector, slot, publication, and topic defaults. | `string` | n/a | yes | +| [connector\_configuration\_overrides](#input\_connector\_configuration\_overrides) | Additional connector configuration entries to merge over the module defaults. | `map(string)` | `{}` | no | +| [connector\_name](#input\_connector\_name) | Explicit name for the MSK Connect connector. Defaults to `<cluster_name>-postgres-connector`. | `string` | `null` | no | +| [custom\_plugin\_name](#input\_custom\_plugin\_name) | Name of the existing AWS MSK Connect custom plugin that contains the Debezium PostgreSQL connector artifacts. | `string` | n/a | yes | +| [database\_credentials\_secret\_name](#input\_database\_credentials\_secret\_name) | Name of the AWS Secrets Manager secret that stores the PostgreSQL connection details with keys `host`, `port`, `dbname`, `username`, and `password`. | `string` | n/a | yes | +| [kafka\_iam\_broker\_endpoint\_parameter\_name](#input\_kafka\_iam\_broker\_endpoint\_parameter\_name) | Name of the SSM parameter that contains the MSK IAM bootstrap broker endpoint. | `string` | n/a | yes | +| [kafkaconnect\_version](#input\_kafkaconnect\_version) | Kafka Connect runtime version for the MSK Connect connector. | `string` | `"3.7.x"` | no | +| [log\_group\_name](#input\_log\_group\_name) | Explicit name for the CloudWatch log group. Defaults to `<connector_name>-logs`. | `string` | `null` | no | +| [max\_worker\_count](#input\_max\_worker\_count) | Maximum number of autoscaled workers for the MSK Connect connector. | `number` | `2` | no | +| [mcu\_count](#input\_mcu\_count) | Number of MSK Connect units (MCUs) allocated to each connector worker. 
| `number` | `1` | no | +| [min\_worker\_count](#input\_min\_worker\_count) | Minimum number of autoscaled workers for the MSK Connect connector. | `number` | `1` | no | +| [schema\_history\_topic](#input\_schema\_history\_topic) | Override for the internal Kafka topic used by Debezium schema history. Defaults to `schemahistory.<cluster_name>` with hyphens in the cluster name replaced by underscores. | `string` | `null` | no | +| [security\_group\_ids](#input\_security\_group\_ids) | Security group IDs attached to the MSK Connect connector within the VPC. | `list(string)` | n/a | yes | +| [subnet\_ids](#input\_subnet\_ids) | Subnet IDs where the MSK Connect connector workers will run. | `list(string)` | n/a | yes | +| [table\_include\_list](#input\_table\_include\_list) | Comma-separated list of PostgreSQL tables that Debezium should capture. Example: 'public.table\_a,public.table\_b'. | `string` | n/a | yes | +| [tags](#input\_tags) | Tags to apply to supported resources created by this module. | `map(string)` | `{}` | no | +| [tasks\_max](#input\_tasks\_max) | Maximum number of connector tasks. PostgreSQL Debezium connectors typically run with a single task. | `number` | `1` | no | +| [worker\_configuration\_name](#input\_worker\_configuration\_name) | Explicit name for the MSK Connect worker configuration. Defaults to `<cluster_name>-worker-config`. | `string` | `null` | no | +| [worker\_configuration\_properties\_file\_content](#input\_worker\_configuration\_properties\_file\_content) | Optional full worker configuration properties content. Defaults to a configuration that enables Secrets Manager and SSM config providers in the current AWS region. | `string` | `null` | no | + +## Outputs + +| Name | Description | +|------|-------------| +| [connector](#output\_connector) | The created MSK Connect connector resource. | +| [connector\_arn](#output\_connector\_arn) | ARN of the created MSK Connect connector. | +| [connector\_name](#output\_connector\_name) | Name of the created MSK Connect connector. 
| +| [custom\_plugin\_arn](#output\_custom\_plugin\_arn) | ARN of the MSK Connect custom plugin used by the connector. | +| [log\_group\_name](#output\_log\_group\_name) | Name of the CloudWatch log group receiving connector worker logs. | +| [service\_execution\_role\_arn](#output\_service\_execution\_role\_arn) | ARN of the IAM role assumed by MSK Connect. | +| [worker\_configuration\_arn](#output\_worker\_configuration\_arn) | ARN of the worker configuration attached to the connector. | + diff --git a/rds-postgres/debezium-msk-connector/iam_role.tf b/rds-postgres/debezium-msk-connector/iam_role.tf new file mode 100644 index 0000000..f856a73 --- /dev/null +++ b/rds-postgres/debezium-msk-connector/iam_role.tf @@ -0,0 +1,87 @@ +data "aws_caller_identity" "this" {} + +data "aws_region" "this" {} + +data "aws_iam_policy_document" "msk_connect_assume_role" { + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["kafkaconnect.amazonaws.com"] + } + + actions = ["sts:AssumeRole"] + } +} + +data "aws_iam_policy_document" "this" { + statement { + effect = "Allow" + actions = [ + "secretsmanager:GetResourcePolicy", + "secretsmanager:GetSecretValue", + "secretsmanager:DescribeSecret", + "secretsmanager:ListSecretVersionIds", + ] + resources = [ + "arn:aws:secretsmanager:${data.aws_region.this.region}:${data.aws_caller_identity.this.account_id}:secret:${var.database_credentials_secret_name}*", + ] + } + + statement { + sid = "WriteConnectorLogs" + effect = "Allow" + actions = [ + "logs:CreateLogStream", + "logs:PutLogEvents", + "logs:DescribeLogStreams", + ] + resources = [ + aws_cloudwatch_log_group.this.arn, + "${aws_cloudwatch_log_group.this.arn}:*", + ] + } + + statement { + sid = "MSKClusterConnect" + effect = "Allow" + actions = [ + "kafka-cluster:*", + ] + resources = ["*"] + } + + statement { + sid = "MSKAccess" + effect = "Allow" + actions = [ + "kafka:*", + ] + resources = ["*"] + } + + statement { + sid = "SSMAccess" + effect = 
"Allow" + actions = [ + "ssm:GetParameter", + "ssm:GetParameters", + ] + resources = [ + "arn:aws:ssm:${data.aws_region.this.region}:${data.aws_caller_identity.this.account_id}:parameter/${trim(var.kafka_iam_broker_endpoint_parameter_name, "/")}", + ] + } +} + +resource "aws_iam_role" "this" { + name = "${var.cluster_name}-msk-connect-role" + assume_role_policy = data.aws_iam_policy_document.msk_connect_assume_role.json + tags = var.tags +} + +resource "aws_iam_role_policy" "this" { + name = "${var.cluster_name}-msk-connect-policy" + role = aws_iam_role.this.id + policy = data.aws_iam_policy_document.this.json +} diff --git a/rds-postgres/debezium-msk-connector/main.tf b/rds-postgres/debezium-msk-connector/main.tf new file mode 100644 index 0000000..7197dea --- /dev/null +++ b/rds-postgres/debezium-msk-connector/main.tf @@ -0,0 +1,147 @@ +locals { + normalized_cluster_name = replace(var.cluster_name, "-", "_") + + connector_name = coalesce( + var.connector_name, + "${var.cluster_name}-postgres-connector" + ) + + worker_configuration_name = coalesce( + var.worker_configuration_name, + "${var.cluster_name}-worker-config" + ) + + log_group_name = coalesce( + var.log_group_name, + "${local.connector_name}-logs" + ) + + schema_history_topic = coalesce( + var.schema_history_topic, + "schemahistory.${local.normalized_cluster_name}" + ) + + worker_configuration_properties_file_content = coalesce( + var.worker_configuration_properties_file_content, + <<-EOT + key.converter=org.apache.kafka.connect.storage.StringConverter + value.converter=org.apache.kafka.connect.storage.StringConverter + config.providers=secretsmanager,ssm + config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider + config.providers.ssm.class=com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider + config.providers.secretsmanager.param.region=${data.aws_region.this.region} + config.providers.ssm.param.region=${data.aws_region.this.region} + EOT + ) + 
+ default_connector_configuration = { + "connector.class" = "io.debezium.connector.postgresql.PostgresConnector" + "database.dbname" = "$${secretsmanager:${var.database_credentials_secret_name}:dbname}" + "database.hostname" = "$${secretsmanager:${var.database_credentials_secret_name}:host}" + "database.password" = "$${secretsmanager:${var.database_credentials_secret_name}:password}" + "database.port" = "$${secretsmanager:${var.database_credentials_secret_name}:port}" + "database.user" = "$${secretsmanager:${var.database_credentials_secret_name}:username}" + "plugin.name" = "pgoutput" + "publication.name" = "dbz_publication_${local.normalized_cluster_name}" + "schema.history.internal.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler" + "schema.history.internal.consumer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;" + "schema.history.internal.consumer.sasl.mechanism" = "AWS_MSK_IAM" + "schema.history.internal.consumer.security.protocol" = "SASL_SSL" + "schema.history.internal.kafka.bootstrap.servers" = "$${ssm:${var.kafka_iam_broker_endpoint_parameter_name}}" + "schema.history.internal.kafka.topic" = local.schema_history_topic + "schema.history.internal.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler" + "schema.history.internal.producer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;" + "schema.history.internal.producer.sasl.mechanism" = "AWS_MSK_IAM" + "schema.history.internal.producer.security.protocol" = "SASL_SSL" + "slot.name" = "debezium_${local.normalized_cluster_name}" + "table.include.list" = var.table_include_list + "tasks.max" = tostring(var.tasks_max) + "topic.creation.default.cleanup.policy" = "compact" + "topic.creation.default.partitions" = "-1" + "topic.creation.default.replication.factor" = "-1" + "topic.prefix" = local.normalized_cluster_name + } +} + +data "aws_mskconnect_custom_plugin" 
"this" { + name = var.custom_plugin_name +} + +resource "aws_cloudwatch_log_group" "this" { + name = local.log_group_name + retention_in_days = var.cloudwatch_log_retention_in_days + tags = var.tags +} + +resource "aws_mskconnect_worker_configuration" "this" { + name = local.worker_configuration_name + properties_file_content = local.worker_configuration_properties_file_content +} + +resource "aws_mskconnect_connector" "this" { + name = local.connector_name + kafkaconnect_version = var.kafkaconnect_version + + capacity { + autoscaling { + mcu_count = var.mcu_count + min_worker_count = var.min_worker_count + max_worker_count = var.max_worker_count + + scale_in_policy { + cpu_utilization_percentage = 20 + } + + scale_out_policy { + cpu_utilization_percentage = 80 + } + } + } + + connector_configuration = merge( + local.default_connector_configuration, + var.connector_configuration_overrides + ) + + kafka_cluster { + apache_kafka_cluster { + bootstrap_servers = var.bootstrap_brokers_sasl_iam + + vpc { + security_groups = var.security_group_ids + subnets = var.subnet_ids + } + } + } + + kafka_cluster_client_authentication { + authentication_type = "IAM" + } + + kafka_cluster_encryption_in_transit { + encryption_type = "TLS" + } + + log_delivery { + worker_log_delivery { + cloudwatch_logs { + enabled = true + log_group = aws_cloudwatch_log_group.this.name + } + } + } + + plugin { + custom_plugin { + arn = data.aws_mskconnect_custom_plugin.this.arn + revision = data.aws_mskconnect_custom_plugin.this.latest_revision + } + } + + service_execution_role_arn = aws_iam_role.this.arn + + worker_configuration { + arn = aws_mskconnect_worker_configuration.this.arn + revision = aws_mskconnect_worker_configuration.this.latest_revision + } +} diff --git a/rds-postgres/debezium-msk-connector/makefile b/rds-postgres/debezium-msk-connector/makefile new file mode 100644 index 0000000..2ba5cfc --- /dev/null +++ b/rds-postgres/debezium-msk-connector/makefile @@ -0,0 +1,58 @@ +TFLINTRC := 
../../.tflint.hcl +MODULEFILES := $(wildcard *.tf) + +.PHONY: default +default: checkfmt validate docs lint + +.PHONY: checkfmt +checkfmt: .fmt + +.PHONY: fmt +fmt: $(MODULEFILES) + terraform fmt + @touch .fmt + +.PHONY: validate +validate: .validate + +.PHONY: docs +docs: README.md + +.PHONY: lint +lint: .lint + +.lint: $(MODULEFILES) .lintinit + tflint --config=$(TFLINTRC) + @touch .lint + +.lintinit: $(TFLINTRC) + tflint --init --config=$(TFLINTRC) + @touch .lintinit + +README.md: $(MODULEFILES) + terraform-docs markdown table . --output-file README.md + +.fmt: $(MODULEFILES) + terraform fmt -check + @touch .fmt + +.PHONY: init +init: .init + +.init: versions.tf + terraform init -backend=false + @touch .init + +.validate: .init $(MODULEFILES) $(wildcard *.tf.example) + echo | cat - $(wildcard *.tf.example) > test.tf + if AWS_DEFAULT_REGION=us-east-1 terraform validate; then \ + rm test.tf; \ + touch .validate; \ + else \ + rm test.tf; \ + false; \ + fi + +.PHONY: clean +clean: + rm -rf .fmt .init .lint .lintinit .terraform .terraform.lock.hcl .validate diff --git a/rds-postgres/debezium-msk-connector/outputs.tf b/rds-postgres/debezium-msk-connector/outputs.tf new file mode 100644 index 0000000..efddfde --- /dev/null +++ b/rds-postgres/debezium-msk-connector/outputs.tf @@ -0,0 +1,34 @@ +output "connector" { + description = "The created MSK Connect connector resource." + value = aws_mskconnect_connector.this +} + +output "connector_arn" { + description = "ARN of the created MSK Connect connector." + value = aws_mskconnect_connector.this.arn +} + +output "connector_name" { + description = "Name of the created MSK Connect connector." + value = aws_mskconnect_connector.this.name +} + +output "custom_plugin_arn" { + description = "ARN of the MSK Connect custom plugin used by the connector." + value = data.aws_mskconnect_custom_plugin.this.arn +} + +output "log_group_name" { + description = "Name of the CloudWatch log group receiving connector worker logs." 
+ value = aws_cloudwatch_log_group.this.name +} + +output "service_execution_role_arn" { + description = "ARN of the IAM role assumed by MSK Connect." + value = aws_iam_role.this.arn +} + +output "worker_configuration_arn" { + description = "ARN of the worker configuration attached to the connector." + value = aws_mskconnect_worker_configuration.this.arn +} diff --git a/rds-postgres/debezium-msk-connector/scripts/build_upload_plugin.py b/rds-postgres/debezium-msk-connector/scripts/build_upload_plugin.py new file mode 100755 index 0000000..0a0619b --- /dev/null +++ b/rds-postgres/debezium-msk-connector/scripts/build_upload_plugin.py @@ -0,0 +1,506 @@ +#!/usr/bin/env python3 + +import atexit +import json +import os +import re +import shutil +import subprocess +import sys +import tarfile +import urllib.error +import urllib.request +import zipfile +from pathlib import Path + + +DEBEZIUM_BASE_URL = "https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres" +DEBEZIUM_RELEASES_URL = "https://debezium.io/releases/" +MSK_CONFIG_PROVIDER_BASE_URL = "https://github.com/aws-samples/msk-config-providers/releases/download" +MSK_CONFIG_PROVIDER_RELEASES_URL = "https://github.com/aws-samples/msk-config-providers/releases" +DEFAULT_MSK_CONFIG_PROVIDER_VERSION = "0.4.0" +DEFAULT_DEBEZIUM_VERSION = "3.5.0" + +SCRIPT_PATH = Path(__file__).resolve() +MODULE_DIR = SCRIPT_PATH.parent.parent +WORK_ROOT = MODULE_DIR / ".plugin-build" + +RED = "\033[31m" +GREEN = "\033[32m" +YELLOW = "\033[33m" +BLUE = "\033[34m" +BOLD = "\033[1m" +RESET = "\033[0m" + + +class ScriptError(Exception): + pass + + +class BuilderState: + def __init__(self) -> None: + self.debezium_version = "" + self.debezium_archive_version = "" + self.debezium_archive_name = "" + self.debezium_archive_url = "" + self.msk_config_provider_version = "" + self.aws_region = "" + self.auth_mode = "current" + self.aws_profile = "" + self.role_arn = "" + self.role_session_name = "" + self.s3_bucket = "" + self.s3_prefix 
= "" + self.plugin_zip_name = "" + self.custom_plugin_name = "" + self.confirm_overwrite = False + + +STATE = BuilderState() + + +def print_header(message: str) -> None: + print() + print(f"{BOLD}{BLUE}== {message} =={RESET}") + + +def print_info(message: str) -> None: + print(f"{BLUE}[info]{RESET} {message}") + + +def print_success(message: str) -> None: + print(f"{GREEN}[ok]{RESET} {message}") + + +def print_warning(message: str) -> None: + print(f"{YELLOW}[warn]{RESET} {message}", file=sys.stderr) + + +def print_error(message: str) -> None: + print(f"{RED}[error]{RESET} {message}", file=sys.stderr) + + +def print_progress(message: str) -> None: + print(f"\r{BLUE}[download]{RESET} {message}", end="", flush=True) + + +def finish_progress() -> None: + print() + + +def require_command(command_name: str) -> None: + if shutil.which(command_name) is None: + raise ScriptError(f"Required command not found: {command_name}") + + +def trim(value: str) -> str: + return value.strip() + + +def sanitize_version_for_name(value: str) -> str: + return re.sub(r"[.+]", "-", value) + + +def normalize_s3_prefix(value: str) -> str: + normalized = trim(value).strip("/") + return normalized + + +def prompt_value(prompt_text: str, default_value: str = "") -> str: + if default_value: + response = input(f"{prompt_text} [{default_value}]: ") + else: + response = input(f"{prompt_text}: ") + + response = trim(response) + if not response: + return default_value + return response + + +def prompt_non_empty(prompt_text: str, default_value: str = "") -> str: + while True: + response = prompt_value(prompt_text, default_value) + if response: + return response + print_warning("A value is required.") + + +def prompt_yes_no(prompt_text: str, default_answer: str = "y") -> bool: + alternate = "n" if default_answer == "y" else "y" + while True: + response = input(f"{prompt_text} [{default_answer}/{alternate}]: ") + response = trim(response) or default_answer + lowered = response.lower() + if lowered in 
{"y", "yes"}: + return True + if lowered in {"n", "no"}: + return False + print_warning("Please answer y or n.") + + +def prompt_auth_mode() -> str: + while True: + response = input( + "Authentication mode: press Enter for current AWS credentials, 1 for AWS profile, or 2 for IAM role: " + ) + response = trim(response) + if response == "": + return "current" + if response == "1": + return "profile" + if response == "2": + return "role" + print_warning("Please press Enter, 1, or 2.") + + +def build_debezium_meta(version: str) -> tuple[str, str, str]: + archive_version = f"{version}.Final" + archive_name = f"debezium-connector-postgres-{archive_version}-plugin.tar.gz" + archive_url = f"{DEBEZIUM_BASE_URL}/{archive_version}/{archive_name}" + return archive_version, archive_name, archive_url + + +def validate_http_url(url: str) -> bool: + request = urllib.request.Request(url, method="HEAD") + try: + with urllib.request.urlopen(request): + return True + except urllib.error.HTTPError as exc: + if exc.code == 405: + try: + with urllib.request.urlopen(url): + return True + except Exception: + return False + return False + except Exception: + return False + + +def detect_default_region() -> str: + env = os.environ.copy() + try: + result = subprocess.run( + ["aws", "configure", "get", "region"], + check=False, + capture_output=True, + text=True, + env=env, + ) + except OSError: + return "" + + if result.returncode != 0: + return "" + return trim(result.stdout) + + +def run_aws_command(args: list[str], capture_output: bool = False) -> subprocess.CompletedProcess: + env = os.environ.copy() + return subprocess.run( + args, + check=True, + text=True, + capture_output=capture_output, + env=env, + ) + + +def assume_role_if_requested() -> None: + if not STATE.role_arn: + return + + print_info(f"Assuming IAM role {STATE.role_arn}") + + session_name = STATE.role_session_name or "debezium-plugin-build" + result = run_aws_command( + [ + "aws", + "sts", + "assume-role", + "--role-arn", + 
STATE.role_arn, + "--role-session-name", + session_name, + "--output", + "json", + ], + capture_output=True, + ) + + payload = json.loads(result.stdout) + credentials = payload["Credentials"] + os.environ.pop("AWS_PROFILE", None) + os.environ["AWS_ACCESS_KEY_ID"] = credentials["AccessKeyId"] + os.environ["AWS_SECRET_ACCESS_KEY"] = credentials["SecretAccessKey"] + os.environ["AWS_SESSION_TOKEN"] = credentials["SessionToken"] + + print_success("Role assumed successfully.") + + +def validate_bucket_access() -> None: + print_info(f"Validating access to s3://{STATE.s3_bucket}") + run_aws_command(["aws", "s3api", "head-bucket", "--bucket", STATE.s3_bucket]) + + +def resolve_bucket_region() -> str: + result = run_aws_command( + [ + "aws", + "s3api", + "get-bucket-location", + "--bucket", + STATE.s3_bucket, + "--output", + "json", + ], + capture_output=True, + ) + payload = json.loads(result.stdout) + bucket_region = payload.get("LocationConstraint") + if bucket_region in (None, "None"): + return "us-east-1" + return str(bucket_region) + + +def build_s3_uri() -> str: + if STATE.s3_prefix: + return f"s3://{STATE.s3_bucket}/{STATE.s3_prefix}/{STATE.plugin_zip_name}" + return f"s3://{STATE.s3_bucket}/{STATE.plugin_zip_name}" + + +def cleanup_workspace() -> None: + if WORK_ROOT.exists(): + shutil.rmtree(WORK_ROOT) + + +def prepare_workspace() -> None: + cleanup_workspace() + (WORK_ROOT / "custom-plugin").mkdir(parents=True, exist_ok=True) + + +def download_file(url: str, destination: Path) -> None: + chunk_size = 1024 * 1024 + with urllib.request.urlopen(url) as response, destination.open("wb") as handle: + total_bytes_header = response.headers.get("Content-Length") + total_bytes = int(total_bytes_header) if total_bytes_header else 0 + downloaded_bytes = 0 + + while True: + chunk = response.read(chunk_size) + if not chunk: + break + + handle.write(chunk) + downloaded_bytes += len(chunk) + + if total_bytes > 0: + percentage = (downloaded_bytes / total_bytes) * 100 + 
def download_and_extract_debezium() -> None:
    """Download the Debezium plugin tarball and unpack it into the plugin tree.

    The archive named by ``STATE.debezium_archive_name`` is fetched from
    ``STATE.debezium_archive_url`` into ``WORK_ROOT``, extracted into
    ``WORK_ROOT / "custom-plugin"``, and then deleted.
    """
    archive_path = WORK_ROOT / STATE.debezium_archive_name
    plugin_dir = WORK_ROOT / "custom-plugin"

    print_info("Downloading Debezium plugin archive")
    download_file(STATE.debezium_archive_url, archive_path)

    print_info("Extracting Debezium plugin archive")
    with tarfile.open(archive_path, "r:gz") as archive:
        # The tarball comes from the network, so refuse members that would
        # escape the destination (absolute paths, "..", links pointing
        # outside) by using the "data" extraction filter. The filter only
        # exists on interpreters carrying the PEP 706 change, hence the
        # feature check; older interpreters keep the previous unfiltered
        # behaviour.
        if hasattr(tarfile, "data_filter"):
            archive.extractall(plugin_dir, filter="data")
        else:
            archive.extractall(plugin_dir)
    archive_path.unlink()
def print_summary() -> None:
    """Print every collected build setting as a ``label: value`` line.

    Output starts with a "Summary" header and ends with the S3 destination
    computed by ``build_s3_uri()``.
    """
    print_header("Summary")

    summary_rows = (
        ("Debezium version", STATE.debezium_version),
        ("Debezium archive version", STATE.debezium_archive_version),
        ("MSK config provider version", STATE.msk_config_provider_version),
        ("AWS region", STATE.aws_region),
        ("Authentication mode", STATE.auth_mode),
        # An empty profile means the ambient credentials are in use.
        ("AWS profile", STATE.aws_profile or "current AWS credentials"),
        ("IAM role", STATE.role_arn or ""),
        ("S3 bucket", STATE.s3_bucket),
        ("S3 prefix", STATE.s3_prefix or ""),
        ("Plugin ZIP", STATE.plugin_zip_name),
        ("Suggested plugin name", STATE.custom_plugin_name),
        ("S3 destination", build_s3_uri()),
    )
    for label, value in summary_rows:
        print(f"{label}: {value}")
STATE.msk_config_provider_version = prompt_non_empty( + "MSK config provider version", + DEFAULT_MSK_CONFIG_PROVIDER_VERSION, + ) + config_url = ( + f"{MSK_CONFIG_PROVIDER_BASE_URL}/r{STATE.msk_config_provider_version}/" + f"msk-config-providers-{STATE.msk_config_provider_version}-with-dependencies.zip" + ) + print_info(f"Checking MSK config provider availability: {config_url}") + if validate_http_url(config_url): + print_success(f"MSK config provider version {STATE.msk_config_provider_version} is available.") + break + print_warning(f"MSK config provider version {STATE.msk_config_provider_version} was not found.") + + STATE.auth_mode = prompt_auth_mode() + + if STATE.auth_mode == "profile": + STATE.aws_profile = prompt_non_empty("AWS profile name") + os.environ["AWS_PROFILE"] = STATE.aws_profile + print_info(f"Using AWS profile {STATE.aws_profile}") + else: + os.environ.pop("AWS_PROFILE", None) + STATE.aws_profile = "" + + if STATE.auth_mode == "role": + STATE.role_arn = prompt_non_empty("IAM role ARN to assume") + STATE.role_session_name = prompt_value("Role session name", "debezium-plugin-build") + else: + STATE.role_arn = "" + STATE.role_session_name = "" + + default_region = detect_default_region() + STATE.aws_region = prompt_non_empty("AWS region to use for S3 and STS operations", default_region) + os.environ["AWS_REGION"] = STATE.aws_region + os.environ["AWS_DEFAULT_REGION"] = STATE.aws_region + + STATE.s3_bucket = prompt_non_empty("S3 bucket name") + STATE.s3_prefix = normalize_s3_prefix(prompt_value("Optional S3 key prefix (press Enter for bucket root)")) + + default_zip_name = f"postgres-msk-debezium-plugin-{STATE.debezium_version}.zip" + STATE.plugin_zip_name = prompt_non_empty("Local output ZIP filename", default_zip_name) + + default_plugin_name = f"postgresql-msk-debezium-connector-{sanitize_version_for_name(STATE.debezium_version)}" + STATE.custom_plugin_name = prompt_non_empty("Suggested MSK Connect custom plugin name", default_plugin_name) + + 
def main() -> int:
    """Run the interactive build-and-upload workflow from start to finish.

    Collects inputs, asks for confirmation, optionally assumes an IAM role,
    checks the target bucket, builds the plugin ZIP in a scratch workspace and
    uploads it to S3.

    Returns:
        ``0`` on success.

    Raises:
        ScriptError: If the user declines to proceed (helper steps raise it
            for their own failures as well).
    """
    require_command("aws")

    print_header("Debezium MSK Connect Plugin Builder")
    for intro_line in (
        "This script builds the Debezium PostgreSQL MSK Connect plugin ZIP and uploads it to S3.",
        "It validates artifact URLs before downloading and can optionally assume an IAM role first.",
    ):
        print(intro_line)

    collect_inputs()
    print_summary()

    confirmed = prompt_yes_no("Proceed with build and upload?", "y")
    if not confirmed:
        raise ScriptError("Aborted by user.")

    assume_role_if_requested()
    validate_bucket_access()

    # A region mismatch is only reported; the workflow continues regardless.
    actual_region = resolve_bucket_region()
    if actual_region != STATE.aws_region:
        print_warning(
            f"Bucket region is {actual_region}, while the selected AWS region is {STATE.aws_region}."
        )

    prepare_workspace()
    # Registered only once the workspace exists, so the scratch directory is
    # removed at interpreter exit whether or not the build steps succeed.
    atexit.register(cleanup_workspace)

    build_steps = (
        download_and_extract_debezium,
        download_and_extract_config_provider,
        create_plugin_zip,
        upload_plugin_zip,
    )
    for step in build_steps:
        step()

    print_success("Plugin build and upload completed successfully.")
    print()
    print("Terraform module input reference:")
    print(f' custom_plugin_name = "{STATE.custom_plugin_name}"')
    print(f" S3 object = {build_s3_uri()}")
    return 0
+} + +variable "connector_name" { + type = string + default = null + description = "Explicit name for the MSK Connect connector. Defaults to '-postgres-connector'." +} + +variable "worker_configuration_name" { + type = string + default = null + description = "Explicit name for the MSK Connect worker configuration. Defaults to '-worker-config'." +} + +variable "log_group_name" { + type = string + default = null + description = "Explicit name for the CloudWatch log group. Defaults to '-logs'." +} + +variable "kafkaconnect_version" { + type = string + default = "3.7.x" + description = "Kafka Connect runtime version for the MSK Connect connector." +} + +variable "mcu_count" { + type = number + default = 1 + description = "Number of MSK Connect units (MCUs) allocated to each connector worker." +} + +variable "min_worker_count" { + type = number + default = 1 + description = "Minimum number of autoscaled workers for the MSK Connect connector." +} + +variable "max_worker_count" { + type = number + default = 2 + description = "Maximum number of autoscaled workers for the MSK Connect connector." +} + +variable "database_credentials_secret_name" { + type = string + description = "Name of the AWS Secrets Manager secret that stores the PostgreSQL connection details with keys `host`, `port`, `dbname`, `username`, and `password`." +} + +variable "kafka_iam_broker_endpoint_parameter_name" { + type = string + description = "Name of the SSM parameter that contains the MSK IAM bootstrap broker endpoint." +} + +variable "table_include_list" { + type = string + description = "Comma-separated list of PostgreSQL tables that Debezium should capture. Example: 'public.table_a,public.table_b'." +} + +variable "bootstrap_brokers_sasl_iam" { + type = string + description = "SASL/IAM bootstrap broker connection string for the target Amazon MSK cluster." 
+} + +variable "security_group_ids" { + type = list(string) + description = "Security group IDs attached to the MSK Connect connector within the VPC." +} + +variable "subnet_ids" { + type = list(string) + description = "Subnet IDs where the MSK Connect connector workers will run." +} + +variable "custom_plugin_name" { + type = string + description = "Name of the existing AWS MSK Connect custom plugin that contains the Debezium PostgreSQL connector artifacts." +} + +variable "schema_history_topic" { + type = string + default = null + description = "Override for the internal Kafka topic used by Debezium schema history. Defaults to 'schemahistory.' with hyphens replaced by underscores." +} + +variable "tasks_max" { + type = number + default = 1 + description = "Maximum number of connector tasks. PostgreSQL Debezium connectors typically run with a single task." +} + +variable "cloudwatch_log_retention_in_days" { + type = number + default = 30 + description = "Number of days to retain connector worker logs in CloudWatch." +} + +variable "worker_configuration_properties_file_content" { + type = string + default = null + description = "Optional full worker configuration properties content. Defaults to a configuration that enables Secrets Manager and SSM config providers in the current AWS region." +} + +variable "connector_configuration_overrides" { + type = map(string) + default = {} + description = "Additional connector configuration entries to merge over the module defaults." +} + +variable "tags" { + type = map(string) + default = {} + description = "Tags to apply to supported resources created by this module." 
+} diff --git a/rds-postgres/debezium-msk-connector/versions.tf b/rds-postgres/debezium-msk-connector/versions.tf new file mode 100644 index 0000000..20568c8 --- /dev/null +++ b/rds-postgres/debezium-msk-connector/versions.tf @@ -0,0 +1,10 @@ +terraform { + required_version = ">= 1.6.2" + + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 6.0" + } + } +}