diff --git a/Dockerfile b/Dockerfile index 8a5927c63..0e7cc56c6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,7 @@ RUN cd /workspace \ && echo "$MAXWELL_VERSION" > /REVISION # Build clean image with non-root priveledge -FROM openjdk:23-jdk-slim +FROM eclipse-temurin:23-jre-noble RUN apt-get update \ && apt-get -y upgrade diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 5f3e3101f..1eae32026 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -192,7 +193,12 @@ private TableSchema getTableSchema(TableName tName) throws IOException { BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); Schema schema = table.getDefinition().getSchema(); - TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema); + // Filter out bq_inserted_at column from the schema + List<com.google.cloud.bigquery.Field> filteredFields = schema.getFields().stream() + .filter(field -> !"bq_inserted_at".equals(field.getName())) + .collect(Collectors.toList()); + Schema filteredSchema = Schema.of(filteredFields); + TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(filteredSchema); return tableSchema; } diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java index fe24f17ba..9c92b7fd4 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java +++ 
b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java @@ -24,6 +24,7 @@ import com.zendesk.maxwell.schema.*; import com.zendesk.maxwell.schema.columndef.ColumnDefCastException; import com.zendesk.maxwell.schema.ddl.DDLMap; +import com.zendesk.maxwell.schema.ddl.InvalidSchemaError; import com.zendesk.maxwell.schema.ddl.ResolvedSchemaChange; import com.zendesk.maxwell.scripting.Scripting; import com.zendesk.maxwell.util.RunLoopProcess; @@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.SQLException; import java.util.List; import java.util.Objects; import java.util.Set; @@ -382,7 +384,7 @@ private RowMap processHeartbeats(RowMap row) { * @param timestamp The timestamp of the SQL binlog event */ private void processQueryEvent(String dbName, String sql, SchemaStore schemaStore, Position position, Position nextPosition, Long timestamp) throws Exception { - List changes = schemaStore.processSQL(sql, dbName, position); + List changes = processSqlWithSchemaRetry(schemaStore, sql, dbName, position); Long schemaId = getSchemaId(); if ( bootstrapper != null) @@ -402,6 +404,72 @@ private void processQueryEvent(String dbName, String sql, SchemaStore schemaStor tableCache.clear(); } + private List processSqlWithSchemaRetry(SchemaStore schemaStore, String sql, String dbName, Position position) throws Exception { + boolean retried = false; + while (true) { + try { + return schemaStore.processSQL(sql, dbName, position); + } catch (InvalidSchemaError e) { + if (retried || !isSchemaMismatchError(e) || !recaptureSchema("DDL: " + sql, e)) { + throw e; + } + retried = true; + tableCache.clear(); + } + } + } + + private boolean isSchemaMismatchError(InvalidSchemaError e) { + String message = e.getMessage(); + if (message == null) { + return false; + } + return message.contains("create existing table") + || message.contains("Couldn't find table") + || message.contains("Couldn't find database"); + } + + private boolean 
recaptureSchema(String reason, Exception e) { + if (!(schemaStore instanceof MysqlSchemaStore)) { + LOGGER.warn("Unable to recapture schema for {}: schemaStore type {}", reason, schemaStore.getClass().getName(), e); + return false; + } + + try { + LOGGER.warn("Recapturing schema after {} due to error: {}", reason, e.getMessage()); + ((MysqlSchemaStore) schemaStore).captureAndSaveSchema(); + return true; + } catch (SQLException ex) { + LOGGER.error("Schema recapture failed after {}.", reason, ex); + return false; + } + } + + private void processTableMapEvent(BinlogConnectorEvent event) throws Exception { + TableMapEventData data = event.tableMapData(); + boolean retried = false; + while (true) { + try { + tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); + return; + } catch (RuntimeException e) { + if (retried || !isMissingTableOrDatabase(e) || !recaptureSchema("TABLE_MAP for " + data.getDatabase() + "." + data.getTable(), e)) { + throw e; + } + retried = true; + tableCache.clear(); + } + } + } + + private boolean isMissingTableOrDatabase(RuntimeException e) { + String message = e.getMessage(); + if (message == null) { + return false; + } + return message.startsWith("Couldn't find table") || message.startsWith("Couldn't find database"); + } + private void processQueryEvent(BinlogConnectorEvent event) throws Exception { QueryEventData data = event.queryData(); processQueryEvent( @@ -588,8 +656,7 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws } break; case TABLE_MAP: - TableMapEventData data = event.tableMapData(); - tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); + processTableMapEvent(event); break; case ROWS_QUERY: RowsQueryEventData rqed = event.getEvent().getData(); @@ -716,8 +783,7 @@ public RowMap getRow() throws Exception { rowBuffer = 
getTransactionRows(event); break; case TABLE_MAP: - TableMapEventData data = event.tableMapData(); - tableCache.processEvent(getSchema(), this.filter,this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable()); + processTableMapEvent(event); break; case QUERY: QueryEventData qe = event.queryData();