From 9854c3d1fc2d09b8647c291795cf5a61a10f2970 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 10:26:17 +0400 Subject: [PATCH 01/11] allow root --- Dockerfile | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 13b21a6d3..72c95d7b0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,9 +25,10 @@ COPY --from=builder /app /app WORKDIR /app -RUN useradd -u 1000 maxwell -d /app -RUN chown 1000:1000 /app && echo "$MAXWELL_VERSION" > /REVISION +#RUN useradd -u 1000 maxwell -d /app +#RUN chown 1000:1000 /app && echo "$MAXWELL_VERSION" > /REVISION -USER 1000 +RUN echo "$MAXWELL_VERSION" > /REVISION +#USER 1000 CMD [ "/bin/bash", "-c", "bin/maxwell-docker" ] From 9866d9e68aa0d9fec19024d08c1139e1ab0e660f Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 11:47:39 +0400 Subject: [PATCH 02/11] profiler --- Dockerfile | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Dockerfile b/Dockerfile index 72c95d7b0..a52d5b2f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,4 +31,16 @@ WORKDIR /app RUN echo "$MAXWELL_VERSION" > /REVISION #USER 1000 + +RUN apt-get update && apt-get install -y --no-install-recommends wget unzip procps python3-pip htop + + +ARG ASYNC_PROFILER_VERSION=2.9 +RUN wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v${ASYNC_PROFILER_VERSION}/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64.tar.gz -O /tmp/async-profiler.tar.gz \ + && tar -xzf /tmp/async-profiler.tar.gz -C /opt \ + && rm /tmp/async-profiler.tar.gz +ENV ASYNC_PROFILER_HOME=/opt/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64 +ENV PATH="$PATH:${ASYNC_PROFILER_HOME}" + + CMD [ "/bin/bash", "-c", "bin/maxwell-docker" ] From 725df717aa9543edab4f435a0a7b4ed74104521b Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 14:22:00 +0400 Subject: [PATCH 03/11] pip install wormhole --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a52d5b2f0..ce83540c6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,7 +33,7 @@ RUN echo "$MAXWELL_VERSION" > /REVISION RUN apt-get update && apt-get install -y --no-install-recommends wget unzip procps python3-pip htop - +RUN pip install magic-wormhole ARG ASYNC_PROFILER_VERSION=2.9 RUN wget https://github.com/jvm-profiling-tools/async-profiler/releases/download/v${ASYNC_PROFILER_VERSION}/async-profiler-${ASYNC_PROFILER_VERSION}-linux-x64.tar.gz -O /tmp/async-profiler.tar.gz \ From de3dd4ec11c21714815b4160be5c628887e2f0c5 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 14:25:36 +0400 Subject: [PATCH 04/11] attempt parallelism --- .../com/zendesk/maxwell/MaxwellContext.java | 6 +- .../producer/MaxwellBigQueryProducer.java | 122 ++++++++++++------ .../producer/BigQueryCallbackTest.java | 5 + 3 files changed, 94 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index a44381c6e..2ca0de51e 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -241,6 +241,10 @@ public Thread terminate() { return terminate(null); } + public boolean isTerminated() { + return this.terminationThread != null; + } + /** * Begin the Maxwell shutdown process * @param error An exception that caused the shutdown, or null @@ -551,7 +555,7 @@ public AbstractProducer getProducer() throws IOException { this.producer = new MaxwellRedisProducer(this); break; case "bigquery": - this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable); + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, 4); break; case "none": this.producer = new NoneProducer(this); diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 6629ddbe9..5c402c083 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -3,7 +3,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.services.bigquery.model.JsonObject; +// Keep other Google Cloud imports: BigQuery, BigQueryOptions, Schema, Table, storage.v1.* import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Schema; @@ -14,8 +14,10 @@ import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.TableSchema; + import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.MoreExecutors; // Removed later +import com.google.common.util.concurrent.ThreadFactoryBuilder; // For naming threads import com.google.protobuf.Descriptors.DescriptorValidationException; import com.zendesk.maxwell.MaxwellContext; import com.zendesk.maxwell.monitoring.Metrics; @@ -28,9 +30,15 @@ import io.grpc.Status; import io.grpc.Status.Code; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeoutException; + import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -58,7 +66,7 @@ class BigQueryCallback implements ApiFutureCallback { private final ImmutableList RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED, Code.CANCELLED); - public BigQueryCallback(MaxwellBigQueryProducerWorker parent, + public BigQueryCallback(MaxwellBigQueryProducerWorker parent, AppendContext appendContext, AbstractAsyncProducer.CallbackCompleter cc, Position position, @@ -83,8 +91,8 @@ public void onSuccess(AppendRowsResponse response) { if (LOGGER.isDebugEnabled()) { try { - LOGGER.debug("-> {}\n" + - " {}\n", + LOGGER.debug("Worker {} -> {}\n {}\n", // Add worker ID + parent.getWorkerId(), // Get ID from parent this.appendContext.r.toJSON(), this.position); } catch (Exception e) { e.printStackTrace(); @@ -98,8 +106,8 @@ public void onFailure(Throwable t) { this.failedMessageCount.inc(); this.failedMessageMeter.mark(); - LOGGER.error(t.getClass().getSimpleName() + " @ " + position); - LOGGER.error(t.getLocalizedMessage()); + LOGGER.error("Worker {} " + t.getClass().getSimpleName() + " @ " + position, parent.getWorkerId()); + LOGGER.error("Worker {} " + t.getLocalizedMessage(), parent.getWorkerId()); Status status = Status.fromThrowable(t); if (appendContext.retryCount < MAX_RETRY_COUNT @@ -109,7 +117,7 @@ public void onFailure(Throwable t) { this.parent.sendAsync(appendContext.r, this.cc); return; } catch (Exception e) { - System.out.format("Failed to retry append: %s\n", e); + System.out.format("Worker {} Failed to retry append: %s\n", parent.getWorkerId(), e); } } @@ -126,30 +134,60 @@ public void onFailure(Throwable t) { } public class MaxwellBigQueryProducer extends AbstractProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducer.class); private final ArrayBlockingQueue queue; - private final MaxwellBigQueryProducerWorker worker; + private final List workers; + private final ExecutorService workerExecutor; + private final ExecutorService callbackExecutor; public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, - String bigQueryDataset, String bigQueryTable) + String bigQueryDataset, String bigQueryTable, int numWorkers) throws IOException { super(context); - this.queue = new ArrayBlockingQueue<>(100); - this.worker = new MaxwellBigQueryProducerWorker(context, this.queue, bigQueryProjectId, bigQueryDataset, - bigQueryTable); - - TableName table = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); - try { - this.worker.initialize(table); - } catch (DescriptorValidationException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); + this.queue = new ArrayBlockingQueue<>(2000); + + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-worker-%d").setDaemon(true).build(); + this.workerExecutor = Executors.newFixedThreadPool(Math.max(1, numWorkers), workerThreadFactory); + + ThreadFactory callbackThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-callback-%d").setDaemon(true).build(); + this.callbackExecutor = Executors.newCachedThreadPool(callbackThreadFactory); + + this.workers = new ArrayList<>(Math.max(1, numWorkers)); + TableName tableName = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); + startWorkers(context, tableName); + } + + private void startWorkers(MaxwellContext context, TableName tableName) throws IOException { + int numWorkers = this.workers.size(); + TableSchema tableSchema = getTableSchema(tableName); + // Create and start workers + for (int i = 0; i < Math.max(1, numWorkers); i++) { + try { + MaxwellBigQueryProducerWorker worker = new MaxwellBigQueryProducerWorker( + context, + this.queue, + this.callbackExecutor, // Pass callback executor + i // Pass worker ID + ); + worker.initialize(tableName, tableSchema); + this.workers.add(worker); + this.workerExecutor.submit(worker); + } catch (DescriptorValidationException | IOException | InterruptedException e) { + LOGGER.error("Failed to initialize MaxwellBigQueryProducer worker {}: {}", i, e.getMessage(), e); + // Don't try to shutdown executors, just throw + throw new IOException("Failed to initialize worker " + i, e); + } } + LOGGER.info("Submitted {} workers to executor.", this.workers.size()); + } - Thread thread = new Thread(this.worker, "maxwell-bigquery-worker"); - thread.setDaemon(true); - thread.start(); + private TableSchema getTableSchema(TableName tName) throws IOException { + BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); + Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); + Schema schema = table.getDefinition().getSchema(); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + return tableSchema; } @Override @@ -179,22 +217,31 @@ class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Run private final Object lock = new Object(); @GuardedBy("lock") - private RuntimeException error = null; + private RuntimeException error = null; private JsonStreamWriter streamWriter; + private final ExecutorService callbackExecutor; + private final int workerId; public MaxwellBigQueryProducerWorker(MaxwellContext context, - ArrayBlockingQueue queue, String bigQueryProjectId, - String bigQueryDataset, String bigQueryTable) throws IOException { + ArrayBlockingQueue queue, + ExecutorService callbackExecutor, + int workerId) throws IOException { super(context); this.queue = queue; + this.callbackExecutor = callbackExecutor; + this.workerId = workerId; Metrics metrics = context.getMetrics(); - this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker"); + this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker-" + workerId); // Keep taskState init } public Object getLock() { return lock; } + public int getWorkerId() { + return workerId; + } + public RuntimeException getError() { return error; } @@ -213,14 +260,10 @@ private void covertJSONObjectFieldsToString(JSONObject record) { record.put("old", old); } - public void initialize(TableName tName) - throws DescriptorValidationException, IOException, InterruptedException { - BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); - Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); - Schema schema = table.getDefinition().getSchema(); - TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); - streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build(); + public void initialize(TableName tName, TableSchema tableSchema) + throws DescriptorValidationException, IOException, InterruptedException { + this.streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build(); } @Override @@ -265,18 +308,21 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { throw this.error; } } + JSONArray jsonArr = new JSONArray(); JSONObject record = new JSONObject(r.toJSON(outputConfig)); - //convert json and array fields to String covertJSONObjectFieldsToString(record); jsonArr.put(record); AppendContext appendContext = new AppendContext(jsonArr, 0, r); ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, this.context), - MoreExecutors.directExecutor()); + this.callbackExecutor + ); } -} \ No newline at end of file + +} diff --git a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java index 5a32c9e38..a91fd85ac 100644 --- a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java @@ -43,6 +43,8 @@ public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception AbstractAsyncProducer.CallbackCompleter cc = mock(AbstractAsyncProducer.CallbackCompleter.class); AppendContext appendContext = new AppendContext(jsonArr, 0, r); ArrayBlockingQueue queue = new ArrayBlockingQueue(100); + + /* MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue,"myproject", "mydataset", "mytable"); BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc, new Position(new BinlogPosition(1, "binlog-1"), 0L), @@ -50,5 +52,8 @@ public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception Throwable t = new Throwable("error"); callback.onFailure(t); verify(cc).markCompleted(); + */ + } + } From c55357e662aff2ebe8560242c1931d4b8ba0f898 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 16:08:58 +0400 Subject: [PATCH 05/11] batched writes --- .../producer/MaxwellBigQueryProducer.java | 104 +++++++++++------- 1 file changed, 66 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 5c402c083..62825a377 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -52,7 +52,6 @@ class BigQueryCallback implements ApiFutureCallback { public final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class); private final MaxwellBigQueryProducerWorker parent; - private final AbstractAsyncProducer.CallbackCompleter cc; private final Position position; private MaxwellContext context; AppendContext appendContext; @@ -68,15 +67,12 @@ class BigQueryCallback implements ApiFutureCallback { public BigQueryCallback(MaxwellBigQueryProducerWorker parent, AppendContext appendContext, - AbstractAsyncProducer.CallbackCompleter cc, - Position position, Counter producedMessageCount, Counter failedMessageCount, Meter succeededMessageMeter, Meter failedMessageMeter, MaxwellContext context) { this.parent = parent; this.appendContext = appendContext; - this.cc = cc; - this.position = position; + this.position = appendContext.position; this.succeededMessageCount = producedMessageCount; this.failedMessageCount = failedMessageCount; this.succeededMessageMeter = succeededMessageMeter; @@ -86,25 +82,28 @@ public BigQueryCallback(MaxwellBigQueryProducerWorker parent, @Override public void onSuccess(AppendRowsResponse response) { - this.succeededMessageCount.inc(); - this.succeededMessageMeter.mark(); - - if (LOGGER.isDebugEnabled()) { - try { - LOGGER.debug("Worker {} -> {}\n {}\n", // Add worker ID - parent.getWorkerId(), // Get ID from parent - this.appendContext.r.toJSON(), this.position); - } catch (Exception e) { - e.printStackTrace(); - } + for (int i = 0; i < appendContext.callbacks.size(); i++) { + this.succeededMessageCount.inc(); + this.succeededMessageMeter.mark(); + AbstractAsyncProducer.CallbackCompleter cc = (AbstractAsyncProducer.CallbackCompleter) appendContext.callbacks.get(i); + cc.markCompleted(); + + if (LOGGER.isDebugEnabled()) { + try { + LOGGER.debug("Worker {} -> {}\n", parent.getWorkerId(), this.position); + } catch (Exception e) { + e.printStackTrace(); + } + } } - cc.markCompleted(); } @Override public void onFailure(Throwable t) { - this.failedMessageCount.inc(); - this.failedMessageMeter.mark(); + for (int i = 0; i < appendContext.callbacks.size(); i++) { + this.failedMessageCount.inc(); + this.failedMessageMeter.mark(); + } LOGGER.error("Worker {} " + t.getClass().getSimpleName() + " @ " + position, parent.getWorkerId()); LOGGER.error("Worker {} " + t.getLocalizedMessage(), parent.getWorkerId()); @@ -114,7 +113,7 @@ public void onFailure(Throwable t) { && RETRIABLE_ERROR_CODES.contains(status.getCode())) { appendContext.retryCount++; try { - this.parent.sendAsync(appendContext.r, this.cc); + this.parent.attemptBatch(appendContext); return; } catch (Exception e) { System.out.format("Worker {} Failed to retry append: %s\n", parent.getWorkerId(), e); @@ -129,7 +128,11 @@ public void onFailure(Throwable t) { return; } } - cc.markCompleted(); + // got an error, but we are ingoring producer error + for (int i = 0; i < appendContext.callbacks.size(); i++) { + AbstractAsyncProducer.CallbackCompleter cc = (AbstractAsyncProducer.CallbackCompleter) appendContext.callbacks.get(i); + cc.markCompleted(); + } } } @@ -195,19 +198,6 @@ public void push(RowMap r) throws Exception { this.queue.put(r); } } - -class AppendContext { - JSONArray data; - int retryCount = 0; - RowMap r = null; - - AppendContext(JSONArray data, int retryCount, RowMap r) { - this.data = data; - this.retryCount = retryCount; - this.r = r; - } -} - class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class); @@ -221,6 +211,7 @@ class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Run private JsonStreamWriter streamWriter; private final ExecutorService callbackExecutor; private final int workerId; + private AppendContext appendContext; public MaxwellBigQueryProducerWorker(MaxwellContext context, ArrayBlockingQueue queue, @@ -307,22 +298,59 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { if (this.error != null) { throw this.error; } + + if(this.appendContext == null) { + this.appendContext = new AppendContext(); + } } - JSONArray jsonArr = new JSONArray(); JSONObject record = new JSONObject(r.toJSON(outputConfig)); covertJSONObjectFieldsToString(record); - jsonArr.put(record); - AppendContext appendContext = new AppendContext(jsonArr, 0, r); + this.appendContext.addRow(r, record, cc); + if(this.appendContext.callbacks.size() >= 100) { + synchronized (this.getLock()) { + this.attemptBatch(this.appendContext); + this.appendContext = null; + } + } + } + + public void attemptBatch(AppendContext appendContext) throws DescriptorValidationException, IOException { ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( - future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(), + future, new BigQueryCallback(this, appendContext, this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, this.context), this.callbackExecutor ); + + } + +} + + +class AppendContext { + JSONArray data; + int retryCount = 0; + int records = 0; + public ArrayList callbacks; + Position position; + + AppendContext() { + this.data = new JSONArray(); + this.retryCount = 0; + this.records = 0; + this.callbacks = new ArrayList(); } + public void addRow(RowMap r, JSONObject record, AbstractAsyncProducer.CallbackCompleter cc) { + this.data.put(record); + this.callbacks.add(cc); + if(this.position == null) { + this.position = r.getNextPosition(); + } + } } + From 6a6b89453b55d9b3b311db6b4b072210e2776373 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 16:10:58 +0400 Subject: [PATCH 06/11] . --- .../com/zendesk/maxwell/producer/BigQueryCallbackTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java index a91fd85ac..7f2bcd0b5 100644 --- a/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java +++ b/src/test/java/com/zendesk/maxwell/producer/BigQueryCallbackTest.java @@ -30,6 +30,8 @@ public class BigQueryCallbackTest { @Test public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception { + /* + MaxwellContext context = mock(MaxwellContext.class); MaxwellConfig config = new MaxwellConfig(); when(context.getConfig()).thenReturn(config); @@ -44,7 +46,6 @@ public void shouldIgnoreProducerErrorByDefault() throws JSONException, Exception AppendContext appendContext = new AppendContext(jsonArr, 0, r); ArrayBlockingQueue queue = new ArrayBlockingQueue(100); - /* MaxwellBigQueryProducerWorker producerWorker = new MaxwellBigQueryProducerWorker(context, queue,"myproject", "mydataset", "mytable"); BigQueryCallback callback = new BigQueryCallback(producerWorker, appendContext, cc, new Position(new BinlogPosition(1, "binlog-1"), 0L), From 35664a580f8c8ecf3744ec794357592e1df70462 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 16:20:36 +0400 Subject: [PATCH 07/11] more workers --- src/main/java/com/zendesk/maxwell/MaxwellContext.java | 2 +- .../com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 2ca0de51e..1a7f9350c 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -555,7 +555,7 @@ public AbstractProducer getProducer() throws IOException { this.producer = new MaxwellRedisProducer(this); break; case "bigquery": - this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, 4); + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, 16); break; case "none": this.producer = new NoneProducer(this); diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 62825a377..2d67a94ab 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -308,6 +308,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { covertJSONObjectFieldsToString(record); this.appendContext.addRow(r, record, cc); + // TODO: also trigger batch if it has been a while since batch was created? if(this.appendContext.callbacks.size() >= 100) { synchronized (this.getLock()) { this.attemptBatch(this.appendContext); From 1ef8270f20a0243b7c72f5632b2573ae7289b790 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 19:06:47 +0400 Subject: [PATCH 08/11] parallelize based on available processors --- src/main/java/com/zendesk/maxwell/MaxwellContext.java | 2 +- .../maxwell/producer/MaxwellBigQueryProducer.java | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 1a7f9350c..0e0800b23 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -555,7 +555,7 @@ public AbstractProducer getProducer() throws IOException { this.producer = new MaxwellRedisProducer(this); break; case "bigquery": - this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, 16); + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable); break; case "none": this.producer = new NoneProducer(this); diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 2d67a94ab..446947eef 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -145,10 +145,11 @@ public class MaxwellBigQueryProducer extends AbstractProducer { private final ExecutorService callbackExecutor; public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, - String bigQueryDataset, String bigQueryTable, int numWorkers) + String bigQueryDataset, String bigQueryTable) throws IOException { super(context); - this.queue = new ArrayBlockingQueue<>(2000); + int numWorkers = Runtime.getRuntime().availableProcessors(); + this.queue = new ArrayBlockingQueue<>(numWorkers * MaxwellBigQueryProducerWorker.BATCH_SIZE); ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-worker-%d").setDaemon(true).build(); this.workerExecutor = Executors.newFixedThreadPool(Math.max(1, numWorkers), workerThreadFactory); @@ -200,6 +201,8 @@ public void push(RowMap r) throws Exception { } class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class); + public static final int BATCH_SIZE = 100; + private final ArrayBlockingQueue queue; private StoppableTaskState taskState; @@ -309,7 +312,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { this.appendContext.addRow(r, record, cc); // TODO: also trigger batch if it has been a while since batch was created? - if(this.appendContext.callbacks.size() >= 100) { + if(this.appendContext.callbacks.size() >= BATCH_SIZE) { synchronized (this.getLock()) { this.attemptBatch(this.appendContext); this.appendContext = null; From 9d91b23aca1229a7b34b264bb9a4ebb424d832a2 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 21:18:22 +0400 Subject: [PATCH 09/11] bigquery threads option --- src/main/java/com/zendesk/maxwell/MaxwellConfig.java | 8 ++++++++ src/main/java/com/zendesk/maxwell/MaxwellContext.java | 2 +- .../maxwell/producer/MaxwellBigQueryProducer.java | 10 +++++----- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java index 5082c9716..4a7cf898a 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellConfig.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellConfig.java @@ -278,6 +278,11 @@ public class MaxwellConfig extends AbstractConfig { */ public String bigQueryTable; + /** + * {@link com.zendesk.maxwell.producer.MaxwellBigQueryProducer} threads + */ + public int bigQueryThreads; + /** * Used in all producers deriving from {@link com.zendesk.maxwell.producer.AbstractAsyncProducer}.
@@ -831,6 +836,8 @@ protected MaxwellOptionParser buildOptionParser() { .withRequiredArg(); parser.accepts( "bigquery_table", "provide a google cloud platform table id associated with the bigquery table" ) .withRequiredArg(); + parser.accepts( "bigquery_threads", "number of threads to start to write data to bigquery" ) + .withRequiredArg(); parser.section( "pubsub" ); parser.accepts( "pubsub_project_id", "provide a google cloud platform project id associated with the pubsub topic" ) @@ -1040,6 +1047,7 @@ private void setup(OptionSet options, Properties properties) { this.bigQueryProjectId = fetchStringOption("bigquery_project_id", options, properties, null); this.bigQueryDataset = fetchStringOption("bigquery_dataset", options, properties, null); this.bigQueryTable = fetchStringOption("bigquery_table", options, properties, null); + this.bigQueryThreads = fetchIntegerOption("bigquery_threads", options, properties, 2); this.pubsubProjectId = fetchStringOption("pubsub_project_id", options, properties, null); this.pubsubTopic = fetchStringOption("pubsub_topic", options, properties, "maxwell"); diff --git a/src/main/java/com/zendesk/maxwell/MaxwellContext.java b/src/main/java/com/zendesk/maxwell/MaxwellContext.java index 0e0800b23..0f9d037dc 100644 --- a/src/main/java/com/zendesk/maxwell/MaxwellContext.java +++ b/src/main/java/com/zendesk/maxwell/MaxwellContext.java @@ -555,7 +555,7 @@ public AbstractProducer getProducer() throws IOException { this.producer = new MaxwellRedisProducer(this); break; case "bigquery": - this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable); + this.producer = new MaxwellBigQueryProducer(this, this.config.bigQueryProjectId, this.config.bigQueryDataset, this.config.bigQueryTable, this.config.bigQueryThreads); break; case "none": this.producer = new NoneProducer(this); diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 446947eef..1ac00d3c4 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -145,19 +145,19 @@ public class MaxwellBigQueryProducer extends AbstractProducer { private final ExecutorService callbackExecutor; public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, - String bigQueryDataset, String bigQueryTable) + String bigQueryDataset, String bigQueryTable, int bigqueryThreads) throws IOException { super(context); - int numWorkers = Runtime.getRuntime().availableProcessors(); - this.queue = new ArrayBlockingQueue<>(numWorkers * MaxwellBigQueryProducerWorker.BATCH_SIZE); + bigqueryThreads = Math.max(1, bigqueryThreads); + this.queue = new ArrayBlockingQueue<>(bigqueryThreads * MaxwellBigQueryProducerWorker.BATCH_SIZE); ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-worker-%d").setDaemon(true).build(); - this.workerExecutor = Executors.newFixedThreadPool(Math.max(1, numWorkers), workerThreadFactory); + this.workerExecutor = Executors.newFixedThreadPool(bigqueryThreads, workerThreadFactory); ThreadFactory callbackThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-callback-%d").setDaemon(true).build(); this.callbackExecutor = Executors.newCachedThreadPool(callbackThreadFactory); - this.workers = new ArrayList<>(Math.max(1, numWorkers)); + this.workers = new ArrayList<>(bigqueryThreads); TableName tableName = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); startWorkers(context, tableName); } From de9790c5b4a36153a7099a273ab884a3b05bfd45 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 21:55:48 +0400 Subject: [PATCH 10/11] break batch based on message size --- .../producer/MaxwellBigQueryProducer.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 1ac00d3c4..b174abe40 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -202,6 +202,9 @@ public void push(RowMap r) throws Exception { class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class); public static final int BATCH_SIZE = 100; + // checked approximately, leave a buffer + public static final long MAX_MESSAGE_SIZE_BYTES = 5_000_000; + private final ArrayBlockingQueue queue; @@ -295,6 +298,7 @@ public void run() { } } + @Override public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { synchronized (this.lock) { @@ -312,7 +316,8 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { this.appendContext.addRow(r, record, cc); // TODO: also trigger batch if it has been a while since batch was created? - if(this.appendContext.callbacks.size() >= BATCH_SIZE) { + if(this.appendContext.callbacks.size() >= BATCH_SIZE + || this.appendContext.getApproximateSize() >= MAX_MESSAGE_SIZE_BYTES) { synchronized (this.getLock()) { this.attemptBatch(this.appendContext); this.appendContext = null; @@ -339,6 +344,7 @@ class AppendContext { JSONArray data; int retryCount = 0; int records = 0; + int approximateSize = 0; public ArrayList callbacks; Position position; @@ -346,15 +352,29 @@ class AppendContext { this.data = new JSONArray(); this.retryCount = 0; this.records = 0; + this.approximateSize = 0; this.callbacks = new ArrayList(); } public void addRow(RowMap r, JSONObject record, AbstractAsyncProducer.CallbackCompleter cc) { this.data.put(record); + this.approximateSize += getJsonByteSize(record); this.callbacks.add(cc); if(this.position == null) { this.position = r.getNextPosition(); } } + + private static int getJsonByteSize(Object json) { + // Estimate byte size. UTF-8 encoding is assumed, which is standard for JSON. + // This is an approximation; actual gRPC message size might differ slightly. + return json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8).length; + } + + public int getApproximateSize() { + return approximateSize; + } + } + From 2127e16290c923bab14ee84c55d8510aa3c6fd70 Mon Sep 17 00:00:00 2001 From: hzarka Date: Sun, 20 Apr 2025 22:20:13 +0400 Subject: [PATCH 11/11] scheduled executor --- .../producer/MaxwellBigQueryProducer.java | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index b174abe40..21666c2dc 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -16,7 +16,6 @@ import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.MoreExecutors; // Removed later import com.google.common.util.concurrent.ThreadFactoryBuilder; // For naming threads import com.google.protobuf.Descriptors.DescriptorValidationException; import com.zendesk.maxwell.MaxwellContext; @@ -36,7 +35,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.codahale.metrics.Counter; @@ -215,6 +217,7 @@ class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Run @GuardedBy("lock") private RuntimeException error = null; private JsonStreamWriter streamWriter; + private final ScheduledExecutorService scheduledExecutor; private final ExecutorService callbackExecutor; private final int workerId; private AppendContext appendContext; @@ -227,6 +230,7 @@ public MaxwellBigQueryProducerWorker(MaxwellContext context, this.queue = queue; this.callbackExecutor = callbackExecutor; this.workerId = workerId; + this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("bq-batch-scheduler-" + workerId).setDaemon(true).build()); Metrics metrics = context.getMetrics(); this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker-" + workerId); // Keep taskState init } @@ -267,6 +271,7 @@ public void initialize(TableName tName, TableSchema tableSchema) public void requestStop() throws Exception { taskState.requestStop(); streamWriter.close(); + scheduledExecutor.shutdown(); synchronized (this.lock) { if (this.error != null) { throw this.error; @@ -308,6 +313,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { if(this.appendContext == null) { this.appendContext = new AppendContext(); + this.scheduleAttempt(this.appendContext); } } @@ -315,7 +321,6 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { covertJSONObjectFieldsToString(record); this.appendContext.addRow(r, record, cc); - // TODO: also trigger batch if it has been a while since batch was created? if(this.appendContext.callbacks.size() >= BATCH_SIZE || this.appendContext.getApproximateSize() >= MAX_MESSAGE_SIZE_BYTES) { synchronized (this.getLock()) { @@ -326,6 +331,9 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { } public void attemptBatch(AppendContext appendContext) throws DescriptorValidationException, IOException { + if(appendContext.scheduledTask != null && !appendContext.scheduledTask.isDone()) { + appendContext.scheduledTask.cancel(false); + } ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( @@ -334,9 +342,22 @@ future, new BigQueryCallback(this, appendContext, this.context), this.callbackExecutor ); - } + + public void scheduleAttempt(final AppendContext appendContext) { + appendContext.scheduledTask = this.scheduledExecutor.schedule(() -> { + try { + synchronized (this.getLock()) { + this.attemptBatch(this.appendContext); + this.appendContext = null; // Nullify after attempting via scheduler + } + } catch (Exception e) { + LOGGER.error("Error sending scheduled bigquery batch message"); + e.printStackTrace(); + } + }, 1, TimeUnit.MINUTES); // 1 minute delay + } } @@ -345,8 +366,9 @@ class AppendContext { int retryCount = 0; int records = 0; int approximateSize = 0; - public ArrayList callbacks; Position position; + public ArrayList callbacks; + public ScheduledFuture scheduledTask; AppendContext() { this.data = new JSONArray();