Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
return false
}

// If the current file still has records, return immediately without touching the catalog.
// Only when the current file is exhausted do we check for more files and possibly refresh.
if (currentRecordIterator.hasNext) {
return true
}

if (!usableFileIterator.hasNext) {
usableFileIterator = seekToUsableFile()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.texera.amber.storage.result.iceberg

import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
import org.apache.texera.amber.core.storage.{DocumentFactory, IcebergCatalogInstance, VFSURIFactory}
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
import org.apache.iceberg.Table
import org.apache.texera.amber.core.virtualidentity.{
ExecutionIdentity,
OperatorIdentity,
Expand All @@ -35,9 +37,11 @@ import org.apache.iceberg.data.Record
import org.apache.iceberg.{Schema => IcebergSchema}
import org.scalatest.BeforeAndAfterAll

import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.net.URI
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger

class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfterAll {

Expand Down Expand Up @@ -99,6 +103,78 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
}

it should "not trigger excessive catalog seeks when reading the last file (lazy file advancement)" in {
  val batchSize = StorageConfig.icebergTableCommitBatchSize
  val items = generateSampleItems().take(batchSize * 2)
  val (batch1, batch2) = items.splitAt(batchSize)

  // Write two separate batches to produce two committed data files.
  // This also initialises `document.catalog` (lazy val) with the real catalog, which
  // is why a fresh reader document is opened below after injecting the spy.
  val writer1 = document.writer(UUID.randomUUID().toString)
  writer1.open()
  batch1.foreach(writer1.putOne)
  writer1.close()

  val writer2 = document.writer(UUID.randomUUID().toString)
  writer2.open()
  batch2.foreach(writer2.putOne)
  writer2.close()

  val refreshCount = new AtomicInteger(0)
  val realCatalog = IcebergCatalogInstance.getInstance()
  // Build the spy before touching the shared instance so that a failure while constructing
  // it cannot leave the global catalog in a replaced state.
  val spyCatalog = catalogWithRefreshSpy(realCatalog, refreshCount)
  IcebergCatalogInstance.replaceInstance(spyCatalog)
  try {
    // Open a fresh reader: its `catalog` lazy val hasn't been initialised yet, so it
    // will pick up the spy catalog on first access inside seekToUsableFile.
    // This happens inside the `try` so the real catalog is restored even if opening fails.
    val readerDoc = getDocument
    val retrieved = readerDoc.get().toList
    assert(
      retrieved.toSet == items.toSet,
      "All records from both files should be read correctly"
    )
    // With lazy file advancement seekToUsableFile() (and therefore table.refresh()) is expected
    // to run once on iterator creation and once when the last file is exhausted, i.e. about 2 times.
    // Without the fix it would be called once per hasNext() on the last file → O(batchSize).
    // The bound asserted below is 4 rather than 2, which leaves some slack above the expected count.
    val observedRefreshes = refreshCount.get()
    assert(
      observedRefreshes <= 4,
      s"table.refresh() should be called at most 4 times (lazy advancement), but was $observedRefreshes"
    )
  } finally {
    // Always put the real catalog back so later tests do not run against the spy.
    IcebergCatalogInstance.replaceInstance(realCatalog)
  }
}

/**
  * Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call.
  *
  * Every call on the proxy is forwarded reflectively to `realTable`. An exception raised by the
  * real table is rethrown as-is rather than wrapped in the reflective
  * `InvocationTargetException`, so callers observe the same exception type they would see
  * without the spy.
  *
  * @param realTable table that receives every forwarded call
  * @param counter   incremented once for each invocation of a method named `refresh`
  * @return a `Table` proxy that delegates to `realTable`
  */
private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table = {
  val handler = new InvocationHandler {
    override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
      if (method.getName == "refresh") counter.incrementAndGet()
      try {
        // Reflection passes `null` instead of an empty array for zero-argument methods.
        if (args == null) method.invoke(realTable) else method.invoke(realTable, args: _*)
      } catch {
        // Unwrap so the proxy does not surface an UndeclaredThrowableException to callers.
        case wrapped: java.lang.reflect.InvocationTargetException if wrapped.getCause != null =>
          throw wrapped.getCause
      }
    }
  }
  Proxy
    .newProxyInstance(classOf[Table].getClassLoader, Array(classOf[Table]), handler)
    .asInstanceOf[Table]
}

/**
  * Returns a dynamic proxy for `realCatalog` that wraps every loaded `Table` with a refresh spy.
  *
  * Every call on the proxy is forwarded reflectively to `realCatalog`. A non-null `Table`
  * returned from a method named `loadTable` is wrapped with [[tableWithRefreshSpy]]; every other
  * result is returned unchanged. An exception raised by the real catalog is rethrown as-is rather
  * than wrapped in the reflective `InvocationTargetException`.
  *
  * @param realCatalog catalog that receives every forwarded call
  * @param counter     shared counter handed to each table spy
  * @return a `Catalog` proxy that delegates to `realCatalog`
  */
private def catalogWithRefreshSpy(realCatalog: Catalog, counter: AtomicInteger): Catalog = {
  val handler = new InvocationHandler {
    override def invoke(proxy: Object, method: Method, args: Array[Object]): Object = {
      val result =
        try {
          // Reflection passes `null` instead of an empty array for zero-argument methods.
          if (args == null) method.invoke(realCatalog) else method.invoke(realCatalog, args: _*)
        } catch {
          // Unwrap so the proxy does not surface an UndeclaredThrowableException to callers.
          case wrapped: java.lang.reflect.InvocationTargetException
              if wrapped.getCause != null =>
            throw wrapped.getCause
        }
      result match {
        // A type pattern never matches null, so a null result falls through unchanged.
        case table: Table if method.getName == "loadTable" =>
          tableWithRefreshSpy(table, counter)
        case other => other
      }
    }
  }
  Proxy
    .newProxyInstance(classOf[Catalog].getClassLoader, Array(classOf[Catalog]), handler)
    .asInstanceOf[Catalog]
}

override def generateSampleItems(): List[Tuple] = {
val baseTuples = List(
Tuple
Expand Down
Loading