From 0a4054704875d8d78e8526fdaee247d3c8190ab7 Mon Sep 17 00:00:00 2001 From: Ally Heev Date: Mon, 23 Feb 2026 21:13:55 +0530 Subject: [PATCH] implement semi-mask filtering in arrow table scan --- src/common/arrow/arrow_array_scan.cpp | 146 +++++++++++++------ src/include/storage/table/arrow_node_table.h | 6 + src/storage/table/arrow_node_table.cpp | 89 ++++++++--- 3 files changed, 177 insertions(+), 64 deletions(-) diff --git a/src/common/arrow/arrow_array_scan.cpp b/src/common/arrow/arrow_array_scan.cpp index ae23c27074..95d9b3137a 100644 --- a/src/common/arrow/arrow_array_scan.cpp +++ b/src/common/arrow/arrow_array_scan.cpp @@ -4,6 +4,7 @@ #include "common/types/interval_t.h" #include "common/types/types.h" #include "common/vector/value_vector.h" +#include "function/cast/functions/numeric_limits.h" namespace lbug { namespace common { @@ -13,13 +14,30 @@ namespace common { // all offsets are measured by value, not physical size +template +static void rowIter(const ValueVector& outputVector, uint64_t count, Func&& func) { + if (outputVector.state != nullptr) { + outputVector.state->getSelVector().forEach(func); + } else { + for (uint64_t i = 0; i < count; i++) { + func(i); + } + } +} + template static void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { - auto arrayBuffer = (const uint8_t*)array->buffers[1]; + auto arrayBuffer = (const T*)array->buffers[1]; + mask->copyToValueVector(&outputVector, dstOffset, count); - memcpy(outputVector.getData() + dstOffset * outputVector.getNumBytesPerValue(), - arrayBuffer + srcOffset * sizeof(T), count * sizeof(T)); + + rowIter(outputVector, count, [&](auto i) { + if (!mask->isNull(i)) { + auto curValue = arrayBuffer[i + srcOffset]; + outputVector.setValue(i + dstOffset, curValue); + } + }); } template @@ -27,71 +45,83 @@ static void scanArrowArrayFixedSizePrimitiveAndCastTo(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = (const SRC*)array->buffers[1]; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curValue = arrayBuffer[i + srcOffset]; outputVector.setValue(i + dstOffset, (DST)curValue); } - } + }); } template<> void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = (const uint8_t*)array->buffers[1]; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { outputVector.setValue(i + dstOffset, NullMask::isNull((const uint64_t*)arrayBuffer, i + srcOffset)); - } + }); } static void scanArrowArrayDurationScaledUp(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curValue = arrayBuffer[i]; outputVector.setValue(i + dstOffset, interval_t(0, 0, curValue * scaleFactor)); } - } + }); } static void scanArrowArrayDurationScaledDown(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curValue = arrayBuffer[i]; outputVector.setValue(i + dstOffset, interval_t(0, 0, curValue / scaleFactor)); } - } + }); } static void scanArrowArrayMonthInterval(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = ((const int32_t*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curValue = arrayBuffer[i]; outputVector.setValue(i + dstOffset, interval_t(curValue, 0, 0)); } - } + }); } static void scanArrowArrayDayTimeInterval(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { int64_t curValue = arrayBuffer[i]; int32_t day = curValue; @@ -99,15 +129,17 @@ static void scanArrowArrayDayTimeInterval(const ArrowArray* array, ValueVector& // arrow stores ms, while we store us outputVector.setValue(i + dstOffset, interval_t(0, day, micros)); } - } + }); } static void scanArrowArrayMonthDayNanoInterval(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = (const int64_t*)((const uint8_t*)array->buffers[1] + srcOffset * 16); // 16 bits per value + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { int64_t curValue = arrayBuffer[2 * i]; int32_t month = curValue; @@ -115,7 +147,7 @@ static void scanArrowArrayMonthDayNanoInterval(const ArrowArray* array, ValueVec int64_t micros = arrayBuffer[2 * i + 1] / 1000; outputVector.setValue(i + dstOffset, interval_t(month, day, micros)); } - } + }); } template @@ -123,15 +155,17 @@ static void scanArrowArrayBLOB(const ArrowArray* array, ValueVector& outputVecto ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; auto arrayBuffer = (const uint8_t*)array->buffers[2]; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curOffset = offsets[i], nextOffset = offsets[i + 1]; const uint8_t* data = arrayBuffer + curOffset; auto length = nextOffset - curOffset; BlobVector::addBlob(&outputVector, i + dstOffset, data, length); } - } + }); } static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputVector, @@ -139,8 +173,10 @@ static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputV auto arrayBuffer = (const uint8_t*)(array->buffers[1]); auto valueBuffs = (const uint8_t**)(array->buffers + 2); // BLOB value buffers begin from index 2 onwards + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curView = (const int32_t*)(arrayBuffer + (i + srcOffset) * 16); // view structures are 16 bytes long @@ -155,19 +191,21 @@ static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputV viewLength); } } - } + }); } static void scanArrowArrayFixedBLOB(const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, int64_t BLOBsize, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto arrayBuffer = ((const uint8_t*)array->buffers[1]) + srcOffset * BLOBsize; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { BlobVector::addBlob(&outputVector, i + dstOffset, arrayBuffer + i * BLOBsize, BLOBsize); } - } + }); } template @@ -175,17 +213,24 @@ static void scanArrowArrayList(const ArrowSchema* schema, const ArrowArray* arra ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; + uint64_t auxDstPosition = function::NumericLimits::maximum(); + mask->copyToValueVector(&outputVector, dstOffset, count); - uint64_t auxDstPosition = 0; - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { auto curOffset = offsets[i], nextOffset = offsets[i + 1]; // don't check for validity, since we still need to update the offsets auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset); outputVector.setValue(i + dstOffset, newEntry); - if (i == 0) { + if (auxDstPosition == function::NumericLimits::maximum()) { auxDstPosition = newEntry.offset; } + }); + + if (auxDstPosition == function::NumericLimits::maximum()) { + auxDstPosition = 0; } + ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer, mask->getChild(0), offsets[0] + array->children[0]->offset, auxDstPosition, @@ -198,9 +243,11 @@ static void scanArrowArrayListView(const ArrowSchema* schema, const ArrowArray* uint64_t count) { auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset; auto sizes = ((const offsetsT*)array->buffers[2]) + srcOffset; - mask->copyToValueVector(&outputVector, dstOffset, count); ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector); - for (uint64_t i = 0; i < count; i++) { + + mask->copyToValueVector(&outputVector, dstOffset, count); + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto curOffset = offsets[i], size = sizes[i]; auto newEntry = ListVector::addList(&outputVector, size); @@ -210,18 +257,21 @@ static void scanArrowArrayListView(const ArrowSchema* schema, const ArrowArray* ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer, &childTree, curOffset, newEntry.offset, newEntry.size); } - } + }); } static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { - mask->copyToValueVector(&outputVector, dstOffset, count); auto numElements = ArrayType::getNumElements(outputVector.dataType); - for (auto i = 0u; i < count; ++i) { + + mask->copyToValueVector(&outputVector, dstOffset, count); + + rowIter(outputVector, count, [&](auto i) { auto newEntry = ListVector::addList(&outputVector, numElements); outputVector.setValue(i + dstOffset, newEntry); - } + }); + auto auxiliaryBuffer = ListVector::getDataVector(&outputVector); ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer, mask->getChild(0), srcOffset * numElements + array->children[0]->offset, @@ -231,13 +281,16 @@ static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray* static void scanArrowArrayStruct(const ArrowSchema* schema, const ArrowArray* array, ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) { + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { outputVector.setValue(i + dstOffset, i + dstOffset); // struct_entry_t doesn't work for some reason } - } + }); + for (int64_t j = 0; j < schema->n_children; j++) { ArrowConverter::fromArrowArray(schema->children[j], array->children[j], *StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j), @@ -251,9 +304,11 @@ static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray auto types = ((const uint8_t*)array->buffers[0]) + srcOffset; auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData(); auto offsets = ((const int32_t*)array->buffers[1]) + srcOffset; - mask->copyToValueVector(&outputVector, dstOffset, count); std::vector firstIncident(array->n_children, INT32_MAX); - for (auto i = 0u; i < count; i++) { + + mask->copyToValueVector(&outputVector, dstOffset, count); + + rowIter(outputVector, count, [&](auto i) { auto curType = types[i]; auto curOffset = offsets[i]; if (curOffset < firstIncident[curType]) { @@ -268,7 +323,7 @@ static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray curOffset + array->children[curType]->offset, i + dstOffset, 1); // may be inefficient, since we're only scanning a single value } - } + }); } static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArray* array, @@ -276,12 +331,15 @@ static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArra uint64_t count) { auto types = ((const uint8_t*)array->buffers[0]) + srcOffset; auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData(); + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { dstTypes[i] = types[i]; } - } + }); + // it is specified that values that aren't selected in the type buffer // must also be semantically correct. this is why this scanning works. // however, there is possibly room for optimization here. @@ -299,15 +357,17 @@ static void scanArrowArrayDictionaryEncoded(const ArrowSchema* schema, const Arr uint64_t count) { auto values = ((const offsetsT*)array->buffers[1]) + srcOffset; + mask->copyToValueVector(&outputVector, dstOffset, count); - for (uint64_t i = 0; i < count; i++) { + + rowIter(outputVector, count, [&](auto i) { if (!mask->isNull(i)) { auto dictOffseted = mask->getDictionary()->offsetBy(values[i]); ArrowConverter::fromArrowArray(schema->dictionary, array->dictionary, outputVector, &dictOffseted, values[i] + array->dictionary->offset, i + dstOffset, 1); // possibly inefficient? } - } + }); } static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowArray* array, @@ -332,7 +392,7 @@ static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowAr } } - for (uint64_t i = 0; i < count; i++) { + rowIter(outputVector, count, [&](auto i) { while (i + srcOffset >= runEndBuffer[runEndIdx + 1]) { runEndIdx++; } @@ -340,7 +400,7 @@ static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowAr ArrowConverter::fromArrowArray(schema->children[1], array->children[1], outputVector, &valuesOffseted, runEndIdx, i + dstOffset, 1); // there is optimization to be made here... - } + }); } void ArrowConverter::fromArrowArray(const ArrowSchema* schema, const ArrowArray* array, diff --git a/src/include/storage/table/arrow_node_table.h b/src/include/storage/table/arrow_node_table.h index d8b233798d..bdb1b6d5be 100644 --- a/src/include/storage/table/arrow_node_table.h +++ b/src/include/storage/table/arrow_node_table.h @@ -51,6 +51,12 @@ class ArrowNodeTable final : public ColumnarNodeTableBase { std::string getColumnarFormatName() const override { return "Arrow"; } common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override; +private: + void copyArrowBatchToOutputVectors(const ArrowArrayWrapper& batch, + const size_t currentBatchOffset, const uint64_t numRowsToCopy, + const std::vector& outputVectors, + const std::vector& outputToArrowColumnIdx) const; + private: ArrowSchemaWrapper schema; std::vector arrays; diff --git a/src/storage/table/arrow_node_table.cpp b/src/storage/table/arrow_node_table.cpp index cd60450f41..29c067bb7c 100644 --- a/src/storage/table/arrow_node_table.cpp +++ b/src/storage/table/arrow_node_table.cpp @@ -2,6 +2,7 @@ #include "common/arrow/arrow_converter.h" #include "common/arrow/arrow_nullmask_tree.h" +#include "common/data_chunk/sel_vector.h" #include "common/system_config.h" #include "common/types/types.h" #include "storage/storage_manager.h" @@ -82,6 +83,32 @@ void ArrowNodeTable::initScanState([[maybe_unused]] transaction::Transaction* tr arrowScanState.initialized = true; } +static void applySemiMaskFilter(const TableScanState& state, const size_t startOffset, + const uint64_t numRowsToScan, common::SelectionVector& selVector) { + const auto endOffset = startOffset + numRowsToScan; + const auto& arr = state.semiMask->range(startOffset, endOffset); + if (arr.empty()) { + selVector.setSelSize(0); + } else { + auto stat = selVector.getMutableBuffer(); + uint64_t numSelectedValues = 0; + size_t i = 0, j = 0; + while (i < numRowsToScan && j < arr.size()) { + auto temp = arr[j] - startOffset; + if (selVector[i] < temp) { + ++i; + } else if (selVector[i] > temp) { + ++j; + } else { + stat[numSelectedValues++] = temp; + ++i; + ++j; + } + } + selVector.setToFiltered(numSelectedValues); + } +} + bool ArrowNodeTable::scanInternal([[maybe_unused]] transaction::Transaction* transaction, TableScanState& scanState) { auto& arrowScanState = scanState.cast(); @@ -97,29 +124,24 @@ bool ArrowNodeTable::scanInternal([[maybe_unused]] transaction::Transaction* tra return false; } - auto batchRemaining = batchLength - arrowScanState.currentBatchOffset; - auto outputSize = std::min(1, batchRemaining); - auto numChildren = batch.n_children < 0 ? 0u : static_cast(batch.n_children); - for (uint64_t outCol = 0; outCol < scanState.outputVectors.size(); ++outCol) { - if (!scanState.outputVectors[outCol] || - outCol >= arrowScanState.outputToArrowColumnIdx.size()) { - continue; - } - auto arrowColIdx = arrowScanState.outputToArrowColumnIdx[outCol]; - if (arrowColIdx < 0 || static_cast(arrowColIdx) >= numChildren || - !batch.children || !schema.children || !batch.children[arrowColIdx] || - !schema.children[arrowColIdx]) { - continue; + auto outputSize = static_cast(batchLength - arrowScanState.currentBatchOffset); + + scanState.outState->getSelVectorUnsafe().setSelSize(outputSize); + + if (scanState.semiMask && scanState.semiMask->isEnabled()) { + applySemiMaskFilter(scanState, arrowScanState.nextGlobalRowOffset, outputSize, + scanState.outState->getSelVectorUnsafe()); + if (scanState.outState->getSelVector().getSelSize() == 0) { + arrowScanState.currentBatchOffset += outputSize; + arrowScanState.nextGlobalRowOffset += outputSize; + return true; } - auto& outputVector = *scanState.outputVectors[outCol]; - auto* childArray = batch.children[arrowColIdx]; - auto* childSchema = schema.children[arrowColIdx]; - common::ArrowNullMaskTree nullMask(childSchema, childArray, childArray->offset, - childArray->length); - common::ArrowConverter::fromArrowArray(childSchema, childArray, outputVector, &nullMask, - childArray->offset + arrowScanState.currentBatchOffset, 0, outputSize); } + KU_ASSERT(scanState.outputVectors.size() == arrowScanState.outputToArrowColumnIdx.size()); + copyArrowBatchToOutputVectors(batch, arrowScanState.currentBatchOffset, outputSize, + scanState.outputVectors, arrowScanState.outputToArrowColumnIdx); + auto tableID = this->getTableID(); for (uint64_t i = 0; i < outputSize; ++i) { auto& nodeID = scanState.nodeIDVector->getValue(i); @@ -127,7 +149,6 @@ bool ArrowNodeTable::scanInternal([[maybe_unused]] transaction::Transaction* tra nodeID.offset = arrowScanState.nextGlobalRowOffset + i; } - scanState.outState->getSelVectorUnsafe().setSelSize(outputSize); arrowScanState.currentBatchOffset += outputSize; arrowScanState.nextGlobalRowOffset += outputSize; return true; @@ -143,5 +164,31 @@ common::row_idx_t ArrowNodeTable::getTotalRowCount( return totalRows; } +void ArrowNodeTable::copyArrowBatchToOutputVectors(const ArrowArrayWrapper& batch, + const size_t currentBatchOffset, const uint64_t numRowsToCopy, + const std::vector& outputVectors, + const std::vector& outputToArrowColumnIdx) const { + auto numChildren = batch.n_children < 0 ? 0u : static_cast(batch.n_children); + + for (uint64_t outCol = 0; outCol < outputVectors.size(); ++outCol) { + if (!outputVectors[outCol]) { + continue; + } + auto arrowColIdx = outputToArrowColumnIdx[outCol]; + if (arrowColIdx < 0 || static_cast(arrowColIdx) >= numChildren || + !batch.children || !schema.children || !batch.children[arrowColIdx] || + !schema.children[arrowColIdx]) { + continue; + } + auto& outputVector = *outputVectors[outCol]; + auto* childArray = batch.children[arrowColIdx]; + auto* childSchema = schema.children[arrowColIdx]; + common::ArrowNullMaskTree nullMask(childSchema, childArray, childArray->offset, + childArray->length); + common::ArrowConverter::fromArrowArray(childSchema, childArray, outputVector, &nullMask, + childArray->offset + currentBatchOffset, 0, numRowsToCopy); + } +} + } // namespace storage } // namespace lbug