docs/content/append-table/row-tracking.md (2 additions & 1 deletion)
@@ -47,7 +47,8 @@ CREATE TABLE part_t (
WITH ('row-tracking.enabled' = 'true');
```
Notice that:
- Row tracking is only supported for unaware append tables, not for primary key tables. Which means you can't define `bucket` and `bucket-key` for the table.
- Row tracking is only supported for append tables, not for primary key tables.
- The config `bucket-append-ordered` must be set to `false` when using bucketed tables (see the sketch after this list).
- Only Spark supports UPDATE, MERGE INTO and DELETE operations on row-tracking tables; Flink SQL does not support these operations yet.
- This feature is experimental; this note will be removed once it is stable.
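
As a rough sketch only (the table and column names are made up for illustration and do not appear in this PR), a bucketed row-tracking table and the Spark-only row-level operations could look like this:

```sql
-- Hypothetical bucketed append table with row tracking enabled;
-- 'bucket-append-ordered' must be 'false' for bucketed row-tracking tables.
CREATE TABLE bucketed_t (
    id INT,
    v STRING
) WITH (
    'row-tracking.enabled' = 'true',
    'bucket' = '4',
    'bucket-key' = 'id',
    'bucket-append-ordered' = 'false'
);

-- Row-level changes currently have to go through Spark SQL:
UPDATE bucketed_t SET v = 'updated' WHERE id = 1;
DELETE FROM bucketed_t WHERE id = 2;
```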

@@ -127,6 +127,31 @@ public TableSchema(
numBucket = CoreOptions.fromMap(options).bucket();
}

private TableSchema(
int version,
long id,
List<DataField> fields,
int highestFieldId,
List<String> partitionKeys,
List<String> primaryKeys,
Map<String, String> options,
@Nullable String comment,
long timeMillis,
List<String> bucketKeys,
int numBucket) {
this.version = version;
this.id = id;
this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
this.highestFieldId = highestFieldId;
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.options = options;
this.comment = comment;
this.timeMillis = timeMillis;
this.bucketKeys = bucketKeys;
this.numBucket = numBucket;
}

public int version() {
return version;
}
@@ -294,7 +319,9 @@ public TableSchema project(@Nullable List<String> writeCols) {
primaryKeys,
options,
comment,
timeMillis);
timeMillis,
bucketKeys,
numBucket);
}

private List<DataField> projectedDataFields(List<String> projectedFieldNames) {
@@ -143,98 +143,115 @@ static class CompactPlanner {

List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
List<DataEvolutionCompactTask> tasks = new ArrayList<>();
Map<BinaryRow, List<DataFileMeta>> partitionedFiles = new LinkedHashMap<>();
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionedFiles =
new LinkedHashMap<>();
for (ManifestEntry entry : input) {
partitionedFiles
.computeIfAbsent(entry.partition(), k -> new ArrayList<>())
.computeIfAbsent(entry.partition(), k -> new LinkedHashMap<>())
.computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
.add(entry.file());
}

for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionFiles :
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> partitionFiles :
partitionedFiles.entrySet()) {
BinaryRow partition = partitionFiles.getKey();
List<DataFileMeta> files = partitionFiles.getValue();
RangeHelper<DataFileMeta> rangeHelper =
new RangeHelper<>(
DataFileMeta::nonNullFirstRowId,
// merge adjacent files
f -> f.nonNullFirstRowId() + f.rowCount());
for (Map.Entry<Integer, List<DataFileMeta>> bucketFiles :
partitionFiles.getValue().entrySet()) {
BinaryRow partition = partitionFiles.getKey();
int bucket = bucketFiles.getKey();
List<DataFileMeta> files = bucketFiles.getValue();
RangeHelper<DataFileMeta> rangeHelper =
new RangeHelper<>(
DataFileMeta::nonNullFirstRowId,
// merge adjacent files
f -> f.nonNullFirstRowId() + f.rowCount());

List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(files);
List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(files);

for (List<DataFileMeta> group : ranges) {
List<DataFileMeta> dataFiles = new ArrayList<>();
List<DataFileMeta> blobFiles = new ArrayList<>();
TreeMap<Long, DataFileMeta> treeMap = new TreeMap<>();
Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles = new HashMap<>();
for (DataFileMeta f : group) {
if (!isBlobFile(f.fileName())) {
treeMap.put(f.nonNullFirstRowId(), f);
dataFiles.add(f);
} else {
blobFiles.add(f);
for (List<DataFileMeta> group : ranges) {
List<DataFileMeta> dataFiles = new ArrayList<>();
List<DataFileMeta> blobFiles = new ArrayList<>();
TreeMap<Long, DataFileMeta> treeMap = new TreeMap<>();
Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles = new HashMap<>();
for (DataFileMeta f : group) {
if (!isBlobFile(f.fileName())) {
treeMap.put(f.nonNullFirstRowId(), f);
dataFiles.add(f);
} else {
blobFiles.add(f);
}
}
}

if (compactBlob) {
// associate blob files to data files
for (DataFileMeta blobFile : blobFiles) {
Long key = treeMap.floorKey(blobFile.nonNullFirstRowId());
if (key != null) {
DataFileMeta dataFile = treeMap.get(key);
if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId()
&& blobFile.nonNullFirstRowId()
<= dataFile.nonNullFirstRowId()
+ dataFile.rowCount()
- 1) {
dataFileToBlobFiles
.computeIfAbsent(dataFile, k -> new ArrayList<>())
.add(blobFile);
if (compactBlob) {
// associate blob files to data files
for (DataFileMeta blobFile : blobFiles) {
Long key = treeMap.floorKey(blobFile.nonNullFirstRowId());
if (key != null) {
DataFileMeta dataFile = treeMap.get(key);
if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId()
&& blobFile.nonNullFirstRowId()
<= dataFile.nonNullFirstRowId()
+ dataFile.rowCount()
- 1) {
dataFileToBlobFiles
.computeIfAbsent(dataFile, k -> new ArrayList<>())
.add(blobFile);
}
}
}
}
}

RangeHelper<DataFileMeta> rangeHelper2 =
new RangeHelper<>(
DataFileMeta::nonNullFirstRowId,
// files group
f -> f.nonNullFirstRowId() + f.rowCount() - 1);
List<List<DataFileMeta>> groupedFiles =
rangeHelper2.mergeOverlappingRanges(dataFiles);
List<DataFileMeta> waitCompactFiles = new ArrayList<>();
RangeHelper<DataFileMeta> rangeHelper2 =
new RangeHelper<>(
DataFileMeta::nonNullFirstRowId,
// files group
f -> f.nonNullFirstRowId() + f.rowCount() - 1);
List<List<DataFileMeta>> groupedFiles =
rangeHelper2.mergeOverlappingRanges(dataFiles);
List<DataFileMeta> waitCompactFiles = new ArrayList<>();

long weightSum = 0L;
for (List<DataFileMeta> fileGroup : groupedFiles) {
checkArgument(
rangeHelper.areAllRangesSame(fileGroup),
"Data files %s should be all row id ranges same.",
dataFiles);
long currentGroupWeight =
fileGroup.stream()
.mapToLong(d -> Math.max(d.fileSize(), openFileCost))
.sum();
if (currentGroupWeight > targetFileSize) {
// compact current file group to merge field files
tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles));
// compact wait compact files
tasks.addAll(
triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
waitCompactFiles = new ArrayList<>();
weightSum = 0;
} else {
weightSum += currentGroupWeight;
waitCompactFiles.addAll(fileGroup);
if (weightSum > targetFileSize) {
long weightSum = 0L;
for (List<DataFileMeta> fileGroup : groupedFiles) {
checkArgument(
rangeHelper.areAllRangesSame(fileGroup),
"Data files %s should be all row id ranges same.",
dataFiles);
long currentGroupWeight =
fileGroup.stream()
.mapToLong(d -> Math.max(d.fileSize(), openFileCost))
.sum();
if (currentGroupWeight > targetFileSize) {
// compact current file group to merge field files
tasks.addAll(
triggerTask(
waitCompactFiles, partition, dataFileToBlobFiles));
fileGroup, partition, bucket, dataFileToBlobFiles));
// compact wait compact files
tasks.addAll(
triggerTask(
waitCompactFiles,
partition,
bucket,
dataFileToBlobFiles));
waitCompactFiles = new ArrayList<>();
weightSum = 0L;
weightSum = 0;
} else {
weightSum += currentGroupWeight;
waitCompactFiles.addAll(fileGroup);
if (weightSum > targetFileSize) {
tasks.addAll(
triggerTask(
waitCompactFiles,
partition,
bucket,
dataFileToBlobFiles));
waitCompactFiles = new ArrayList<>();
weightSum = 0L;
}
}
}
tasks.addAll(
triggerTask(
waitCompactFiles, partition, bucket, dataFileToBlobFiles));
}
tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
}
}
return tasks;
@@ -243,10 +260,11 @@ List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
private List<DataEvolutionCompactTask> triggerTask(
List<DataFileMeta> dataFiles,
BinaryRow partition,
int bucket,
Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles) {
List<DataEvolutionCompactTask> tasks = new ArrayList<>();
if (dataFiles.size() >= compactMinFileNum) {
tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false));
tasks.add(new DataEvolutionCompactTask(partition, bucket, dataFiles, false));
}

if (compactBlob) {
@@ -256,7 +274,7 @@ private List<DataEvolutionCompactTask> triggerTask(
dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList()));
}
if (blobFiles.size() >= compactMinFileNum) {
tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true));
tasks.add(new DataEvolutionCompactTask(partition, bucket, blobFiles, true));
}
}
return tasks;
@@ -52,13 +52,15 @@ public class DataEvolutionCompactTask {
Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G");

private final BinaryRow partition;
private final int bucket;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final boolean blobTask;

public DataEvolutionCompactTask(
BinaryRow partition, List<DataFileMeta> files, boolean blobTask) {
BinaryRow partition, int bucket, List<DataFileMeta> files, boolean blobTask) {
this.partition = partition;
this.bucket = bucket;
this.compactBefore = new ArrayList<>(files);
this.compactAfter = new ArrayList<>();
this.blobTask = blobTask;
@@ -68,6 +70,10 @@ public BinaryRow partition() {
return partition;
}

public int getBucket() {
return bucket;
}

public List<DataFileMeta> compactBefore() {
return compactBefore;
}
@@ -100,7 +106,7 @@ public CommitMessage doCompact(FileStoreTable table, String commitUser) throws E
DataSplit dataSplit =
DataSplit.builder()
.withPartition(partition)
.withBucket(0)
.withBucket(bucket)
.withDataFiles(compactBefore)
.withBucketPath(pathFactory.bucketPath(partition, 0).toString())
.rawConvertible(false)
@@ -68,6 +68,7 @@ public void serializeList(List<DataEvolutionCompactTask> list, DataOutputView vi

private void serialize(DataEvolutionCompactTask task, DataOutputView view) throws IOException {
serializeBinaryRow(task.partition(), view);
view.writeInt(task.getBucket());
dataFileSerializer.serializeList(task.compactBefore(), view);
view.writeBoolean(task.isBlobTask());
}
@@ -105,6 +106,7 @@ private void checkVersion(int version) {
private DataEvolutionCompactTask deserialize(DataInputView view) throws IOException {
return new DataEvolutionCompactTask(
deserializeBinaryRow(view),
view.readInt(),
dataFileSerializer.deserializeList(view),
view.readBoolean());
}
@@ -97,7 +97,7 @@ protected CompactManager getCompactManager(
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
@Nullable BucketedDvMaintainer dvMaintainer) {
if (options.writeOnly()) {
if (options.writeOnly() || options.rowTrackingEnabled()) {
return new NoopCompactManager();
} else if (options.bucketClusterEnabled()) {
return new BucketedAppendClusterManager(
@@ -634,10 +634,6 @@ private static void validateMergeFunctionFactory(TableSchema schema) {
private static void validateRowTracking(TableSchema schema, CoreOptions options) {
boolean rowTrackingEnabled = options.rowTrackingEnabled();
if (rowTrackingEnabled) {
checkArgument(
options.bucket() == -1,
"Cannot define %s for row tracking table, it only support bucket = -1",
CoreOptions.BUCKET.key());
checkArgument(
schema.primaryKeys().isEmpty(),
"Cannot define %s for row tracking table.",
@@ -247,7 +247,7 @@ public void testSerializerBasic() throws IOException {
createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024));

DataEvolutionCompactTask task =
new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, false);
new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, false);

byte[] bytes = serializer.serialize(task);
DataEvolutionCompactTask deserialized =
@@ -266,7 +266,7 @@ public void testSerializerBlobTask() throws IOException {
createDataFileMeta("file2.blob", 0L, 100L, 0, 1024));

DataEvolutionCompactTask task =
new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, files, true);
new DataEvolutionCompactTask(BinaryRow.EMPTY_ROW, 0, files, true);

byte[] bytes = serializer.serialize(task);
DataEvolutionCompactTask deserialized =
@@ -286,7 +286,7 @@ public void testSerializerWithPartition() throws IOException {
createDataFileMeta("file2.parquet", 100L, 100L, 0, 1024));

BinaryRow partition = BinaryRow.singleColumn(42);
DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, files, false);
DataEvolutionCompactTask task = new DataEvolutionCompactTask(partition, 0, files, false);

byte[] bytes = serializer.serialize(task);
DataEvolutionCompactTask deserialized =