diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e238ab7417a..e10152cdaeb 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -258,6 +258,12 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
         return false
       }
 
+      // If the current file still has records, return immediately without touching the catalog.
+      // Only when the current file is exhausted do we check for more files and possibly refresh.
+      if (currentRecordIterator.hasNext) {
+        return true
+      }
+
       if (!usableFileIterator.hasNext) {
         usableFileIterator = seekToUsableFile()
       }
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index fe06a47d191..8fdf039f3ea 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -19,9 +19,11 @@ package org.apache.texera.amber.storage.result.iceberg
 
+import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
-import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
+import org.apache.texera.amber.core.storage.{DocumentFactory, IcebergCatalogInstance, VFSURIFactory}
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.iceberg.Table
 import org.apache.texera.amber.core.virtualidentity.{
   ExecutionIdentity,
   OperatorIdentity,
   WorkflowIdentity
 }
@@ -35,9 +37,11 @@ import org.apache.iceberg.data.Record
 import org.apache.iceberg.{Schema => IcebergSchema}
 import org.scalatest.BeforeAndAfterAll
 
+import java.lang.reflect.{InvocationHandler, Method, Proxy}
 import java.net.URI
 import java.sql.Timestamp
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
 
 class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfterAll {
@@ -99,6 +103,78 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
   }
 
+  it should "not trigger excessive catalog seeks when reading the last file (lazy file advancement)" in {
+    val batchSize = StorageConfig.icebergTableCommitBatchSize
+    val items = generateSampleItems().take(batchSize * 2)
+    val (batch1, batch2) = items.splitAt(batchSize)
+
+    // Write two separate batches to produce two committed data files.
+    // This also initialises `document.catalog` (lazy val) with the real catalog, which
+    // is why we open a fresh reader document below after injecting the spy.
+    val writer1 = document.writer(UUID.randomUUID().toString)
+    writer1.open(); batch1.foreach(writer1.putOne); writer1.close()
+
+    val writer2 = document.writer(UUID.randomUUID().toString)
+    writer2.open(); batch2.foreach(writer2.putOne); writer2.close()
+
+    val refreshCount = new AtomicInteger(0)
+    val realCatalog = IcebergCatalogInstance.getInstance()
+    IcebergCatalogInstance.replaceInstance(catalogWithRefreshSpy(realCatalog, refreshCount))
+    // Open a fresh reader: its `catalog` lazy val hasn't been initialised yet, so it
+    // will pick up the spy catalog on first access inside seekToUsableFile.
+    val readerDoc = getDocument
+    try {
+      val retrieved = readerDoc.get().toList
+      assert(
+        retrieved.toSet == items.toSet,
+        "All records from both files should be read correctly"
+      )
+      // With lazy file advancement seekToUsableFile() (and therefore table.refresh()) is called:
+      // once on iterator creation, once when the last file is exhausted → 2 total.
+      // Without the fix it would be called once per hasNext() on the last file → O(batchSize).
+      assert(
+        refreshCount.get() <= 4,
+        s"table.refresh() should be called at most 4 times (lazy advancement), but was ${refreshCount.get()}"
+      )
+    } finally {
+      IcebergCatalogInstance.replaceInstance(realCatalog)
+    }
+  }
+
+  /** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
+  private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
+    Proxy
+      .newProxyInstance(
+        classOf[Table].getClassLoader,
+        Array(classOf[Table]),
+        new InvocationHandler {
+          override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+            if (method.getName == "refresh") counter.incrementAndGet()
+            if (args == null) method.invoke(realTable) else method.invoke(realTable, args: _*)
+          }
+        }
+      )
+      .asInstanceOf[Table]
+
+  /** Returns a dynamic proxy for `realCatalog` that wraps every loaded `Table` with a refresh spy. */
+  private def catalogWithRefreshSpy(realCatalog: Catalog, counter: AtomicInteger): Catalog =
+    Proxy
+      .newProxyInstance(
+        classOf[Catalog].getClassLoader,
+        Array(classOf[Catalog]),
+        new InvocationHandler {
+          override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
+            val result =
+              if (args == null) method.invoke(realCatalog) else method.invoke(realCatalog, args: _*)
+            if (method.getName == "loadTable" && result != null)
+              tableWithRefreshSpy(result.asInstanceOf[Table], counter)
+            else
+              result
+          }
+        }
+      )
+      .asInstanceOf[Catalog]
+
   override def generateSampleItems(): List[Tuple] = {
     val baseTuples = List(
       Tuple