Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN cd /workspace \
&& echo "$MAXWELL_VERSION" > /REVISION

# Build clean image with non-root priveledge
FROM openjdk:23-jdk-slim
FROM eclipse-temurin:23-jre-noble

RUN apt-get update \
&& apt-get -y upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
Expand Down Expand Up @@ -192,7 +193,12 @@ private TableSchema getTableSchema(TableName tName) throws IOException {
BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService();
Table table = bigquery.getTable(tName.getDataset(), tName.getTable());
Schema schema = table.getDefinition().getSchema();
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(schema);
// Filter out bq_inserted_at column from the schema
List<com.google.cloud.bigquery.Field> filteredFields = schema.getFields().stream()
.filter(field -> !"bq_inserted_at".equals(field.getName()))
.collect(Collectors.toList());
Schema filteredSchema = Schema.of(filteredFields);
TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(filteredSchema);
return tableSchema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import com.zendesk.maxwell.schema.*;
import com.zendesk.maxwell.schema.columndef.ColumnDefCastException;
import com.zendesk.maxwell.schema.ddl.DDLMap;
import com.zendesk.maxwell.schema.ddl.InvalidSchemaError;
import com.zendesk.maxwell.schema.ddl.ResolvedSchemaChange;
import com.zendesk.maxwell.scripting.Scripting;
import com.zendesk.maxwell.util.RunLoopProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -382,7 +384,7 @@ private RowMap processHeartbeats(RowMap row) {
* @param timestamp The timestamp of the SQL binlog event
*/
private void processQueryEvent(String dbName, String sql, SchemaStore schemaStore, Position position, Position nextPosition, Long timestamp) throws Exception {
List<ResolvedSchemaChange> changes = schemaStore.processSQL(sql, dbName, position);
List<ResolvedSchemaChange> changes = processSqlWithSchemaRetry(schemaStore, sql, dbName, position);
Long schemaId = getSchemaId();

if ( bootstrapper != null)
Expand All @@ -402,6 +404,72 @@ private void processQueryEvent(String dbName, String sql, SchemaStore schemaStor
tableCache.clear();
}

private List<ResolvedSchemaChange> processSqlWithSchemaRetry(SchemaStore schemaStore, String sql, String dbName, Position position) throws Exception {
boolean retried = false;
while (true) {
try {
return schemaStore.processSQL(sql, dbName, position);
} catch (InvalidSchemaError e) {
if (retried || !isSchemaMismatchError(e) || !recaptureSchema("DDL: " + sql, e)) {
throw e;
}
retried = true;
tableCache.clear();
}
}
}

private boolean isSchemaMismatchError(InvalidSchemaError e) {
String message = e.getMessage();
if (message == null) {
return false;
}
return message.contains("create existing table")
|| message.contains("Couldn't find table")
|| message.contains("Couldn't find database");
}

private boolean recaptureSchema(String reason, Exception e) {
if (!(schemaStore instanceof MysqlSchemaStore)) {
LOGGER.warn("Unable to recapture schema for {}: schemaStore type {}", reason, schemaStore.getClass().getName(), e);
return false;
}

try {
LOGGER.warn("Recapturing schema after {} due to error: {}", reason, e.getMessage());
((MysqlSchemaStore) schemaStore).captureAndSaveSchema();
return true;
} catch (SQLException ex) {
LOGGER.error("Schema recapture failed after {}.", reason, ex);
return false;
}
}

private void processTableMapEvent(BinlogConnectorEvent event) throws Exception {
TableMapEventData data = event.tableMapData();
boolean retried = false;
while (true) {
try {
tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
return;
} catch (RuntimeException e) {
if (retried || !isMissingTableOrDatabase(e) || !recaptureSchema("TABLE_MAP for " + data.getDatabase() + "." + data.getTable(), e)) {
throw e;
}
retried = true;
tableCache.clear();
}
}
}

private boolean isMissingTableOrDatabase(RuntimeException e) {
String message = e.getMessage();
if (message == null) {
return false;
}
return message.startsWith("Couldn't find table") || message.startsWith("Couldn't find database");
}

private void processQueryEvent(BinlogConnectorEvent event) throws Exception {
QueryEventData data = event.queryData();
processQueryEvent(
Expand Down Expand Up @@ -588,8 +656,7 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws
}
break;
case TABLE_MAP:
TableMapEventData data = event.tableMapData();
tableCache.processEvent(getSchema(), this.filter, this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
processTableMapEvent(event);
break;
case ROWS_QUERY:
RowsQueryEventData rqed = event.getEvent().getData();
Expand Down Expand Up @@ -716,8 +783,7 @@ public RowMap getRow() throws Exception {
rowBuffer = getTransactionRows(event);
break;
case TABLE_MAP:
TableMapEventData data = event.tableMapData();
tableCache.processEvent(getSchema(), this.filter,this.ignoreMissingSchema, data.getTableId(), data.getDatabase(), data.getTable());
processTableMapEvent(event);
break;
case QUERY:
QueryEventData qe = event.queryData();
Expand Down