diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 234a66c00d87..1d58528cf705 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,6 +48,7 @@ if(POLICY CMP0074) endif() set(ARROW_VERSION "4.0.0") +#add_compile_options(-g -O0) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") @@ -937,3 +938,7 @@ config_summary_message() if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() + + + + diff --git a/cpp/README.md b/cpp/README.md index b083f3fe78e7..9f563149beb7 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -32,3 +32,17 @@ to install pre-compiled binary versions of the library. Please refer to our latest [C++ Development Documentation][1]. [1]: https://github.com/apache/arrow/blob/master/docs/source/developers/cpp + +## Run parquet string scan benchmark +#### Minimal benchmark build +cd arrow +mkdir -p cpp/debug +cd cpp/debug +cmake -DCMAKE_BUILD_TYPE=Release -DARROW_BUILD_BENCHMARKS=ON -DARROW_WITH_ZLIB=ON -DARROW_JEMALLOC=OFF -DARROW_PARQUET=ON -DARROW_COMPUTE=ON -DARROW_DATASET=ON -DARROW_WITH_SNAPPY=ON -DARROW_FILESYSTEM=ON .. + +#### Run benchmark and collect perf data +cpp/debug +./release/parquet-arrow-parquet-scan-string-benchmark --iterations 10 --threads 1 --file {parquet_path} --cpu 0 & +perf record -e cycles:ppp -C 0 sleep 10 +perf report + diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 3f3ca5a52991..8c3e8633f6fa 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -399,6 +399,10 @@ add_parquet_benchmark(column_io_benchmark) add_parquet_benchmark(encoding_benchmark) add_parquet_benchmark(level_conversion_benchmark) add_parquet_benchmark(arrow/reader_writer_benchmark PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_benchmark PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_string_benchmark PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_string_benchmark_backtrace PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_int64_benchmark PREFIX "parquet-arrow") if(ARROW_WITH_BROTLI) add_definitions(-DARROW_WITH_BROTLI) diff --git a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc new file mode 100644 index 000000000000..2ab95e1c380d --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkColumnarToRow { + public: + GoogleBenchmarkColumnarToRow(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + for (int i = 0; i < num_columns; ++i) { + column_indices.push_back(i); + } + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark + : public GoogleBenchmarkColumnarToRow { + public: + GoogleBenchmarkColumnarToRow_CacheScan_Benchmark(std::string filename) + : GoogleBenchmarkColumnarToRow(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace columnartorow +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkColumnarToRow_CacheScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkColumnarToRow::CacheScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/parquet_scan_int64_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_int64_benchmark.cc new file mode 100644 index 000000000000..9d75ad10666d --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_int64_benchmark.cc @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + std::cout << "Enter Type::INT64 Check: " << std::endl; + for (int i = 0; i < num_columns; ++i) { + auto field = schema->field(i); + auto type = field->type(); + if (type->id() == Type::INT64) { + std::cout << "Type::INT64 colIndex: " << i << std::endl; + column_indices.push_back(i); + } + } + // column_indices.push_back(0); + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + : public GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark(std::string filename) + : GoogleBenchmarkParquetStringScan(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + for (auto val : local_column_indices){ + std::cout << "local_column_indices: is_binary_like colIndex: " << val << std::endl; + } + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace ParquetStringScan +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkParquetStringScan::IteratorScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc new file mode 100644 index 000000000000..58763e2edfee --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + std::cout << "Enter Is_binary_like Check: " << std::endl; + for (int i = 0; i < num_columns; ++i) { + auto field = schema->field(i); + auto type = field->type(); + if (arrow::is_binary_like(type->id())) { + std::cout << "Is_binary_like colIndex: " << i << std::endl; + column_indices.push_back(i); + } + } + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + : public GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark(std::string filename) + : GoogleBenchmarkParquetStringScan(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + for (auto val : local_column_indices){ + std::cout << "local_column_indices: is_binary_like colIndex: " << val << std::endl; + } + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace ParquetStringScan +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkParquetStringScan::IteratorScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark_backtrace.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark_backtrace.cc new file mode 100644 index 000000000000..03b63efe1149 --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark_backtrace.cc @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BOOST_STACKTRACE_USE_ADDR2LINE +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + std::cout << "Enter Is_binary_like Check: " << std::endl; + // for (int i = 0; i < num_columns; ++i) { + // auto field = schema->field(i); + // auto type = field->type(); + // if (arrow::is_binary_like(type->id())) { + // std::cout << "Is_binary_like colIndex: " << i << std::endl; + // column_indices.push_back(i); + // } + // } + column_indices.push_back(15); + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + : public GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark(std::string filename) + : GoogleBenchmarkParquetStringScan(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + for (auto val : local_column_indices){ + std::cout << "local_column_indices: is_binary_like colIndex: " << val << std::endl; + } + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + // do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + // } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace ParquetStringScan +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkParquetStringScan::IteratorScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 016ceacb0ef6..2d44c589e5e8 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -22,6 +22,9 @@ #include #include #include +#include +#include +#include #include "arrow/array.h" #include "arrow/buffer.h" @@ -43,6 +46,7 @@ #include "parquet/metadata.h" #include "parquet/properties.h" #include "parquet/schema.h" +#include "parquet/types.h" using arrow::Array; using arrow::ArrayData; @@ -129,7 +133,8 @@ std::shared_ptr> VectorToSharedSet( // Forward declaration Status GetReader(const SchemaField& field, const std::shared_ptr& context, - std::unique_ptr* out); + std::unique_ptr* out, + Metrics *metrics); // ---------------------------------------------------------------------- // FileReaderImpl forward declaration @@ -142,6 +147,17 @@ class FileReaderImpl : public FileReader { reader_(std::move(reader)), reader_properties_(std::move(properties)) {} + ~FileReaderImpl() { + std::time_t t = std::time(nullptr); + // std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << std::endl; + + std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << " ~FileReaderImpl(): " << std::endl; + std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << " metrics_.elapse_page_read: " << metrics_.elapse_page_read << std::endl; + std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << " metrics_.elapse_decompress: " << metrics_.elapse_decompress << std::endl; + std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << " metrics_.dict_elapse_array_build: " << metrics_.dict_elapse_array_build << std::endl; + std::cout << std::put_time(std::localtime(&t), "%Y-%m-%d %H:%M:%S") << " metrics_.plain_elapse_array_build: " << metrics_.plain_elapse_array_build << std::endl; + } + Status Init() { return SchemaManifest::Make(reader_->metadata()->schema(), reader_->metadata()->key_value_metadata(), @@ -199,20 +215,22 @@ class FileReaderImpl : public FileReader { Status GetFieldReader(int i, const std::shared_ptr>& included_leaves, const std::vector& row_groups, - std::unique_ptr* out) { + std::unique_ptr* out, + Metrics *metrics) { auto ctx = std::make_shared(); ctx->reader = reader_.get(); ctx->pool = pool_; ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; - return GetReader(manifest_.schema_fields[i], ctx, out); + return GetReader(manifest_.schema_fields[i], ctx, out, metrics); } Status GetFieldReaders(const std::vector& column_indices, const std::vector& row_groups, std::vector>* out, - std::shared_ptr<::arrow::Schema>* out_schema) { + std::shared_ptr<::arrow::Schema>* out_schema, + Metrics *metrics) { // We only need to read schema fields which have columns indicated // in the indices vector ARROW_ASSIGN_OR_RAISE(std::vector field_indices, @@ -225,8 +243,9 @@ class FileReaderImpl : public FileReader { for (size_t i = 0; i < out->size(); ++i) { std::unique_ptr reader; RETURN_NOT_OK( - GetFieldReader(field_indices[i], included_leaves, row_groups, &reader)); + GetFieldReader(field_indices[i], included_leaves, row_groups, &reader, metrics)); + // reader->set_metrics(&metrics_); out_fields[i] = reader->field(); out->at(i) = std::move(reader); } @@ -252,7 +271,7 @@ class FileReaderImpl : public FileReader { std::vector row_groups = Iota(reader_->metadata()->num_row_groups()); std::unique_ptr reader; - RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader)); + RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader, NULLPTR)); return ReadColumn(i, row_groups, reader.get(), out); } @@ -340,6 +359,15 @@ class FileReaderImpl : public FileReader { return Status::OK(); END_PARQUET_CATCH_EXCEPTIONS } + + Metrics metrics_ = {0, + 0, + 0, + 0, + 0, + 0, + 0, + }; MemoryPool* pool_; std::unique_ptr reader_; @@ -414,13 +442,15 @@ class LeafReader : public ColumnReaderImpl { public: LeafReader(std::shared_ptr ctx, std::shared_ptr field, std::unique_ptr input, - ::parquet::internal::LevelInfo leaf_info) + ::parquet::internal::LevelInfo leaf_info, Metrics *metrics) : ctx_(std::move(ctx)), field_(std::move(field)), input_(std::move(input)), - descr_(input_->descr()) { + descr_(input_->descr()), + metrics_(metrics) { record_reader_ = RecordReader::Make( descr_, leaf_info, ctx_->pool, field_->type()->id() == ::arrow::Type::DICTIONARY); + record_reader_->set_metrics(metrics); NextRowGroup(); } @@ -468,10 +498,14 @@ class LeafReader : public ColumnReaderImpl { const std::shared_ptr field() override { return field_; } + + Metrics *metrics_; + private: std::shared_ptr out_; void NextRowGroup() { std::unique_ptr page_reader = input_->NextChunk(); + page_reader->set_metrics(record_reader_->metrics_); record_reader_->SetPageReader(std::move(page_reader)); } @@ -791,7 +825,8 @@ Status StructReader::BuildArray(int64_t length_upper_bound, Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_field, const std::shared_ptr& ctx, - std::unique_ptr* out) { + std::unique_ptr* out, + Metrics *metrics) { BEGIN_PARQUET_CATCH_EXCEPTIONS auto type_id = arrow_field->type()->id(); @@ -799,7 +834,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f if (type_id == ::arrow::Type::EXTENSION) { auto storage_field = arrow_field->WithType( checked_cast(*arrow_field->type()).storage_type()); - RETURN_NOT_OK(GetReader(field, storage_field, ctx, out)); + RETURN_NOT_OK(GetReader(field, storage_field, ctx, out, metrics)); out->reset(new ExtensionReader(arrow_field, std::move(*out))); return Status::OK(); } @@ -814,14 +849,14 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f } std::unique_ptr input( ctx->iterator_factory(field.column_index, ctx->reader)); - out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info)); + out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info, metrics)); } else if (type_id == ::arrow::Type::LIST || type_id == ::arrow::Type::MAP || type_id == ::arrow::Type::FIXED_SIZE_LIST || type_id == ::arrow::Type::LARGE_LIST) { auto list_field = arrow_field; auto child = &field.children[0]; std::unique_ptr child_reader; - RETURN_NOT_OK(GetReader(*child, ctx, &child_reader)); + RETURN_NOT_OK(GetReader(*child, ctx, &child_reader, metrics)); if (child_reader == nullptr) { *out = nullptr; return Status::OK(); @@ -850,7 +885,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f std::vector> child_readers; for (const auto& child : field.children) { std::unique_ptr child_reader; - RETURN_NOT_OK(GetReader(child, ctx, &child_reader)); + RETURN_NOT_OK(GetReader(child, ctx, &child_reader, metrics)); if (!child_reader) { // If all children were pruned, then we do not try to read this field continue; @@ -876,8 +911,9 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& arrow_f } Status GetReader(const SchemaField& field, const std::shared_ptr& ctx, - std::unique_ptr* out) { - return GetReader(field, field.field, ctx, out); + std::unique_ptr* out, + Metrics *metrics) { + return GetReader(field, field.field, ctx, out, metrics); } } // namespace @@ -898,7 +934,7 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, std::vector> readers; std::shared_ptr<::arrow::Schema> batch_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema)); + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema, &metrics_)); if (readers.empty()) { // Just generate all batches right now; they're cheap since they have no columns. @@ -977,7 +1013,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto ctx->iterator_factory = iterator_factory; ctx->filter_leaves = false; std::unique_ptr result; - RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result)); + RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result, &metrics_)); out->reset(result.release()); return Status::OK(); } @@ -998,7 +1034,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; - RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); + RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema, &metrics_)); ::arrow::ChunkedArrayVector columns(readers.size()); RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 4e75b25a4aef..1809cb230712 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "parquet/file_reader.h" #include "parquet/platform.h" @@ -256,6 +257,10 @@ class PARQUET_EXPORT ColumnReader { public: virtual ~ColumnReader() = default; + // virtual ~ColumnReader() { + // std::cout << "~ColumnReader() destructor" << std::endl; + // } + // Scan the next array of the indicated size. The actual size of the // returned array may be less than the passed size depending how much data is // available in the file. diff --git a/cpp/src/parquet/arrow/test_utils.h b/cpp/src/parquet/arrow/test_utils.h new file mode 100644 index 000000000000..d3afa459dfdf --- /dev/null +++ b/cpp/src/parquet/arrow/test_utils.h @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "utils/macros.h" +using namespace arrow; + +using TreeExprBuilder = gandiva::TreeExprBuilder; +using FunctionNode = gandiva::FunctionNode; + +#define ASSERT_NOT_OK(status) \ + do { \ + ::arrow::Status __s = (status); \ + if (!__s.ok()) { \ + throw std::runtime_error(__s.message()); \ + } \ + } while (false); + +#define ARROW_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \ + do { \ + auto status_name = (rexpr); \ + auto __s = status_name.status(); \ + if (!__s.ok()) { \ + throw std::runtime_error(__s.message()); \ + } \ + lhs = std::move(status_name).ValueOrDie(); \ + } while (false); + +#define ARROW_ASSIGN_OR_THROW_NAME(x, y) ARROW_CONCAT(x, y) + +#define ARROW_ASSIGN_OR_THROW(lhs, rexpr) \ + ARROW_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_THROW_NAME(_error_or_value, __COUNTER__), \ + lhs, rexpr); + +template +Status Equals(const T& expected, const T& actual) { + if (expected.Equals(actual)) { + return arrow::Status::OK(); + } + std::stringstream pp_expected; + std::stringstream pp_actual; + ::arrow::PrettyPrintOptions options(/*indent=*/2); + options.window = 50; + ASSERT_NOT_OK(PrettyPrint(expected, options, &pp_expected)); + ASSERT_NOT_OK(PrettyPrint(actual, options, &pp_actual)); + if (pp_expected.str() == pp_actual.str()) { + return arrow::Status::OK(); + } + return Status::Invalid("Expected RecordBatch is ", pp_expected.str(), " with schema ", + expected.schema()->ToString(), ", while actual is ", + pp_actual.str(), " with schema ", actual.schema()->ToString()); +} + +void MakeInputBatch(std::vector input_data, + std::shared_ptr sch, + std::shared_ptr* input_batch) { + // prepare input record Batch + std::vector> array_list; + int length = -1; + int i = 0; + for (auto data : input_data) { + std::shared_ptr a0; + ASSERT_NOT_OK(arrow::ipc::internal::json::ArrayFromJSON(sch->field(i++)->type(), + data.c_str(), &a0)); + if (length == -1) { + length = a0->length(); + } + assert(length == a0->length()); + array_list.push_back(a0); + } + + *input_batch = RecordBatch::Make(sch, length, array_list); + return; +} + +void ConstructNullInputBatch(std::shared_ptr* null_batch) { + std::vector> columns; + arrow::Int64Builder builder1; + builder1.AppendNull(); + builder1.Append(1); + + arrow::Int64Builder builder2; + builder2.Append(1); + builder2.AppendNull(); + + std::shared_ptr array1; + builder1.Finish(&array1); + std::shared_ptr array2; + builder2.Finish(&array2); + + columns.push_back(array1); + columns.push_back(array2); + + std::vector> schema_vec{ + arrow::field("col1", arrow::int64()), + arrow::field("col2", arrow::int64()), + }; + + std::shared_ptr schema{std::make_shared(schema_vec)}; + *null_batch = arrow::RecordBatch::Make(schema, 2, columns); + return; +} diff --git a/cpp/src/parquet/arrow/utils/exception.h b/cpp/src/parquet/arrow/utils/exception.h new file mode 100644 index 000000000000..582903d0ef0f --- /dev/null +++ b/cpp/src/parquet/arrow/utils/exception.h @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +class JniPendingException : public std::runtime_error { + public: + explicit JniPendingException(const std::string& arg) : runtime_error(arg) {} +}; \ No newline at end of file diff --git a/cpp/src/parquet/arrow/utils/macros.h b/cpp/src/parquet/arrow/utils/macros.h new file mode 100644 index 000000000000..ef4937adbe1c --- /dev/null +++ b/cpp/src/parquet/arrow/utils/macros.h @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "parquet/arrow/utils/exception.h" + +// extern int64_t elapse_page_read = 0; +// extern int64_t elapse_decompress = 0; +// extern int64_t elapse_decode = 0; +// extern int64_t elapse_array_build = 0; + + +#define TIME_NANO_DIFF(finish, start) \ + (finish.tv_sec - start.tv_sec) * 1000000000 + (finish.tv_nsec - start.tv_nsec) + +#define TIME_MICRO_OR_RAISE(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + return __s; \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_MICRO_OR_THROW(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_MICRO(time, res, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + res = (expr); \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_NANO_OR_RAISE(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + return __s; \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_NANO_OR_THROW(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define VECTOR_PRINT(v, name) \ + std::cout << "[" << name << "]:"; \ + for (int i = 0; i < v.size(); i++) { \ + if (i != v.size() - 1) \ + std::cout << v[i] << ","; \ + else \ + std::cout << v[i]; \ + } \ + std::cout << std::endl; + +#define THROW_NOT_OK(expr) \ + do { \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + } while (false); + +#define TIME_TO_STRING(time) \ + (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us") + +#define TIME_NANO_TO_STRING(time) \ + (time > 1e7 ? time / 1e6 : ((time > 1e4) ? time / 1e3 : time)) \ + << (time > 1e7 ? "ms" : (time > 1e4 ? "us" : "ns")) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index ec205f3d3f93..955204a3e054 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -52,12 +52,21 @@ #include "parquet/thrift_internal.h" // IWYU pragma: keep // Required after "arrow/util/int_util_internal.h" (for OPTIONAL) #include "parquet/windows_compatibility.h" +#include "parquet/arrow/utils/macros.h" using arrow::MemoryPool; using arrow::internal::AddWithOverflow; using arrow::internal::checked_cast; using arrow::internal::MultiplyWithOverflow; +int64_t elapse_page_read = 0; +int64_t elapse_decompress = 0; +int64_t elapse_decode = 0; +int64_t elapse_array_build = 0; +int64_t plain_elapse_array_build = 0; +int64_t plain_elapse_buffer_memcpy = 0; +int64_t dict_elapse_buffer_memcpy = 0; + namespace BitUtil = arrow::BitUtil; namespace parquet { @@ -370,7 +379,12 @@ std::shared_ptr SerializedPageReader::NextPage() { } // Read the compressed data page. + auto start = std::chrono::steady_clock::now(); PARQUET_ASSIGN_OR_THROW(auto page_buffer, stream_->Read(compressed_len)); + auto end = std::chrono::steady_clock::now(); + metrics_->elapse_page_read += std::chrono::duration_cast(end - start).count(); + // std::cout << "metrics_->elapse_page_read: " << metrics_->elapse_page_read << std::endl; + if (page_buffer->size() != compressed_len) { std::stringstream ss; ss << "Page was smaller (" << page_buffer->size() << ") than expected (" @@ -483,6 +497,8 @@ std::shared_ptr SerializedPageReader::DecompressIfNeeded( PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); } + auto startd = std::chrono::steady_clock::now(); + if (levels_byte_len > 0) { // First copy the levels as-is uint8_t* decompressed = decompression_buffer_->mutable_data(); @@ -495,6 +511,10 @@ std::shared_ptr SerializedPageReader::DecompressIfNeeded( uncompressed_len - levels_byte_len, decompression_buffer_->mutable_data() + levels_byte_len)); + auto endd = std::chrono::steady_clock::now(); + metrics_->elapse_decompress += std::chrono::duration_cast(endd - startd).count(); + // std::cout << "elapse_decompress: " << metrics_->elapse_decompress << std::endl; + return decompression_buffer_; } @@ -537,7 +557,8 @@ class ColumnReaderImplBase { virtual ~ColumnReaderImplBase() = default; - protected: +// protected: + public: // Read up to batch_size values from the current data page into the // pre-allocated memory T* // @@ -590,7 +611,7 @@ class ColumnReaderImplBase { } // Advance to the next data page - bool ReadNewPage() { + virtual bool ReadNewPage() { // Loop until we find the next data page. while (true) { current_page_ = pager_->NextPage(); @@ -1109,6 +1130,42 @@ class TypedRecordReader : public ColumnReaderImplBase, Reset(); } + // Advance to the next data page + bool ReadNewPage() { + // Loop until we find the next data page. + while (true) { + this->current_page_ = this->pager_->NextPage(); + if (!this->current_page_) { + // EOS + return false; + } + + if (this->current_page_->type() == PageType::DICTIONARY_PAGE) { + this->ConfigureDictionary(static_cast(this->current_page_.get())); + continue; + } else if (this->current_page_->type() == PageType::DATA_PAGE) { + const auto page = std::static_pointer_cast(this->current_page_); + const int64_t levels_byte_size = this->InitializeLevelDecoders( + *page, page->repetition_level_encoding(), page->definition_level_encoding()); + this->InitializeDataDecoder(*page, levels_byte_size); + this->current_decoder_->set_metrics(metrics_); + return true; + } else if (this->current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(this->current_page_); + int64_t levels_byte_size = this->InitializeLevelDecodersV2(*page); + this->InitializeDataDecoder(*page, levels_byte_size); + this->current_decoder_->set_metrics(metrics_); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + this->current_decoder_->set_metrics(metrics_); + return true; + } + int64_t available_values_current_page() const { return this->num_buffered_values_ - this->num_decoded_values_; } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index a73bba6cb4e9..702aaada59ea 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "parquet/exception.h" #include "parquet/level_conversion.h" @@ -112,6 +113,12 @@ class PARQUET_EXPORT PageReader { virtual std::shared_ptr NextPage() = 0; virtual void set_max_page_header_size(uint32_t size) = 0; + + void set_metrics(Metrics *metrics) { + metrics_ = metrics; + } + + Metrics *metrics_; }; class PARQUET_EXPORT ColumnReader { @@ -219,6 +226,10 @@ class RecordReader { virtual ~RecordReader() = default; + // virtual ~RecordReader() { + // std::cout << "~RecordReader() destructor" << std::endl; + // } + /// \brief Attempt to read indicated number of records from column chunk /// \return number of records read virtual int64_t ReadRecords(int64_t num_records) = 0; @@ -281,6 +292,13 @@ class RecordReader { /// \brief True if reading directly as Arrow dictionary-encoded bool read_dictionary() const { return read_dictionary_; } + + void set_metrics(Metrics *metrics) { + metrics_ = metrics; + } + + Metrics *metrics_; + protected: bool nullable_values_; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index eeeff1c8f9b7..f1046c1c4cba 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -48,6 +48,17 @@ #include "parquet/schema.h" #include "parquet/types.h" +#include +#include "parquet/arrow/utils/macros.h" + +extern int64_t elapse_page_read; +extern int64_t elapse_decompress; +extern int64_t elapse_decode; +extern int64_t elapse_array_build; +extern int64_t plain_elapse_array_build; +extern int64_t plain_elapse_buffer_memcpy; +extern int64_t dict_elapse_buffer_memcpy; + namespace BitUtil = arrow::BitUtil; using arrow::Status; @@ -1054,10 +1065,15 @@ inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) { ParquetException::EofException(); } + // auto start = std::chrono::steady_clock::now(); // If bytes_to_decode == 0, data could be null if (bytes_to_decode > 0) { memcpy(out, data, bytes_to_decode); } + // auto end = std::chrono::steady_clock::now(); + // metrics_->plain_elapse_buffer_memcpy += std::chrono::duration_cast(end - start).count(); + // std::cout << "PlainDecoder DecodeArrowDense: plain_elapse_buffer_memcpy: " << metrics_->plain_elapse_buffer_memcpy << std::endl; + return static_cast(bytes_to_decode); } @@ -1363,6 +1379,8 @@ class PlainByteArrayDecoder : public PlainDecoder, RETURN_NOT_OK(helper.builder->ReserveData( std::min(len_, helper.chunk_space_remaining))); + auto start = std::chrono::steady_clock::now(); + int i = 0; RETURN_NOT_OK(VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, @@ -1398,6 +1416,10 @@ class PlainByteArrayDecoder : public PlainDecoder, return Status::OK(); })); + auto end = std::chrono::steady_clock::now(); + metrics_->plain_elapse_array_build += std::chrono::duration_cast(end - start).count(); + // std::cout << "PlainDecoder DecodeArrowDense: plain_elapse_array_build: " << metrics_->plain_elapse_array_build << std::endl; + num_values_ -= values_decoded; *out_values_decoded = values_decoded; return Status::OK(); @@ -1496,12 +1518,17 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder { int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset) override { num_values = std::min(num_values, num_values_); + auto start = std::chrono::steady_clock::now(); if (num_values != idx_decoder_.GetBatchWithDictSpaced( reinterpret_cast(dictionary_->data()), dictionary_length_, buffer, num_values, null_count, valid_bits, valid_bits_offset)) { ParquetException::EofException(); } + auto end = std::chrono::steady_clock::now(); + metrics_->dict_elapse_buffer_memcpy += std::chrono::duration_cast(end - start).count(); + // std::cout << "PlainDecoder DecodeSpaced: dict_elapse_buffer_memcpy: " << metrics_->dict_elapse_buffer_memcpy << std::endl; + num_values_ -= num_values; return num_values; } @@ -1635,6 +1662,7 @@ void DictDecoderImpl::SetDict(TypedDecoder* dictio /*shrink_to_fit=*/false)); int32_t offset = 0; + // auto start = std::chrono::steady_clock::now(); uint8_t* bytes_data = byte_array_data_->mutable_data(); int32_t* bytes_offsets = reinterpret_cast(byte_array_offsets_->mutable_data()); @@ -1644,6 +1672,10 @@ void DictDecoderImpl::SetDict(TypedDecoder* dictio dict_values[i].ptr = bytes_data + offset; offset += dict_values[i].len; } + // auto end = std::chrono::steady_clock::now(); + // metrics_->elapse_decode += std::chrono::duration_cast(end - start).count(); + // std::cout << "ByteArrayType :elapse_decode: " << metrics_->elapse_decode << std::endl; + bytes_offsets[dictionary_length_] = offset; } @@ -1656,6 +1688,7 @@ inline void DictDecoderImpl::SetDict(TypedDecoder* dictionar int fixed_len = descr_->type_length(); int total_size = dictionary_length_ * fixed_len; + // auto start = std::chrono::steady_clock::now(); PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, /*shrink_to_fit=*/false)); uint8_t* bytes_data = byte_array_data_->mutable_data(); @@ -1663,6 +1696,10 @@ inline void DictDecoderImpl::SetDict(TypedDecoder* dictionar memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len); dict_values[i].ptr = bytes_data + offset; } + // auto end = std::chrono::steady_clock::now(); + // metrics_->elapse_decode += std::chrono::duration_cast(end - start).count(); + // std::cout << "FLBAType :elapse_decode: " << metrics_->elapse_decode << std::endl; + } template <> @@ -1871,6 +1908,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, auto dict_values = reinterpret_cast(dictionary_->data()); int values_decoded = 0; int num_appended = 0; + auto start = std::chrono::steady_clock::now(); while (num_appended < num_values) { bool is_valid = bit_reader.IsSet(); bit_reader.Next(); @@ -1916,6 +1954,13 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, ++num_appended; } } + auto end = std::chrono::steady_clock::now(); + metrics_->dict_elapse_array_build += std::chrono::duration_cast(end - start).count(); + // std::cout << "DecodeArrowDense:dict_elapse_array_build: " << metrics_->dict_elapse_array_build << std::endl; + + // elapse_array_build += std::chrono::duration_cast(end - start).count(); + // std::cout << "DecodeArrowDense:elapse_array_build: " << elapse_array_build << std::endl; + *out_num_values = values_decoded; return Status::OK(); } @@ -1930,6 +1975,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, ArrowBinaryHelper helper(out); auto dict_values = reinterpret_cast(dictionary_->data()); + auto start = std::chrono::steady_clock::now(); while (values_decoded < num_values) { int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); int num_indices = idx_decoder_.GetBatch(indices, batch_size); @@ -1945,6 +1991,14 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, } values_decoded += num_indices; } + auto end = std::chrono::steady_clock::now(); + metrics_->dict_elapse_array_build += std::chrono::duration_cast(end - start).count(); + // std::cout << "DecodeArrowDenseNonNull: dict_elapse_array_build: " << metrics_->dict_elapse_array_build << std::endl; + + // elapse_array_build += std::chrono::duration_cast(end - start).count(); + // std::cout << "DecodeArrowDenseNonNull: elapse_array_build: " << elapse_array_build << std::endl; + + *out_num_values = values_decoded; return Status::OK(); } diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index a3d8e012b6a5..cc07efb698b2 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -28,6 +28,17 @@ #include "parquet/platform.h" #include "parquet/types.h" +// struct Metrics +// { +// int64_t elapse_page_read = 0; +// int64_t elapse_decompress = 0; +// int64_t elapse_decode = 0; +// int64_t elapse_array_build = 0; +// int64_t plain_elapse_array_build = 0; +// int64_t plain_elapse_buffer_memcpy = 0; +// int64_t dict_elapse_buffer_memcpy = 0; +// }; + namespace arrow { class Array; @@ -262,6 +273,12 @@ class Decoder { // the number of values left in this page. virtual int values_left() const = 0; virtual Encoding::type encoding() const = 0; + + void set_metrics(Metrics *metrics) { + metrics_ = metrics; + } + + Metrics *metrics_; }; template diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index f3d3abfc918c..928e2fb144a7 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -29,6 +29,28 @@ #include "parquet/platform.h" #include "parquet/type_fwd.h" +// struct Metrics +// { +// int64_t elapse_page_read = 0; +// int64_t elapse_decompress = 0; +// int64_t elapse_decode = 0; +// int64_t dict_elapse_array_build = 0; +// int64_t plain_elapse_array_build = 0; +// int64_t dict_elapse_buffer_memcpy = 0; +// int64_t plain_elapse_buffer_memcpy = 0; +// }; + +struct Metrics +{ + int64_t elapse_page_read; + int64_t elapse_decompress; + int64_t elapse_decode; + int64_t dict_elapse_array_build; + int64_t plain_elapse_array_build; + int64_t dict_elapse_buffer_memcpy; + int64_t plain_elapse_buffer_memcpy; +}; + namespace arrow { namespace util {