Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,7 @@ private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit(
partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath);
Stream<HoodieFileGroup> fileGroups =
Stream.concat(
fsView.getAllFileGroups(partitionPath),
fsView.getReplacedFileGroupsBeforeOrOn(
instantToConsider.getTimestamp(), partitionPath));
fsView.getAllFileGroups(partitionPath), fsView.getAllReplacedFileGroups(partitionPath));
fileGroups.forEach(
fileGroup -> {
List<HoodieBaseFile> baseFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ public void deletePartition(String partition, HoodieTableType tableType) {
assertNoWriteErrors(result);
}

public List<HoodieRecord<HoodieAvroPayload>> insertOverwrite(
List<HoodieRecord<HoodieAvroPayload>> records) {
String actionType =
CommitUtils.getCommitActionType(
WriteOperationType.INSERT_OVERWRITE, HoodieTableType.COPY_ON_WRITE);
String instant = getStartCommitOfActionType(actionType);
JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
HoodieWriteResult writeResult = writeClient.insertOverwrite(writeRecords, instant);
List<WriteStatus> result = writeResult.getWriteStatuses().collect();
assertNoWriteErrors(result);
return records;
}

public void cluster() {
String instant = writeClient.scheduleClustering(Option.empty()).get();
writeClient.cluster(instant, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,54 @@ public void testsForDropPartition(HoodieTableType tableType) {
}
}

@Test
public void testMultipleInsertOverwriteOnSamePartitions() {
String tableName = "test_table_" + UUID.randomUUID();
try (TestSparkHudiTable table =
TestSparkHudiTable.forStandardSchema(
tableName, tempDir, jsc, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE)) {
List<List<String>> allBaseFilePaths = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();

// Initial insert into partition "INFO"
String commitInstant1 = table.startCommit();
List<HoodieRecord<HoodieAvroPayload>> insertsForCommit1 = table.generateRecords(50, "INFO");
table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());

// INSERT_OVERWRITE on "INFO" partition (replacecommit A — new file groups replace initial)
List<HoodieRecord<HoodieAvroPayload>> overwriteRecords1 = table.generateRecords(30, "INFO");
table.insertOverwrite(overwriteRecords1);
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());

// INSERT_OVERWRITE on "INFO" partition again (replacecommit B — new file groups replace A's)
List<HoodieRecord<HoodieAvroPayload>> overwriteRecords2 = table.generateRecords(20, "INFO");
table.insertOverwrite(overwriteRecords2);
allBaseFilePaths.add(table.getAllLatestBaseFilePaths());

HudiConversionSource hudiClient =
getHudiSourceClient(CONFIGURATION, table.getBasePath(), "level:VALUE");
// Get the current snapshot
InternalSnapshot internalSnapshot = hudiClient.getCurrentSnapshot();
ValidationTestHelper.validateSnapshot(
internalSnapshot, allBaseFilePaths.get(allBaseFilePaths.size() - 1));
// Get changes in Incremental format since the initial insert
InstantsForIncrementalSync instantsForIncrementalSync =
InstantsForIncrementalSync.builder()
.lastSyncInstant(HudiInstantUtils.parseFromInstantTime(commitInstant1))
.build();
CommitsBacklog<HoodieInstant> instantCommitsBacklog =
hudiClient.getCommitsBacklog(instantsForIncrementalSync);
for (HoodieInstant instant : instantCommitsBacklog.getCommitsToProcess()) {
TableChange tableChange = hudiClient.getTableChangeForCommit(instant);
allTableChanges.add(tableChange);
}
// Without the fix, replacecommit A would have 0 adds because the FileSystemView
// built from the full timeline marks A's file groups as replaced by B.
ValidationTestHelper.validateTableChanges(allBaseFilePaths, allTableChanges);
}
}

@ParameterizedTest
@MethodSource("testsForAllTableTypes")
public void testsForDeleteAllRecordsInPartition(HoodieTableType tableType) {
Expand Down