From 5e481843fd6171b741272b618e9b9c807e0de37e Mon Sep 17 00:00:00 2001 From: salihoto Date: Mon, 27 Jun 2022 11:55:43 +0300 Subject: [PATCH 01/11] bq producer --- pom.xml | 23 +- .../com/zendesk/maxwell/MaxwellConfig.java | 27 ++ .../com/zendesk/maxwell/MaxwellContext.java | 3 + .../producer/MaxwellBigQueryProducer.java | 267 ++++++++++++++++++ .../schema/BqToBqStorageSchemaConverter.java | 88 ++++++ 5 files changed, 401 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java create mode 100644 src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java diff --git a/pom.xml b/pom.xml index 4f875515b..48b77b61c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk maxwell - 1.37.3 + 1.37.7 jar maxwell @@ -123,8 +123,17 @@ - + + com.google.cloud + google-cloud-bigquerystorage + 2.14.2 + + + com.google.cloud + google-cloud-bigquery + 2.13.3 + com.mchange c3p0 @@ -133,7 +142,7 @@ com.amazonaws aws-java-sdk-core - 1.12.137 + 1.12.217 com.amazonaws @@ -218,7 +227,7 @@ com.zendesk mysql-binlog-connector-java - 0.25.3 + 0.25.6 net.sf.jopt-simple @@ -313,12 +322,12 @@ com.viafoura metrics-datadog - 2.0.0-RC2 + 2.0.0-RC3 com.amazonaws aws-java-sdk-sns - 1.12.137 + 1.12.191 com.amazonaws @@ -556,4 +565,4 @@ - + \ No newline at end of file diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 9be74598b..d900cce8e 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -203,6 +203,21 @@ public class MaxwellConfig extends AbstractConfig { */ public String ddlPubsubTopic; + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id + */ + public String bigQueryProjectId; + + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset + */ + public String bigQueryDataset; + + /** + * {@link 
com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table + */ + public String bigQueryTable; + /** * {@link com.zendesk.maxwell.producer.MaxwellPubsubProducer} bytes request threshold */ @@ -789,6 +804,14 @@ protected MaxwellOptionParser buildOptionParser() { parser.accepts( "nats_url", "Url(s) of Nats connection (comma separated). Default is localhost:4222" ).withRequiredArg(); parser.accepts( "nats_subject", "Subject Hierarchies of Nats. Default is '%{database}.%{table}'" ).withRequiredArg(); + parser.section( "bigquery" ); + parser.accepts( "bigquery_project_id", "provide a google cloud platform project id associated with the bigquery table" ) + .withRequiredArg(); + parser.accepts( "bigquery_dataset", "provide a google cloud platform dataset id associated with the bigquery table" ) + .withRequiredArg(); + parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" ) + .withRequiredArg(); + parser.section( "pubsub" ); parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" ) .withRequiredArg(); @@ -994,6 +1017,10 @@ private void setup(OptionSet options, Properties properties) { this.kafkaPartitionHash = fetchStringOption("kafka_partition_hash", options, properties, "default"); this.ddlKafkaTopic = fetchStringOption("ddl_kafka_topic", options, properties, this.kafkaTopic); + this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null); + this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null); + this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null); + this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null); this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell"); this.ddlPubsubTopic = fetchStringOption("ddl_pubsub_topic", options, properties, this.pubsubTopic); diff --git 
a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 484886a7d..2cbbaf217 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -550,6 +550,9 @@ public AbstractProducer getProducer() throws IOException { case "redis": this.producer = new MaxwellRedisProducer(this); break; + case "bigquery": + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable); + break; case "none": this.producer = new NoneProducer(this); break; diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java new file mode 100644 index 000000000..65ad3612d --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -0,0 +1,267 @@ +package com.zendesk.maxwell.producer; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.zendesk.maxwell.MaxwellContext; +import com.zendesk.maxwell.monitoring.Metrics; +import com.zendesk.maxwell.replication.Position; 
+import com.zendesk.maxwell.row.RowMap; +import com.zendesk.maxwell.schema.BqToBqStorageSchemaConverter; +import com.zendesk.maxwell.util.StoppableTask; +import com.zendesk.maxwell.util.StoppableTaskState; + +import io.grpc.Status; +import io.grpc.Status.Code; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; + +import javax.annotation.concurrent.GuardedBy; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class BigQueryCallback implements ApiFutureCallback { + public final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class); + + private final MaxwellBigQueryProducerWorker parent; + private final AbstractAsyncProducer.CallbackCompleter cc; + private final Position position; + private MaxwellContext context; + AppendContext appendContext; + + private Counter succeededMessageCount; + private Counter failedMessageCount; + private Meter succeededMessageMeter; + private Meter failedMessageMeter; + + private static final int MAX_RETRY_COUNT = 2; + private final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED, + Code.CANCELLED); + + public BigQueryCallback(MaxwellBigQueryProducerWorker parent, + AppendContext appendContext, + AbstractAsyncProducer.CallbackCompleter cc, + Position position, + Counter producedMessageCount, Counter failedMessageCount, + Meter succeededMessageMeter, Meter failedMessageMeter, + MaxwellContext context) { + this.parent = parent; + this.appendContext = appendContext; + this.cc = cc; + this.position = position; + this.succeededMessageCount = producedMessageCount; + this.failedMessageCount = failedMessageCount; + this.succeededMessageMeter = succeededMessageMeter; + this.failedMessageMeter = failedMessageMeter; + this.context = context; + } + + 
@Override + public void onSuccess(AppendRowsResponse response) { + this.succeededMessageCount.inc(); + this.succeededMessageMeter.mark(); + + if (LOGGER.isDebugEnabled()) { + try { + LOGGER.debug("-> {}\n" + + " {}\n", + this.appendContext.r.toJSON(), this.position); + } catch (Exception e) { + e.printStackTrace(); + } + } + cc.markCompleted(); + } + + @Override + public void onFailure(Throwable t) { + this.failedMessageCount.inc(); + this.failedMessageMeter.mark(); + + LOGGER.error(t.getClass().getSimpleName() + " @ " + position); + LOGGER.error(t.getLocalizedMessage()); + + Status status = Status.fromThrowable(t); + if (appendContext.retryCount < MAX_RETRY_COUNT + && RETRIABLE_ERROR_CODES.contains(status.getCode())) { + appendContext.retryCount++; + try { + this.parent.sendAsync(appendContext.r, this.cc); + cc.markCompleted(); + return; + } catch (Exception e) { + System.out.format("Failed to retry append: %s\n", e); + } + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + StorageException storageException = Exceptions.toStorageException(t); + this.parent.error = (storageException != null) ? 
storageException : new RuntimeException(t); + } + } + cc.markCompleted(); + } +} + +public class MaxwellBigQueryProducer extends AbstractProducer { + + private final ArrayBlockingQueue queue; + private final MaxwellBigQueryProducerWorker worker; + + public MaxwellBigQueryProducer(MaxwellContext context,String bigQueryProjectId, + String bigQueryDataset, String bigQueryTable) + throws IOException { + super(context); + this.queue = new ArrayBlockingQueue<>(100); + this.worker = new MaxwellBigQueryProducerWorker(context, this.queue, bigQueryProjectId, bigQueryDataset, bigQueryTable); + + TableName table = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); + try { + this.worker.initialize(table); + } catch (DescriptorValidationException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + Thread thread = new Thread(this.worker, "maxwell-bigquery-worker"); + thread.setDaemon(true); + thread.start(); + } + + + + @Override + public void push(RowMap r) throws Exception { + this.queue.put(r); + } +} + +class AppendContext { + JSONArray data; + int retryCount = 0; + RowMap r = null; + + AppendContext(JSONArray data, int retryCount, RowMap r) { + this.data = data; + this.retryCount = retryCount; + this.r = r; + } +} + +class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { + static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class); + + private final ArrayBlockingQueue queue; + private StoppableTaskState taskState; + private Thread thread; + + public MaxwellBigQueryProducerWorker(MaxwellContext context, + ArrayBlockingQueue queue,String bigQueryProjectId, + String bigQueryDataset, String bigQueryTable) throws IOException { + super(context); + this.queue = queue; + Metrics metrics = context.getMetrics(); + this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker"); + } + + public final Object lock = new Object(); + private 
JsonStreamWriter streamWriter; + + @GuardedBy("lock") + public RuntimeException error = null; + + public void initialize(TableName tName) + throws DescriptorValidationException, IOException, InterruptedException { + + BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); + Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build(); + } + + @Override + public void requestStop() throws Exception { + taskState.requestStop(); + streamWriter.close(); + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + @Override + public void awaitStop(Long timeout) throws TimeoutException { + taskState.awaitStop(thread, timeout); + } + + @Override + public void run() { + this.thread = Thread.currentThread(); + while (true) { + try { + RowMap row = queue.take(); + if (!taskState.isRunning()) { + taskState.stopped(); + return; + } + this.push(row); + } catch (Exception e) { + taskState.stopped(); + context.terminate(e); + return; + } + } + } + + @Override + public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + JSONArray jsonArr = new JSONArray(); + JSONObject record = new JSONObject(r.toJSON(outputConfig)); + LOGGER.debug("maxwell incoming log -> " + r.toJSON(outputConfig)); + //stringfy columns in order to adapt noon cdc log table schema + String data = record.getJSONObject("data").toString(); + String old = record.getJSONObject("data").toString(); + String primary_key = record.get("primary_key").toString(); + record.put("data", data); + record.put("old", old); + record.put("primary_key", primary_key); + + jsonArr.put(record); + AppendContext appendContext = new 
AppendContext(jsonArr, 3, r); + + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), + this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, + this.context), + MoreExecutors.directExecutor()); + } +} diff --git a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java new file mode 100644 index 000000000..cfda9609e --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java @@ -0,0 +1,88 @@ +package com.zendesk.maxwell.schema; + +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableMap; + +/** Converts structure from BigQuery client to BigQueryStorage client */ +public class BqToBqStorageSchemaConverter { + private static ImmutableMap BQTableSchemaModeMap = + ImmutableMap.of( + Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, + Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, + Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); + + private static ImmutableMap BQTableSchemaTypeMap = + new ImmutableMap.Builder() + .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) + .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) + .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) + .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) + .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) + .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) + .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) + .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) + .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) + .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) + .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) + .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) + .build(); + + /** + * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. 
+ * + * @param schema the BigQuery client Table Schema + * @return the bigquery storage API Table Schema + */ + public static TableSchema convertTableSchema(Schema schema) { + TableSchema.Builder result = TableSchema.newBuilder(); + for (int i = 0; i < schema.getFields().size(); i++) { + result.addFields(i, convertFieldSchema(schema.getFields().get(i))); + } + return result.build(); + } + + /** + * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. + * + * @param field the BigQuery client Field Schema + * @return the bigquery storage API Field Schema + */ + public static TableFieldSchema convertFieldSchema(Field field) { + TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); + if (field.getMode() == null) { + field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); + } + result.setMode(BQTableSchemaModeMap.get(field.getMode())); + result.setName(field.getName()); + result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); + if (field.getDescription() != null) { + result.setDescription(field.getDescription()); + } + if (field.getSubFields() != null) { + for (int i = 0; i < field.getSubFields().size(); i++) { + result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); + } + } + return result.build(); + } +} From c9fb89cfb2ec4e3dd92f07f30a5db7ac7b494959 Mon Sep 17 00:00:00 2001 From: salihoto Date: Mon, 27 Jun 2022 12:01:45 +0300 Subject: [PATCH 02/11] fx --- .../com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 65ad3612d..2a9c2a563 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -248,7 +248,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws 
Exception { LOGGER.debug("maxwell incoming log -> " + r.toJSON(outputConfig)); //stringfy columns in order to adapt noon cdc log table schema String data = record.getJSONObject("data").toString(); - String old = record.getJSONObject("data").toString(); + String old = record.getJSONObject("old").toString(); String primary_key = record.get("primary_key").toString(); record.put("data", data); record.put("old", old); From 3897e390762b5007f7e84fce5d5073fb789354b2 Mon Sep 17 00:00:00 2001 From: salihoto Date: Mon, 27 Jun 2022 13:10:12 +0300 Subject: [PATCH 03/11] upgrade protobuf --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 48b77b61c..0fe870832 100644 --- a/pom.xml +++ b/pom.xml @@ -287,7 +287,7 @@ com.google.protobuf protobuf-java - 3.16.1 + 3.20.0 io.dropwizard.metrics From 69fde30a428cb7a755c9a5e6f8dadaf2bb8bedc9 Mon Sep 17 00:00:00 2001 From: salihoto Date: Mon, 27 Jun 2022 16:26:07 +0300 Subject: [PATCH 04/11] . --- .../com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 2a9c2a563..0468c3dc6 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -255,7 +255,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { record.put("primary_key", primary_key); jsonArr.put(record); - AppendContext appendContext = new AppendContext(jsonArr, 3, r); + AppendContext appendContext = new AppendContext(jsonArr, 0, r); ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( From 21aedf43af6e2caebe1cdd4bbeb7c4adf23583b9 Mon Sep 17 00:00:00 2001 From: salihoto Date: Sat, 2 Jul 2022 23:42:36 +0300 Subject: [PATCH 05/11] bigquery producer --- 
config.properties.example | 8 +- docs/docs/config.md | 9 + docs/docs/producers.md | 28 ++ docs/docs/quickstart.md | 9 + pom.xml | 12 +- .../com/zendesk/maxwell/MaxwellConfig.java | 28 ++ .../com/zendesk/maxwell/MaxwellContext.java | 3 + .../producer/MaxwellBigQueryProducer.java | 281 ++++++++++++++++++ .../schema/BqToBqStorageSchemaConverter.java | 88 ++++++ .../producer/BigQueryCallbackTest.java | 79 +++++ 10 files changed, 543 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java create mode 100644 src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java create mode 100644 src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java diff --git a/config.properties.example b/config.properties.example index a862236bc..924c41718 100644 --- a/config.properties.example +++ b/config.properties.example @@ -11,7 +11,7 @@ password=maxwell # *** general *** -# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis +# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis|bigquery #producer=kafka # set the log level. 
note that you can configure things further in log4j2.xml @@ -218,6 +218,12 @@ kafka.acks=1 #pubsub_topic=maxwell #ddl_pubsub_topic=maxwell_ddl +# *** bigquery *** + +#bigquery_project_id=maxwell +#bigquery_dataset=maxwell +#bigquery_table=maxwell_ddl + # *** rabbit-mq *** #rabbitmq_host=rabbitmq_hostname diff --git a/docs/docs/config.md b/docs/docs/config.md index 11da42592..9b7185fef 100644 --- a/docs/docs/config.md +++ b/docs/docs/config.md @@ -129,6 +129,15 @@ pubsub_total_timeout | LONG | Puts a limit on the value in secon _See also:_ [PubSub Producer Documentation](/producers#google-cloud-pubsub) +## bigquery producer +option | argument | description | default +-------------------------------|-------------------------------------| --------------------------------------------------- | ------- +bigquery_project_id | STRING | Google Cloud bigquery project id | +bigquery_dataset | STRING | Google Cloud bigquery dataset id | +bigquery_table | STRING | Google Cloud bigquery table id | + +_See also:_ [BigQuery Producer Documentation](/producers#google-cloud-bigquery) + ## rabbitmq producer option | argument | description | default -------------------------------|-------------------------------------| --------------------------------------------------- | ------- diff --git a/docs/docs/producers.md b/docs/docs/producers.md index ea30ca63c..845464104 100644 --- a/docs/docs/producers.md +++ b/docs/docs/producers.md @@ -260,6 +260,34 @@ for DDL updates by setting the `ddl_pubsub_topic` property. The producer uses the [Google Cloud Java Library for Pub/Sub](https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub) and uses its built-in configurations. +# Google Cloud BigQuery +*** +To stream data into Google Cloud Bigquery, first there must be a table created on bigquery in order to stream the data +into defined as `bigquery_project_id.bigquery_dataset.bigquery_table`. The schema of the table must match the outputConfig. 
The column types should be defined as below + +- database: string +- table: string +- type: string +- ts: integer +- xid: integer +- xoffset: integer +- commit: boolean +- position: string +- gtid: string +- server_id: integer +- primary_key: string +- data: string +- old: string + +See the Google Cloud Platform docs for the [latest examples of which permissions are needed](https://cloud.google.com/bigquery/docs/access-control), as well as [how to properly configure service accounts](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances). + +Set the output stream in `config.properties` by setting the `bigquery_project_id`, `bigquery_dataset` and `bigquery_table` properties. + +The producer uses the [Google Cloud Java Bigquery Storage Library for Bigquery](https://github.com/googleapis/java-bigquerystorage) [Bigquery Storage Write API documentation](https://cloud.google.com/bigquery/docs/write-api). +To use the Storage Write API, you must have `bigquery.tables.updateData` permissions. + +This producer is using the Default Stream with at-least once semantics for greater data resiliency and fewer scaling restrictions + # RabbitMQ *** To produce messages to RabbitMQ, you will need to specify a host in `config.properties` with `rabbitmq_host`. This is the only required property, everything else falls back to a sane default. 
diff --git a/docs/docs/quickstart.md b/docs/docs/quickstart.md index 7ddb86a63..e9ae3ac6e 100644 --- a/docs/docs/quickstart.md +++ b/docs/docs/quickstart.md @@ -100,6 +100,15 @@ bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ --pubsub_topic='maxwell' ``` +## Google Cloud Bigquery + +``` +bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ + --producer=bigquery --bigquery_project_id='$BIGQUERY_PROJECT_ID' \ + --bigquery_dataset='$BIGQUERY_DATASET' \ + --bigquery_table='$BIGQUERY_TABLE' +``` + ## RabbitMQ ``` diff --git a/pom.xml b/pom.xml index c945dfa16..095ed6a02 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,16 @@ + + com.google.cloud + google-cloud-bigquerystorage + 2.14.2 + + + com.google.cloud + google-cloud-bigquery + 2.13.3 + com.mchange c3p0 @@ -278,7 +288,7 @@ com.google.protobuf protobuf-java - 3.16.1 + 3.20.0 io.dropwizard.metrics diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index b58b15247..b9e301325 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -253,6 +253,22 @@ public class MaxwellConfig extends AbstractConfig { */ public Duration pubsubTotalTimeout; + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id + */ + public String bigQueryProjectId; + + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset + */ + public String bigQueryDataset; + + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table + */ + public String bigQueryTable; + + /** * Used in all producers deriving from {@link com.zendesk.maxwell.producer.AbstractAsyncProducer}.
* In milliseconds, time a message can spend in the {@link com.zendesk.maxwell.producer.InflightMessageList} @@ -789,6 +805,14 @@ protected MaxwellOptionParser buildOptionParser() { parser.accepts( "nats_url", "Url(s) of Nats connection (comma separated). Default is localhost:4222" ).withRequiredArg(); parser.accepts( "nats_subject", "Subject Hierarchies of Nats. Default is '%{database}.%{table}'" ).withRequiredArg(); + parser.section( "bigquery" ); + parser.accepts( "bigquery_project_id", "provide a google cloud platform project id associated with the bigquery table" ) + .withRequiredArg(); + parser.accepts( "bigquery_dataset", "provide a google cloud platform dataset id associated with the bigquery table" ) + .withRequiredArg(); + parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" ) + .withRequiredArg(); + parser.section( "pubsub" ); parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" ) .withRequiredArg(); @@ -994,6 +1018,10 @@ private void setup(OptionSet options, Properties properties) { this.kafkaPartitionHash = fetchStringOption("kafka_partition_hash", options, properties, "default"); this.ddlKafkaTopic = fetchStringOption("ddl_kafka_topic", options, properties, this.kafkaTopic); + this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null); + this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null); + this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null); + this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null); this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell"); this.ddlPubsubTopic = fetchStringOption("ddl_pubsub_topic", options, properties, this.pubsubTopic); diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java 
b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 484886a7d..2cbbaf217 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -550,6 +550,9 @@ public AbstractProducer getProducer() throws IOException { case "redis": this.producer = new MaxwellRedisProducer(this); break; + case "bigquery": + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable); + break; case "none": this.producer = new NoneProducer(this); break; diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java new file mode 100644 index 000000000..6b996043d --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -0,0 +1,281 @@ +package com.zendesk.maxwell.producer; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.services.bigquery.model.JsonObject; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.zendesk.maxwell.MaxwellContext; +import com.zendesk.maxwell.monitoring.Metrics; +import com.zendesk.maxwell.replication.Position; 
+import com.zendesk.maxwell.row.RowMap; +import com.zendesk.maxwell.schema.BqToBqStorageSchemaConverter; +import com.zendesk.maxwell.util.StoppableTask; +import com.zendesk.maxwell.util.StoppableTaskState; + +import io.grpc.Status; +import io.grpc.Status.Code; +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; + +import javax.annotation.concurrent.GuardedBy; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class BigQueryCallback implements ApiFutureCallback { + public final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class); + + private final MaxwellBigQueryProducerWorker parent; + private final AbstractAsyncProducer.CallbackCompleter cc; + private final Position position; + private MaxwellContext context; + AppendContext appendContext; + + private Counter succeededMessageCount; + private Counter failedMessageCount; + private Meter succeededMessageMeter; + private Meter failedMessageMeter; + + private static final int MAX_RETRY_COUNT = 2; + private final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED, + Code.CANCELLED); + + public BigQueryCallback(MaxwellBigQueryProducerWorker parent, + AppendContext appendContext, + AbstractAsyncProducer.CallbackCompleter cc, + Position position, + Counter producedMessageCount, Counter failedMessageCount, + Meter succeededMessageMeter, Meter failedMessageMeter, + MaxwellContext context) { + this.parent = parent; + this.appendContext = appendContext; + this.cc = cc; + this.position = position; + this.succeededMessageCount = producedMessageCount; + this.failedMessageCount = failedMessageCount; + this.succeededMessageMeter = succeededMessageMeter; + this.failedMessageMeter = failedMessageMeter; + this.context = context; + } + + 
@Override + public void onSuccess(AppendRowsResponse response) { + this.succeededMessageCount.inc(); + this.succeededMessageMeter.mark(); + + if (LOGGER.isDebugEnabled()) { + try { + LOGGER.debug("-> {}\n" + + " {}\n", + this.appendContext.r.toJSON(), this.position); + } catch (Exception e) { + e.printStackTrace(); + } + } + cc.markCompleted(); + } + + @Override + public void onFailure(Throwable t) { + this.failedMessageCount.inc(); + this.failedMessageMeter.mark(); + + LOGGER.error(t.getClass().getSimpleName() + " @ " + position); + LOGGER.error(t.getLocalizedMessage()); + + Status status = Status.fromThrowable(t); + if (appendContext.retryCount < MAX_RETRY_COUNT + && RETRIABLE_ERROR_CODES.contains(status.getCode())) { + appendContext.retryCount++; + try { + this.parent.sendAsync(appendContext.r, this.cc); + cc.markCompleted(); + return; + } catch (Exception e) { + System.out.format("Failed to retry append: %s\n", e); + } + } + + synchronized (this.parent.getLock()) { + if (this.parent.getError() == null && !this.context.getConfig().ignoreProducerError) { + StorageException storageException = Exceptions.toStorageException(t); + this.parent.setError((storageException != null) ? 
storageException : new RuntimeException(t)); + } + } + cc.markCompleted(); + } +} + +public class MaxwellBigQueryProducer extends AbstractProducer { + + private final ArrayBlockingQueue queue; + private final MaxwellBigQueryProducerWorker worker; + + public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, + String bigQueryDataset, String bigQueryTable) + throws IOException { + super(context); + this.queue = new ArrayBlockingQueue<>(100); + this.worker = new MaxwellBigQueryProducerWorker(context, this.queue, bigQueryProjectId, bigQueryDataset, + bigQueryTable); + + TableName table = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); + try { + this.worker.initialize(table); + } catch (DescriptorValidationException e) { + throw new IOException("failed to initialize BigQuery stream writer", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("interrupted while initializing BigQuery stream writer", e); + } + + Thread thread = new Thread(this.worker, "maxwell-bigquery-worker"); + thread.setDaemon(true); + thread.start(); + } + + @Override + public void push(RowMap r) throws Exception { + this.queue.put(r); + } +} + +class AppendContext { + JSONArray data; + int retryCount = 0; + RowMap r = null; + + AppendContext(JSONArray data, int retryCount, RowMap r) { + this.data = data; + this.retryCount = retryCount; + this.r = r; + } +} + +class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { + static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class); + + private final ArrayBlockingQueue queue; + private StoppableTaskState taskState; + private Thread thread; + private final Object lock = new Object(); + + @GuardedBy("lock") + private RuntimeException error = null; + private JsonStreamWriter streamWriter; + + public MaxwellBigQueryProducerWorker(MaxwellContext context, + ArrayBlockingQueue queue, String bigQueryProjectId, + String bigQueryDataset, String bigQueryTable) throws IOException { + super(context); + this.queue = queue; + Metrics metrics =
context.getMetrics(); + this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker"); + } + + public Object getLock() { + return lock; + } + + public RuntimeException getError() { + return error; + } + + public void setError(RuntimeException error) { + this.error = error; + } + + private void covertJSONObjectFieldsToString(JSONObject record) { + if (this.context.getConfig().outputConfig.includesPrimaryKeys) { + record.put("primary_key", record.get("primary_key").toString()); + } + String data = record.has("data") == true ? record.get("data").toString() : null; + record.put("data", data); + String old = record.has("old") == true ? record.get("old").toString() : null; + record.put("old", old); + } + + public void initialize(TableName tName) + throws DescriptorValidationException, IOException, InterruptedException { + + BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); + Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build(); + } + + @Override + public void requestStop() throws Exception { + taskState.requestStop(); + streamWriter.close(); + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + @Override + public void awaitStop(Long timeout) throws TimeoutException { + taskState.awaitStop(thread, timeout); + } + + @Override + public void run() { + this.thread = Thread.currentThread(); + while (true) { + try { + RowMap row = queue.take(); + if (!taskState.isRunning()) { + taskState.stopped(); + return; + } + this.push(row); + } catch (Exception e) { + taskState.stopped(); + context.terminate(e); + return; + } + } + } + + @Override + public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { + synchronized (this.lock) { + 
if (this.error != null) { + throw this.error; + } + } + JSONArray jsonArr = new JSONArray(); + JSONObject record = new JSONObject(r.toJSON(outputConfig)); + //convert json and array fields to String + covertJSONObjectFieldsToString(record); + jsonArr.put(record); + AppendContext appendContext = new AppendContext(jsonArr, 0, r); + + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), + this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, + this.context), + MoreExecutors.directExecutor()); + } +} \ No newline at end of file diff --git a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java new file mode 100644 index 000000000..de95f565d --- /dev/null +++ b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java @@ -0,0 +1,88 @@ +package com.zendesk.maxwell.schema; + +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.common.collect.ImmutableMap; + +/** Converts structure from BigQuery client to BigQueryStorage client */ +public class BqToBqStorageSchemaConverter { + private static ImmutableMap BQTableSchemaModeMap = + ImmutableMap.of( + Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE, + Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED, + Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED); + + private static ImmutableMap BQTableSchemaTypeMap = + new ImmutableMap.Builder() + .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL) + .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES) + .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE) + .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME) + .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE) + .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY) + .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64) + .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC) + .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING) + .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT) + .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME) + .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP) + .build(); + + /** + * Converts from BigQuery client Table Schema to bigquery storage API Table Schema. 
+ * + * @param schema the BigQuery client Table Schema + * @return the bigquery storage API Table Schema + */ + public static TableSchema convertTableSchema(Schema schema) { + TableSchema.Builder result = TableSchema.newBuilder(); + for (int i = 0; i < schema.getFields().size(); i++) { + result.addFields(i, convertFieldSchema(schema.getFields().get(i))); + } + return result.build(); + } + + /** + * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema. + * + * @param field the BigQuery client Field Schema + * @return the bigquery storage API Field Schema + */ + public static TableFieldSchema convertFieldSchema(Field field) { + TableFieldSchema.Builder result = TableFieldSchema.newBuilder(); + if (field.getMode() == null) { + field = field.toBuilder().setMode(Field.Mode.NULLABLE).build(); + } + result.setMode(BQTableSchemaModeMap.get(field.getMode())); + result.setName(field.getName()); + result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType())); + if (field.getDescription() != null) { + result.setDescription(field.getDescription()); + } + if (field.getSubFields() != null) { + for (int i = 0; i < field.getSubFields().size(); i++) { + result.addFields(i, convertFieldSchema(field.getSubFields().get(i))); + } + } + return result.build(); + } +} \ No newline at end of file diff --git a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java new file mode 100644 index 000000000..ca8aa4475 --- /dev/null +++ b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java @@ -0,0 +1,79 @@ +package com.zendesk.maxwell.producer; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.zendesk.maxwell.MaxwellConfig; +import com.zendesk.maxwell.MaxwellContext; +import com.zendesk.maxwell.replication.BinlogPosition; +import com.zendesk.maxwell.replication.Position; +import 
com.zendesk.maxwell.row.RowIdentity; +import com.zendesk.maxwell.row.RowMap; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; + +import org.apache.commons.lang3.tuple.Pair; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.junit.Test; +import com.zendesk.maxwell.monitoring.NoOpMetrics; + +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; + +public class BigQueryCallbackTest { + + @Test + public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception { + MaxwellContext context = mock(MaxwellContext.class); + MaxwellConfig config = new MaxwellConfig(); + when(context.getConfig()).thenReturn(config); + when(context.getMetrics()).thenReturn(new NoOpMetrics()); + MaxwellOutputConfig outputConfig = new MaxwellOutputConfig(); + outputConfig.includesServerId = true; + RowMap r = new RowMap("insert", "MyDatabase", "MyTable", 1234567890L, new ArrayList(), null); + JSONArray jsonArr = new JSONArray(); + JSONObject record = new JSONObject(r.toJSON(outputConfig)); + jsonArr.put(record); + AbstractAsyncProducer.CallbackCompleter cc = mock(AbstractAsyncProducer.CallbackCompleter.class); + AppendContext appendContext = new AppendContext(jsonArr, 0, r); + ArrayBlockingQueue queue = new ArrayBlockingQueue(100); + MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue,"myproject", "mydataset", "mytable"); + BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc, + new Position(new BinlogPosition(1, "binlog-1"), 0L), + new Counter(), new Counter(), new Meter(), new Meter(), context); + Throwable t = new Throwable("error"); + callback.onFailure(t); + verify(cc).markCompleted(); + } + + @Test + public void shouldTerminateWhenNotIgnoreProducerError() throws JSONException, Exception { + MaxwellContext context = mock(MaxwellContext.class); + 
MaxwellConfig config = new MaxwellConfig(); + config.ignoreProducerError = false; + when(context.getConfig()).thenReturn(config); + when(context.getMetrics()).thenReturn(new NoOpMetrics()); + MaxwellOutputConfig outputConfig = new MaxwellOutputConfig(); + outputConfig.includesServerId = true; + RowMap r = new RowMap("insert", "MyDatabase", "MyTable", 1234567890L, new ArrayList(), null); + JSONArray jsonArr = new JSONArray(); + JSONObject record = new JSONObject(r.toJSON(outputConfig)); + jsonArr.put(record); + AbstractAsyncProducer.CallbackCompleter cc = mock(AbstractAsyncProducer.CallbackCompleter.class); + AppendContext appendContext = new AppendContext(jsonArr, 0, r); + ArrayBlockingQueue queue = new ArrayBlockingQueue(100); + MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue,"myproject", "mydataset", "mytable"); + BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc, + new Position(new BinlogPosition(1, "binlog-1"), 0L), + new Counter(), new Counter(), new Meter(), new Meter(), context); + Throwable t = new StatusRuntimeException(Status.DEADLINE_EXCEEDED); + callback.onFailure(t); + verify(cc).markCompleted(); + } +} \ No newline at end of file From 1271478a2068d4f89b0672feddf5f403417f9cd4 Mon Sep 17 00:00:00 2001 From: salihoto Date: Sat, 2 Jul 2022 23:54:21 +0300 Subject: [PATCH 06/11] modify example --- config.properties.example | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config.properties.example b/config.properties.example index 924c41718..c9286251b 100644 --- a/config.properties.example +++ b/config.properties.example @@ -220,9 +220,9 @@ kafka.acks=1 # *** bigquery *** -#bigquery_project_id=maxwell -#bigquery_dataset=maxwell -#bigquery_table=maxwell_ddl +#bigquery_project_id=myproject +#bigquery_dataset=mydataset +#bigquery_table=mytable # *** rabbit-mq *** From 808fba80779b21e0cbf6337dda66baaa5ba4c6a9 Mon Sep 17 00:00:00 2001 From: salihoto Date: Wed, 
6 Jul 2022 18:15:15 +0300 Subject: [PATCH 07/11] fix error handling --- .../com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 6b996043d..6629ddbe9 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -107,7 +107,6 @@ public void onFailure(Throwable t) { appendContext.retryCount++; try { this.parent.sendAsync(appendContext.r, this.cc); - cc.markCompleted(); return; } catch (Exception e) { System.out.format("Failed to retry append: %s\n", e); @@ -118,6 +117,8 @@ public void onFailure(Throwable t) { if (this.parent.getError() == null && !this.context.getConfig().ignoreProducerError) { StorageException storageException = Exceptions.toStorageException(t); this.parent.setError((storageException != null) ? 
storageException : new RuntimeException(t)); + context.terminate(); + return; } } cc.markCompleted(); From 5620ac4d12de5205cfd1969ddf3df03e6959ce8d Mon Sep 17 00:00:00 2001 From: salihoto Date: Tue, 12 Jul 2022 08:58:10 +0300 Subject: [PATCH 08/11] kafka version --- Dockerfile | 2 +- bin/maxwell | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index b0010ce3a..50c33352c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM maven:3.6-jdk-11 as builder -ENV MAXWELL_VERSION=1.37.7 KAFKA_VERSION=1.0.0 +ENV MAXWELL_VERSION=1.37.3 KAFKA_VERSION=2.7.0 RUN apt-get update \ && apt-get -y upgrade \ diff --git a/bin/maxwell b/bin/maxwell index 4dd47e9a6..b1987b897 100755 --- a/bin/maxwell +++ b/bin/maxwell @@ -18,7 +18,7 @@ fi CLASSPATH="$CLASSPATH:$lib_dir/*" -KAFKA_VERSION="1.0.0" +KAFKA_VERSION="2.7.0" function use_kafka() { wanted="$1" diff --git a/pom.xml b/pom.xml index 095ed6a02..3094efa3f 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.zendesk maxwell - 1.37.7 + 1.37.3 jar maxwell From 9ed089b2342f09e2b6c41b7cb9e3d57a1e8ffdc4 Mon Sep 17 00:00:00 2001 From: salihoto Date: Sat, 23 Jul 2022 23:36:34 +0300 Subject: [PATCH 09/11] ref --- .../producer/MaxwellBigQueryProducer.java | 59 ++----------------- 1 file changed, 5 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 6629ddbe9..0e6314f10 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -3,18 +3,14 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.services.bigquery.model.JsonObject; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import 
com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1.Exceptions; -import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors.DescriptorValidationException; import com.zendesk.maxwell.MaxwellContext; @@ -25,11 +21,8 @@ import com.zendesk.maxwell.util.StoppableTask; import com.zendesk.maxwell.util.StoppableTaskState; -import io.grpc.Status; -import io.grpc.Status.Code; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeoutException; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -54,9 +47,6 @@ class BigQueryCallback implements ApiFutureCallback { private Meter succeededMessageMeter; private Meter failedMessageMeter; - private static final int MAX_RETRY_COUNT = 2; - private final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED, - Code.CANCELLED); public BigQueryCallback(MaxwellBigQueryProducerWorker parent, AppendContext appendContext, @@ -101,26 +91,13 @@ public void onFailure(Throwable t) { LOGGER.error(t.getClass().getSimpleName() + " @ " + position); LOGGER.error(t.getLocalizedMessage()); - Status status = Status.fromThrowable(t); - if (appendContext.retryCount < MAX_RETRY_COUNT - && RETRIABLE_ERROR_CODES.contains(status.getCode())) { - appendContext.retryCount++; - try { - this.parent.sendAsync(appendContext.r, this.cc); - return; - } catch (Exception e) { - System.out.format("Failed to retry append: %s\n", e); - } - } + LOGGER.error("bq insertion error 
->" + appendContext.data.toString()); - synchronized (this.parent.getLock()) { - if (this.parent.getError() == null && !this.context.getConfig().ignoreProducerError) { - StorageException storageException = Exceptions.toStorageException(t); - this.parent.setError((storageException != null) ? storageException : new RuntimeException(t)); - context.terminate(); - return; - } + if (!this.context.getConfig().ignoreProducerError) { + this.context.terminate(new RuntimeException(t)); + return; } + cc.markCompleted(); } } @@ -176,10 +153,6 @@ class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Run private final ArrayBlockingQueue queue; private StoppableTaskState taskState; private Thread thread; - private final Object lock = new Object(); - - @GuardedBy("lock") - private RuntimeException error = null; private JsonStreamWriter streamWriter; public MaxwellBigQueryProducerWorker(MaxwellContext context, @@ -191,18 +164,6 @@ public MaxwellBigQueryProducerWorker(MaxwellContext context, this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker"); } - public Object getLock() { - return lock; - } - - public RuntimeException getError() { - return error; - } - - public void setError(RuntimeException error) { - this.error = error; - } - private void covertJSONObjectFieldsToString(JSONObject record) { if (this.context.getConfig().outputConfig.includesPrimaryKeys) { record.put("primary_key", record.get("primary_key").toString()); @@ -227,11 +188,6 @@ public void initialize(TableName tName) public void requestStop() throws Exception { taskState.requestStop(); streamWriter.close(); - synchronized (this.lock) { - if (this.error != null) { - throw this.error; - } - } } @Override @@ -260,11 +216,6 @@ public void run() { @Override public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { - synchronized (this.lock) { - if (this.error != null) { - throw this.error; - } - } JSONArray jsonArr = new JSONArray(); JSONObject record = new 
JSONObject(r.toJSON(outputConfig)); //convert json and array fields to String From 0e5503ac9800819250d3d446d74358235cb6511d Mon Sep 17 00:00:00 2001 From: salihoto Date: Sun, 24 Jul 2022 01:03:41 +0300 Subject: [PATCH 10/11] . --- .../java/com/zendesk/maxwell/MaxwellConfig.java | 16 ---------------- .../schema/BqToBqStorageSchemaConverter.java | 4 ---- 2 files changed, 20 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 00a145d14..189ce6151 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -268,22 +268,6 @@ public class MaxwellConfig extends AbstractConfig { */ public Duration pubsubTotalTimeout; - /** - * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} project id - */ - public String bigQueryProjectId; - - /** - * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} dataset - */ - public String bigQueryDataset; - - /** - * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} table - */ - public String bigQueryTable; - - /** * Used in all producers deriving from {@link com.zendesk.maxwell.producer.AbstractAsyncProducer}.
* In milliseconds, time a message can spend in the {@link com.zendesk.maxwell.producer.InflightMessageList} diff --git a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java index 8525969b3..cfda9609e 100644 --- a/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java +++ b/src/main/java/com/zendesk/maxwell/schema/BqToBqStorageSchemaConverter.java @@ -85,8 +85,4 @@ public static TableFieldSchema convertFieldSchema(Field field) { } return result.build(); } -<<<<<<< HEAD } -======= -} ->>>>>>> salihoto-maxwell/bigquery-producer-v2 From 1cb1be795ef8187570498e02556826f8e5d0e73c Mon Sep 17 00:00:00 2001 From: salihoto Date: Sat, 30 Jul 2022 13:44:58 +0300 Subject: [PATCH 11/11] skip rows bigger 10mb --- .../zendesk/maxwell/producer/MaxwellBigQueryProducer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index da8289f84..dbd1f33f4 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -94,6 +94,11 @@ public void onFailure(Throwable t) { LOGGER.error("bq insertion error ->" + appendContext.data.toString()); if (!this.context.getConfig().ignoreProducerError) { + if(t.getLocalizedMessage().contains("MessageSize is too large")){ + LOGGER.warn("skipping row exceeding 10 MB" + appendContext.data.toString()); + cc.markCompleted(); + return; + } this.context.terminate(new RuntimeException(t)); return; }