Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,42 @@
<td>String</td>
<td>Define upsert key to do MERGE INTO when executing INSERT INTO, cannot be defined with primary key.</td>
</tr>
<tr>
<td><h5>variant.inferShreddingSchema</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to automatically infer the shredding schema when writing Variant columns.</td>
</tr>
<tr>
<td><h5>variant.shredding.maxInferBufferRow</h5></td>
<td style="word-wrap: break-word;">4096</td>
<td>Integer</td>
<td>Maximum number of rows to buffer for schema inference.</td>
</tr>
<tr>
<td><h5>variant.shredding.maxSchemaDepth</h5></td>
<td style="word-wrap: break-word;">50</td>
<td>Integer</td>
<td>Maximum traversal depth in Variant values during schema inference.</td>
</tr>
<tr>
<td><h5>variant.shredding.maxSchemaWidth</h5></td>
<td style="word-wrap: break-word;">300</td>
<td>Integer</td>
<td>Maximum number of shredded fields allowed in an inferred schema.</td>
</tr>
<tr>
<td><h5>variant.shredding.minFieldCardinalityRatio</h5></td>
<td style="word-wrap: break-word;">0.1</td>
<td>Double</td>
<td>Minimum fraction of rows that must contain a field for it to be shredded. Fields below this threshold will remain in the un-shredded Variant binary.</td>
</tr>
<tr>
<td><h5>variant.shreddingSchema</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Variant shredding schema for writing.</td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
42 changes: 42 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,48 @@ public InlineElement getDescription() {
.defaultValue(true)
.withDescription("Whether enabled read file index.");

/**
 * Explicit Variant shredding schema to use when writing. When set, automatic inference is
 * disabled (see {@code VARIANT_INFER_SHREDDING_SCHEMA}); also readable via the legacy key
 * {@code parquet.variant.shreddingSchema}.
 */
public static final ConfigOption<String> VARIANT_SHREDDING_SCHEMA =
key("variant.shreddingSchema")
.stringType()
.noDefaultValue()
.withFallbackKeys("parquet.variant.shreddingSchema")
.withDescription("The Variant shredding schema for writing.");

/**
 * Whether to automatically infer the shredding schema for Variant columns at write time.
 * Disabled by default; ignored when {@link #VARIANT_SHREDDING_SCHEMA} is set explicitly.
 */
public static final ConfigOption<Boolean> VARIANT_INFER_SHREDDING_SCHEMA =
key("variant.inferShreddingSchema")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to automatically infer the shredding schema when writing Variant columns.");

/** Upper bound on the number of shredded fields an inferred schema may contain. */
public static final ConfigOption<Integer> VARIANT_SHREDDING_MAX_SCHEMA_WIDTH =
key("variant.shredding.maxSchemaWidth")
.intType()
.defaultValue(300)
.withDescription(
"Maximum number of shredded fields allowed in an inferred schema.");

/** Upper bound on how deep inference traverses nested Variant values. */
public static final ConfigOption<Integer> VARIANT_SHREDDING_MAX_SCHEMA_DEPTH =
key("variant.shredding.maxSchemaDepth")
.intType()
.defaultValue(50)
.withDescription(
"Maximum traversal depth in Variant values during schema inference.");

/**
 * Minimum fraction of buffered rows that must contain a field for it to be shredded; rarer
 * fields stay in the un-shredded Variant binary.
 */
public static final ConfigOption<Double> VARIANT_SHREDDING_MIN_FIELD_CARDINALITY_RATIO =
key("variant.shredding.minFieldCardinalityRatio")
.doubleType()
.defaultValue(0.1)
.withDescription(
"Minimum fraction of rows that must contain a field for it to be shredded. "
+ "Fields below this threshold will remain in the un-shredded Variant binary.");

/** Maximum number of rows buffered in memory before the shredding schema is inferred. */
public static final ConfigOption<Integer> VARIANT_SHREDDING_MAX_INFER_BUFFER_ROW =
key("variant.shredding.maxInferBufferRow")
.intType()
.defaultValue(4096)
.withDescription("Maximum number of rows to buffer for schema inference.");

public static final ConfigOption<String> MANIFEST_FORMAT =
key("manifest.format")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.variant;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.InferVariantShreddingSchema;
import org.apache.paimon.format.BundleFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * A generic writer that infers the shredding schema from buffered rows before writing.
 *
 * <p>This writer buffers rows up to a threshold, infers the optimal schema from them, then writes
 * all data using the inferred schema. It works with any format that implements {@link
 * SupportsVariantInference}.
 *
 * <p>Not thread-safe; a single instance must be driven by one thread at a time.
 */
public class InferVariantShreddingWriter implements BundleFormatWriter {

    private final SupportsVariantInference writerFactory;
    private final InferVariantShreddingSchema shreddingSchemaInfer;
    private final int maxBufferRow;
    private final PositionOutputStream out;
    private final String compression;

    /** Rows buffered through {@link #addElement} before the schema is finalized. */
    private final List<InternalRow> bufferedRows;

    /** Bundles buffered through {@link #writeBundle} before the schema is finalized. */
    private final List<BundleRecords> bufferedBundles;

    /** Created lazily once the shredding schema has been inferred. */
    private FormatWriter actualWriter;

    private boolean schemaFinalized = false;
    private long totalBufferedRowCount = 0;

    public InferVariantShreddingWriter(
            SupportsVariantInference writerFactory,
            InferVariantShreddingSchema shreddingSchemaInfer,
            int maxBufferRow,
            PositionOutputStream out,
            String compression) {
        this.writerFactory = writerFactory;
        this.shreddingSchemaInfer = shreddingSchemaInfer;
        this.maxBufferRow = maxBufferRow;
        this.out = out;
        this.compression = compression;
        this.bufferedRows = new ArrayList<>();
        this.bufferedBundles = new ArrayList<>();
    }

    @Override
    public void addElement(InternalRow row) throws IOException {
        if (schemaFinalized) {
            actualWriter.addElement(row);
            return;
        }
        bufferedRows.add(row);
        totalBufferedRowCount++;
        if (totalBufferedRowCount >= maxBufferRow) {
            finalizeSchemaAndFlush();
        }
    }

    @Override
    public void writeBundle(BundleRecords bundle) throws IOException {
        if (schemaFinalized) {
            // The factory is expected to produce a BundleFormatWriter when bundles are used;
            // a plain FormatWriter here would fail this cast.
            ((BundleFormatWriter) actualWriter).writeBundle(bundle);
            return;
        }
        bufferedBundles.add(bundle);
        totalBufferedRowCount += bundle.rowCount();
        if (totalBufferedRowCount >= maxBufferRow) {
            finalizeSchemaAndFlush();
        }
    }

    @Override
    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
        // While buffering, nothing has been written to the underlying stream yet, so the
        // target size cannot have been reached.
        if (!schemaFinalized) {
            return false;
        }
        return actualWriter.reachTargetSize(suggestedCheck, targetSize);
    }

    @Override
    public void close() throws IOException {
        try {
            if (!schemaFinalized) {
                // Fewer than maxBufferRow rows arrived overall; infer from whatever we have.
                finalizeSchemaAndFlush();
            }
        } finally {
            if (actualWriter != null) {
                actualWriter.close();
            }
        }
    }

    /**
     * Infers the shredding schema from everything buffered so far, creates the real writer, and
     * replays all buffered data through it.
     *
     * <p>Fix over the original version: BOTH buffers are flushed. Previously only one of
     * {@code bufferedBundles} / {@code bufferedRows} was replayed (bundles winning), so rows
     * buffered via {@link #addElement} were silently dropped whenever a caller also used
     * {@link #writeBundle} before finalization. Note: when both APIs were mixed, the original
     * bundle/row interleaving order is not preserved — bundles are replayed first.
     */
    private void finalizeSchemaAndFlush() throws IOException {
        RowType inferredShreddingSchema = shreddingSchemaInfer.inferSchema(collectAllRows());
        actualWriter =
                writerFactory.createWithShreddingSchema(out, compression, inferredShreddingSchema);
        schemaFinalized = true;

        if (!bufferedBundles.isEmpty()) {
            BundleFormatWriter bundleWriter = (BundleFormatWriter) actualWriter;
            for (BundleRecords bundle : bufferedBundles) {
                bundleWriter.writeBundle(bundle);
            }
            bufferedBundles.clear();
        }
        for (InternalRow row : bufferedRows) {
            actualWriter.addElement(row);
        }
        bufferedRows.clear();
    }

    /**
     * Collects every buffered row — from both the row buffer and all buffered bundles — so that
     * schema inference sees the complete sample, regardless of which write API the caller used.
     */
    private List<InternalRow> collectAllRows() {
        if (bufferedBundles.isEmpty()) {
            return bufferedRows;
        }
        List<InternalRow> allRows = new ArrayList<>(bufferedRows);
        for (BundleRecords bundle : bufferedBundles) {
            for (InternalRow row : bundle) {
                allRows.add(row);
            }
        }
        return allRows;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.variant;

import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.RowType;

import java.io.IOException;

/**
 * Interface for FormatWriterFactory implementations that support variant schema inference.
 *
 * <p>Implementations can create a writer configured with a shredding schema that was inferred
 * from buffered sample rows (see {@code InferVariantShreddingWriter}), instead of a statically
 * configured schema.
 */
public interface SupportsVariantInference {

/**
 * Create the writer with the inferred shredding schema using the same output stream and
 * compression settings.
 *
 * <p>NOTE(review): callers pass a stream they may already hold open; implementations are
 * presumably expected NOT to close or re-create {@code out} — confirm against implementors.
 *
 * @param out The output stream to write to
 * @param compression The compression codec
 * @param inferredShreddingSchema The inferred shredding schema for variant fields
 * @return A new FormatWriter configured with the inferred schema
 * @throws IOException If the writer cannot be created
 */
FormatWriter createWithShreddingSchema(
PositionOutputStream out, String compression, RowType inferredShreddingSchema)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.variant;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.variant.InferVariantShreddingSchema;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VariantType;

/** Configuration helper deciding whether and how Variant shredding-schema inference runs. */
public class VariantInferenceConfig {

    private final RowType rowType;
    private final Options options;

    public VariantInferenceConfig(RowType rowType, Options options) {
        this.rowType = rowType;
        this.options = options;
    }

    /**
     * Determines whether variant schema inference should be enabled.
     *
     * <p>Inference runs only when no explicit shredding schema is configured, the inference
     * switch is on, and the row type contains at least one top-level Variant field. The
     * conditions short-circuit in that order.
     */
    public boolean shouldEnableInference() {
        return !options.contains(CoreOptions.VARIANT_SHREDDING_SCHEMA)
                && options.get(CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA)
                && containsVariantFields(rowType);
    }

    // Only top-level fields are inspected; Variant types nested inside row fields are not
    // detected here.
    private boolean containsVariantFields(RowType rowType) {
        return rowType.getFields().stream()
                .anyMatch(field -> field.type() instanceof VariantType);
    }

    /** Create a schema inferrer bound to this row type and the configured inference limits. */
    public InferVariantShreddingSchema createInferrer() {
        int maxSchemaWidth = options.get(CoreOptions.VARIANT_SHREDDING_MAX_SCHEMA_WIDTH);
        int maxSchemaDepth = options.get(CoreOptions.VARIANT_SHREDDING_MAX_SCHEMA_DEPTH);
        double minCardinalityRatio =
                options.get(CoreOptions.VARIANT_SHREDDING_MIN_FIELD_CARDINALITY_RATIO);
        return new InferVariantShreddingSchema(
                rowType, maxSchemaWidth, maxSchemaDepth, minCardinalityRatio);
    }

    /** Get the maximum number of rows to buffer for inference. */
    public int getMaxBufferRow() {
        return options.get(CoreOptions.VARIANT_SHREDDING_MAX_INFER_BUFFER_ROW);
    }
}
Loading