Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 103 additions & 43 deletions src/common/arrow/arrow_array_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/types/interval_t.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "function/cast/functions/numeric_limits.h"

namespace lbug {
namespace common {
Expand All @@ -13,134 +14,169 @@ namespace common {

// all offsets are measured by value, not physical size

// Calls func once for every row position that should be processed in outputVector.
// If the vector has a state attached, the positions are taken from that state's selection
// vector (count is not consulted in that case); otherwise func receives 0, 1, ..., count - 1.
template<typename Func>
static void rowIter(const ValueVector& outputVector, uint64_t count, Func&& func) {
    if (outputVector.state == nullptr) {
        // No state attached: walk the dense range [0, count).
        for (uint64_t pos = 0; pos != count; ++pos) {
            func(pos);
        }
        return;
    }
    outputVector.state->getSelVector().forEach(func);
}

// Scans fixed-width values of type T out of the Arrow data buffer (array->buffers[1]) into
// outputVector.
//   array      - Arrow array whose buffers[1] holds the values, viewed as T[]
//   mask       - null information; copied onto outputVector for [dstOffset, dstOffset + count)
//   srcOffset  - index (in values, not bytes) of the first source element
//   dstOffset  - index of the first destination slot in outputVector
//   count      - number of rows to visit when outputVector has no state (see rowIter)
// Rows that the mask reports as null are left unwritten.
template<typename T>
static void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    // Typed view of the data buffer so that indexing is by value rather than by byte.
    auto arrayBuffer = (const T*)array->buffers[1];

    mask->copyToValueVector(&outputVector, dstOffset, count);

    rowIter(outputVector, count, [&](auto i) {
        if (!mask->isNull(i)) {
            auto curValue = arrayBuffer[i + srcOffset];
            outputVector.setValue<T>(i + dstOffset, curValue);
        }
    });
}

// Scans fixed-width values stored as SRC in the Arrow data buffer (array->buffers[1]) and
// writes each one into outputVector converted to DST with a plain C-style cast.
//   srcOffset / dstOffset - first source element / first destination slot, both counted in values
//   count                 - number of rows to visit when outputVector has no state (see rowIter)
// The null mask is copied onto outputVector first; rows reported as null are not written.
template<typename SRC, typename DST>
static void scanArrowArrayFixedSizePrimitiveAndCastTo(const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    auto arrayBuffer = (const SRC*)array->buffers[1];

    mask->copyToValueVector(&outputVector, dstOffset, count);

    rowIter(outputVector, count, [&](auto i) {
        if (!mask->isNull(i)) {
            auto curValue = arrayBuffer[i + srcOffset];
            outputVector.setValue<DST>(i + dstOffset, (DST)curValue);
        }
    });
}

// bool specialization: each value is read as a single bit of the data buffer
// (array->buffers[1]) at bit index (i + srcOffset) instead of as a whole element.
// NullMask::isNull is used here only as the bit reader over that buffer.
// NOTE(review): presumably it returns true when the addressed bit is set - confirm against
// NullMask.
// Unlike the generic version, the value is written for every visited row, including rows the
// mask reports as null.
template<>
void scanArrowArrayFixedSizePrimitive<bool>(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    auto arrayBuffer = (const uint8_t*)array->buffers[1];

    mask->copyToValueVector(&outputVector, dstOffset, count);

    rowIter(outputVector, count, [&](auto i) {
        outputVector.setValue<bool>(i + dstOffset,
            NullMask::isNull((const uint64_t*)arrayBuffer, i + srcOffset));
    });
}

static void scanArrowArrayDurationScaledUp(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curValue = arrayBuffer[i];
outputVector.setValue<interval_t>(i + dstOffset,
interval_t(0, 0, curValue * scaleFactor));
}
}
});
}

static void scanArrowArrayDurationScaledDown(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curValue = arrayBuffer[i];
outputVector.setValue<interval_t>(i + dstOffset,
interval_t(0, 0, curValue / scaleFactor));
}
}
});
}

static void scanArrowArrayMonthInterval(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
auto arrayBuffer = ((const int32_t*)array->buffers[1]) + srcOffset;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curValue = arrayBuffer[i];
outputVector.setValue<interval_t>(i + dstOffset, interval_t(curValue, 0, 0));
}
}
});
}

static void scanArrowArrayDayTimeInterval(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
auto arrayBuffer = ((const int64_t*)array->buffers[1]) + srcOffset;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
int64_t curValue = arrayBuffer[i];
int32_t day = curValue;
int64_t micros = (curValue >> (4 * sizeof(int64_t))) * 1000;
// arrow stores ms, while we store us
outputVector.setValue<interval_t>(i + dstOffset, interval_t(0, day, micros));
}
}
});
}

static void scanArrowArrayMonthDayNanoInterval(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
auto arrayBuffer =
(const int64_t*)((const uint8_t*)array->buffers[1] + srcOffset * 16); // 16 bits per value

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
int64_t curValue = arrayBuffer[2 * i];
int32_t month = curValue;
int32_t day = curValue >> (4 * sizeof(int64_t));
int64_t micros = arrayBuffer[2 * i + 1] / 1000;
outputVector.setValue<interval_t>(i + dstOffset, interval_t(month, day, micros));
}
}
});
}

// Scans variable-length binary values into outputVector. array->buffers[1] is read as an
// array of offsetsT offsets and array->buffers[2] as the byte data; value i spans
// [offsets[i], offsets[i + 1]) within the data buffer.
//   offsetsT              - integer type of the entries in the offsets buffer
//   srcOffset / dstOffset - first source element / first destination slot, counted in values
//   count                 - number of rows to visit when outputVector has no state (see rowIter)
// The null mask is copied onto outputVector first; rows reported as null are not written.
template<typename offsetsT>
static void scanArrowArrayBLOB(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    // Pre-advance the offsets so that row i uses offsets[i] and offsets[i + 1].
    auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
    auto arrayBuffer = (const uint8_t*)array->buffers[2];

    mask->copyToValueVector(&outputVector, dstOffset, count);

    rowIter(outputVector, count, [&](auto i) {
        if (!mask->isNull(i)) {
            auto curOffset = offsets[i], nextOffset = offsets[i + 1];
            const uint8_t* data = arrayBuffer + curOffset;
            auto length = nextOffset - curOffset;
            BlobVector::addBlob(&outputVector, i + dstOffset, data, length);
        }
    });
}

static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
auto arrayBuffer = (const uint8_t*)(array->buffers[1]);
auto valueBuffs = (const uint8_t**)(array->buffers + 2);
// BLOB value buffers begin from index 2 onwards

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curView = (const int32_t*)(arrayBuffer + (i + srcOffset) * 16);
// view structures are 16 bytes long
Expand All @@ -155,37 +191,46 @@ static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputV
viewLength);
}
}
}
});
}

static void scanArrowArrayFixedBLOB(const ArrowArray* array, ValueVector& outputVector,
ArrowNullMaskTree* mask, int64_t BLOBsize, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto arrayBuffer = ((const uint8_t*)array->buffers[1]) + srcOffset * BLOBsize;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
BlobVector::addBlob(&outputVector, i + dstOffset, arrayBuffer + i * BLOBsize, BLOBsize);
}
}
});
}

template<typename offsetsT>
static void scanArrowArrayList(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
uint64_t auxDstPosition = function::NumericLimits<uint64_t>::maximum();

mask->copyToValueVector(&outputVector, dstOffset, count);
uint64_t auxDstPosition = 0;
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
auto curOffset = offsets[i], nextOffset = offsets[i + 1];
// don't check for validity, since we still need to update the offsets
auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset);
outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
if (i == 0) {
if (auxDstPosition == function::NumericLimits<uint64_t>::maximum()) {
auxDstPosition = newEntry.offset;
}
});

if (auxDstPosition == function::NumericLimits<uint64_t>::maximum()) {
auxDstPosition = 0;
}

ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer,
mask->getChild(0), offsets[0] + array->children[0]->offset, auxDstPosition,
Expand All @@ -198,9 +243,11 @@ static void scanArrowArrayListView(const ArrowSchema* schema, const ArrowArray*
uint64_t count) {
auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
auto sizes = ((const offsetsT*)array->buffers[2]) + srcOffset;
mask->copyToValueVector(&outputVector, dstOffset, count);
ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
for (uint64_t i = 0; i < count; i++) {

mask->copyToValueVector(&outputVector, dstOffset, count);

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curOffset = offsets[i], size = sizes[i];
auto newEntry = ListVector::addList(&outputVector, size);
Expand All @@ -210,18 +257,21 @@ static void scanArrowArrayListView(const ArrowSchema* schema, const ArrowArray*
ArrowConverter::fromArrowArray(schema->children[0], array->children[0],
*auxiliaryBuffer, &childTree, curOffset, newEntry.offset, newEntry.size);
}
}
});
}

static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
mask->copyToValueVector(&outputVector, dstOffset, count);
auto numElements = ArrayType::getNumElements(outputVector.dataType);
for (auto i = 0u; i < count; ++i) {

mask->copyToValueVector(&outputVector, dstOffset, count);

rowIter(outputVector, count, [&](auto i) {
auto newEntry = ListVector::addList(&outputVector, numElements);
outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
}
});

auto auxiliaryBuffer = ListVector::getDataVector(&outputVector);
ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer,
mask->getChild(0), srcOffset * numElements + array->children[0]->offset,
Expand All @@ -231,13 +281,16 @@ static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray*
static void scanArrowArrayStruct(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
outputVector.setValue<int64_t>(i + dstOffset,
i + dstOffset); // struct_entry_t doesn't work for some reason
}
}
});

for (int64_t j = 0; j < schema->n_children; j++) {
ArrowConverter::fromArrowArray(schema->children[j], array->children[j],
*StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j),
Expand All @@ -251,9 +304,11 @@ static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray
auto types = ((const uint8_t*)array->buffers[0]) + srcOffset;
auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData();
auto offsets = ((const int32_t*)array->buffers[1]) + srcOffset;
mask->copyToValueVector(&outputVector, dstOffset, count);
std::vector<int32_t> firstIncident(array->n_children, INT32_MAX);
for (auto i = 0u; i < count; i++) {

mask->copyToValueVector(&outputVector, dstOffset, count);

rowIter(outputVector, count, [&](auto i) {
auto curType = types[i];
auto curOffset = offsets[i];
if (curOffset < firstIncident[curType]) {
Expand All @@ -268,20 +323,23 @@ static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray
curOffset + array->children[curType]->offset, i + dstOffset, 1);
// may be inefficient, since we're only scanning a single value
}
}
});
}

static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto types = ((const uint8_t*)array->buffers[0]) + srcOffset;
auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData();

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
dstTypes[i] = types[i];
}
}
});

// it is specified that values that aren't selected in the type buffer
// must also be semantically correct. this is why this scanning works.
// however, there is possibly room for optimization here.
Expand All @@ -299,15 +357,17 @@ static void scanArrowArrayDictionaryEncoded(const ArrowSchema* schema, const Arr
uint64_t count) {

auto values = ((const offsetsT*)array->buffers[1]) + srcOffset;

mask->copyToValueVector(&outputVector, dstOffset, count);
for (uint64_t i = 0; i < count; i++) {

rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto dictOffseted = mask->getDictionary()->offsetBy(values[i]);
ArrowConverter::fromArrowArray(schema->dictionary, array->dictionary, outputVector,
&dictOffseted, values[i] + array->dictionary->offset, i + dstOffset,
1); // possibly inefficient?
}
}
});
}

static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowArray* array,
Expand All @@ -332,15 +392,15 @@ static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowAr
}
}

for (uint64_t i = 0; i < count; i++) {
rowIter(outputVector, count, [&](auto i) {
while (i + srcOffset >= runEndBuffer[runEndIdx + 1]) {
runEndIdx++;
}
auto valuesOffseted = mask->getChild(1)->offsetBy(runEndIdx);
ArrowConverter::fromArrowArray(schema->children[1], array->children[1], outputVector,
&valuesOffseted, runEndIdx, i + dstOffset,
1); // there is optimization to be made here...
}
});
}

void ArrowConverter::fromArrowArray(const ArrowSchema* schema, const ArrowArray* array,
Expand Down
Loading