@@ -75,6 +75,50 @@ case class MergeIntoPaimonDataEvolutionTable(
import MergeIntoPaimonDataEvolutionTable._

override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]

// Columns rewritten by a WHEN MATCHED ... UPDATE clause; trivial self-assignments
// (key == value) change nothing and are ignored.
private val updateColumns: Set[AttributeReference] = matchedActions.flatMap {
  case updateAction: UpdateAction =>
    updateAction.assignments.collect {
      case assignment if !assignment.key.equals(assignment.value) =>
        assignment.key.asInstanceOf[AttributeReference]
    }
  case _ => Nil // other matched actions (e.g. DELETE) rewrite no columns
}.toSet

// Reject MERGE INTO plans that rewrite a column covered by a global index in the
// latest snapshot: updating globally indexed columns is not supported yet.
Option(table.snapshotManager().latestSnapshot()).foreach { snapshot =>
  val updatedColNames: Set[String] = updateColumns.map(_.name)
  val rowType = table.rowType()

  // Names of all fields that currently have a global index.
  val globalIndexedFieldNames: Set[String] = table
    .store()
    .newIndexFileHandler()
    .scan(snapshot, entry => entry.indexFile().globalIndexMeta() != null)
    .asScala
    .map(entry => entry.indexFile().globalIndexMeta().indexFieldId())
    .map(fieldId => rowType.getField(fieldId).name())
    .toSet

  val conflicted = updatedColNames.intersect(globalIndexedFieldNames)
  if (conflicted.nonEmpty) {
    throw new RuntimeException(
      s"""MergeInto: update columns contain globally indexed columns, which is not supported yet.
         |Updated columns: ${updatedColNames.toSeq.sorted.mkString("[", ", ", "]")}
         |Global-indexed columns: ${globalIndexedFieldNames.toSeq.sorted.mkString("[", ", ", "]")}
         |Conflicting columns: ${conflicted.toSeq.sorted.mkString("[", ", ", "]")}
         |""".stripMargin)
  }
}

private val firstRowIds: Seq[Long] = table
  .store()
  .newScan()
@@ -230,18 +274,6 @@ case class MergeIntoPaimonDataEvolutionTable(
(o1, o2) => {
  o1.toString().compareTo(o2.toString())
}) ++ mergeFields
val updateColumns = mutable.Set[AttributeReference]()
for (action <- matchedActions) {
action match {
case updateAction: UpdateAction =>
for (assignment <- updateAction.assignments) {
if (!assignment.key.equals(assignment.value)) {
val key = assignment.key.asInstanceOf[AttributeReference]
updateColumns ++= Seq(key)
}
}
}
}

val updateColumnsSorted = updateColumns.toSeq.sortBy(
  s => targetTable.output.map(x => x.toString()).indexOf(s.toString()))
@@ -625,4 +625,44 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
)
}
}

test("Data Evolution: Should not update indexed columns") {
  withTable("T") {
    spark.sql("""
                |CREATE TABLE T (id INT, name STRING)
                |TBLPROPERTIES (
                | 'bucket' = '-1',
                | 'global-index.row-count-per-shard' = '10000',
                | 'row-tracking.enabled' = 'true',
                | 'data-evolution.enabled' = 'true',
                | 'btree-index.records-per-range' = '1000')
                |""".stripMargin)

    val values =
      (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
    spark.sql(s"INSERT INTO T VALUES $values")

    // Create a global btree index on the `name` column.
    val output =
      spark
        .sql(
          "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," +
            " options => 'btree-index.records-per-range=1000')")
        .collect()
        .head

    assert(output.getBoolean(0))

    // A MERGE INTO that rewrites the globally indexed column `name` must be rejected.
    assert(intercept[RuntimeException] {
      sql(s"""
             |MERGE INTO T
             |USING T AS source
             |ON T._ROW_ID = source._ROW_ID
             |WHEN MATCHED AND T.id = 500 THEN UPDATE SET name = "updatedName"
             |""".stripMargin)
    }.getMessage
      .contains("MergeInto: update columns contain globally indexed columns, which is not supported yet."))
  }
}
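
// Illustrative companion sketch, not part of this change: the new check is column-specific,
// so a MERGE that only rewrites a non-indexed column (`id` here) is expected to pass even
// though `name` carries a global index. Setup mirrors the test above; the positive outcome
// is an assumption about the intended behaviour, not a verified result.
test("Data Evolution: update of non-indexed columns (sketch)") {
  withTable("T") {
    spark.sql("""
                |CREATE TABLE T (id INT, name STRING)
                |TBLPROPERTIES (
                | 'bucket' = '-1',
                | 'global-index.row-count-per-shard' = '10000',
                | 'row-tracking.enabled' = 'true',
                | 'data-evolution.enabled' = 'true',
                | 'btree-index.records-per-range' = '1000')
                |""".stripMargin)

    val values = (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
    spark.sql(s"INSERT INTO T VALUES $values")

    // Same procedure call as above: build a global btree index on `name`.
    spark.sql(
      "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," +
        " options => 'btree-index.records-per-range=1000')")

    // Updating `id` does not touch the globally indexed column, so no exception is expected.
    sql("""
          |MERGE INTO T
          |USING T AS source
          |ON T._ROW_ID = source._ROW_ID
          |WHEN MATCHED AND T.id = 500 THEN UPDATE SET id = 1000500
          |""".stripMargin)

    assert(spark.sql("SELECT count(*) FROM T WHERE id = 1000500").collect().head.getLong(0) == 1L)
  }
}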
}