Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ file(GLOB source_cspa
"${test_dir}/cspa.cu"
)

file(GLOB source_file_ccomp
"${test_dir}/cc.cu"
)

add_library(gpu_ra "${source_file_gpu_ra}")
target_compile_features(gpu_ra PUBLIC cxx_std_20)
set_target_properties(gpu_ra PROPERTIES CUDA_SEPARABLE_COMPILATION ON)
Expand All @@ -52,3 +56,8 @@ add_executable(CSPA ${source_cspa})
target_link_libraries(CSPA gpu_ra)
target_compile_features(CSPA PUBLIC cxx_std_20)
set_target_properties(CSPA PROPERTIES CUDA_SEPARABLE_COMPILATION ON)

add_executable(CCOMP ${source_file_ccomp})
target_link_libraries(CCOMP gpu_ra)
target_compile_features(CCOMP PUBLIC cxx_std_20)
set_target_properties(CCOMP PROPERTIES CUDA_SEPARABLE_COMPILATION ON)
1 change: 1 addition & 0 deletions include/relation.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ struct Relation {
// column can be used to store recurisve aggreagtion/choice domain's result,
// these columns can't be used as index columns
int dependent_column_size = 0;
dependency_order dep_pred = nullptr;
bool index_flag = true;

GHashRelContainer *delta;
Expand Down
22 changes: 15 additions & 7 deletions include/tuple.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ using tuple_size_t = u64;
*/
using t_data_internal = u64 *;


typedef void (*tuple_generator_hook) (tuple_type, tuple_type, tuple_type);
typedef void (*tuple_copy_hook) (tuple_type, tuple_type);
typedef bool (*tuple_predicate) (tuple_type) ;
typedef void (*tuple_generator_hook)(tuple_type, tuple_type, tuple_type);
typedef void (*tuple_copy_hook)(tuple_type, tuple_type);
typedef bool (*tuple_predicate)(tuple_type);
typedef bool (*dependency_order)(tuple_type, tuple_type);

// struct tuple_generator_hook {
// __host__ __device__
// __host__ __device__
// void operator()(tuple_type inner, tuple_type outer, tuple_type newt) {};
// };

Expand All @@ -39,7 +39,8 @@ typedef bool (*tuple_predicate) (tuple_type) ;
* @return true
* @return false
*/
__host__ __device__ inline bool tuple_eq(tuple_type t1, tuple_type t2, tuple_size_t l) {
__host__ __device__ inline bool tuple_eq(tuple_type t1, tuple_type t2,
tuple_size_t l) {
for (int i = 0; i < l; i++) {
if (t1[i] != t2[i]) {
return false;
Expand Down Expand Up @@ -96,11 +97,14 @@ struct tuple_indexed_less {
// u64 *index_columns;
tuple_size_t index_column_size;
int arity;
dependency_order dep_order = nullptr;

tuple_indexed_less(tuple_size_t index_column_size, int arity) {
tuple_indexed_less(tuple_size_t index_column_size, int arity,
dependency_order dep_order = nullptr) {
// this->index_columns = index_columns;
this->index_column_size = index_column_size;
this->arity = arity;
this->dep_order = dep_order;
}

__host__ __device__ bool operator()(const tuple_type &lhs,
Expand All @@ -118,6 +122,10 @@ struct tuple_indexed_less {
return false;
}
}
// WARNNING: this may cause array out of bound
if (dep_order != nullptr) {
return dep_order(lhs+arity, rhs+arity);
}
return false;
} else if (prefix_hash(lhs, index_column_size) <
prefix_hash(rhs, index_column_size)) {
Expand Down
168 changes: 137 additions & 31 deletions src/lie.cu
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <thrust/execution_policy.h>
#include <thrust/merge.h>
#include <thrust/set_operations.h>
#include <thrust/unique.h>

#include <variant>

Expand Down Expand Up @@ -45,8 +46,8 @@ void LIE::fixpoint_loop() {
checkCuda(cudaMalloc((void **)&rel->tuple_full,
rel->full->tuple_counts * sizeof(tuple_type)));
checkCuda(cudaMemcpy(rel->tuple_full, rel->full->tuples,
rel->full->tuple_counts * sizeof(tuple_type),
cudaMemcpyDeviceToDevice));
rel->full->tuple_counts * sizeof(tuple_type),
cudaMemcpyDeviceToDevice));
rel->current_full_size = rel->full->tuple_counts;
copy_relation_container(rel->delta, rel->full, grid_size, block_size);
checkCuda(cudaDeviceSynchronize());
Expand Down Expand Up @@ -106,32 +107,33 @@ void LIE::fixpoint_loop() {
rel->delta->tuples = nullptr;
}

if (rel->dep_pred == nullptr) {
timer.start_timer();
if (rel->newt->tuple_counts == 0) {
rel->delta =
new GHashRelContainer(rel->arity, rel->index_column_size,
rel->dependent_column_size);
std::cout << "iteration " << iteration_counter << " relation "
<< rel->name << " no new tuple added" << std::endl;
rel->delta = new GHashRelContainer(
rel->arity, rel->index_column_size,
rel->dependent_column_size);
std::cout << "iteration " << iteration_counter
<< " relation " << rel->name
<< " no new tuple added" << std::endl;
continue;
}
tuple_type *deduplicated_newt_tuples;
u64 deduplicated_newt_tuples_mem_size =
rel->newt->tuple_counts * sizeof(tuple_type);
checkCuda(cudaMalloc((void **)&deduplicated_newt_tuples,
deduplicated_newt_tuples_mem_size));
deduplicated_newt_tuples_mem_size));
checkCuda(cudaMemset(deduplicated_newt_tuples, 0,
deduplicated_newt_tuples_mem_size));
//////
deduplicated_newt_tuples_mem_size));

tuple_type *deuplicated_end = thrust::set_difference(
thrust::device, rel->newt->tuples,
rel->newt->tuples + rel->newt->tuple_counts, rel->tuple_full,
rel->tuple_full + rel->current_full_size,
rel->newt->tuples + rel->newt->tuple_counts,
rel->tuple_full, rel->tuple_full + rel->current_full_size,
deduplicated_newt_tuples,
tuple_indexed_less(rel->full->index_column_size,
rel->full->arity -
rel->dependent_column_size));
rel->full->arity -
rel->dependent_column_size));
checkCuda(cudaDeviceSynchronize());
tuple_size_t deduplicate_size =
deuplicated_end - deduplicated_newt_tuples;
Expand All @@ -146,26 +148,28 @@ void LIE::fixpoint_loop() {
u64 dedeuplicated_raw_mem_size =
deduplicate_size * rel->newt->arity * sizeof(column_type);
checkCuda(cudaMalloc((void **)&deduplicated_raw,
dedeuplicated_raw_mem_size));
checkCuda(cudaMemset(deduplicated_raw, 0, dedeuplicated_raw_mem_size));
dedeuplicated_raw_mem_size));
checkCuda(cudaMemset(deduplicated_raw, 0,
dedeuplicated_raw_mem_size));
flatten_tuples_raw_data<<<grid_size, block_size>>>(
deduplicated_newt_tuples, deduplicated_raw, deduplicate_size,
rel->newt->arity);
deduplicated_newt_tuples, deduplicated_raw,
deduplicate_size, rel->newt->arity);
checkCuda(cudaGetLastError());
checkCuda(cudaDeviceSynchronize());
checkCuda(cudaFree(deduplicated_newt_tuples));

free_relation_container(rel->newt);

timer.start_timer();
rel->delta = new GHashRelContainer(
rel->arity, rel->index_column_size, rel->dependent_column_size);
load_relation_container(rel->delta, rel->full->arity,
deduplicated_raw, deduplicate_size,
rel->full->index_column_size,
rel->full->dependent_column_size,
rel->full->index_map_load_factor, grid_size,
block_size, true, true, true);
rel->delta =
new GHashRelContainer(rel->arity, rel->index_column_size,
rel->dependent_column_size);
load_relation_container(
rel->delta, rel->full->arity, deduplicated_raw,
deduplicate_size, rel->full->index_column_size,
rel->full->dependent_column_size,
rel->full->index_map_load_factor, grid_size, block_size,
true, true, true);
checkCuda(cudaDeviceSynchronize());
timer.stop_timer();
rebuild_delta_time += timer.get_spent_time();
Expand All @@ -176,11 +180,113 @@ void LIE::fixpoint_loop() {
merge_time += timer.get_spent_time();

// print_tuple_rows(rel->full, "Path full after load newt");
std::cout << "iteration " << iteration_counter << " relation "
<< rel->name
<< " finish dedup new tuples : " << deduplicate_size
<< " delta tuple size: " << rel->delta->tuple_counts
<< " full counts " << rel->current_full_size << std::endl;
// std::cout << "iteration " << iteration_counter << " relation
// "
// << rel->name
// << " finish dedup new tuples : " << deduplicate_size
// << " delta tuple size: " << rel->delta->tuple_counts
// << " full counts " << rel->current_full_size <<
// std::endl;
} else {
// recursive aggregation
// merge newt to full directly
tuple_size_t new_full_size =
rel->current_full_size + rel->newt->tuple_counts;
// std::cout << new_full_size << std::endl;
tuple_type *tuple_full_buf;
u64 tuple_full_buf_mem_size =
new_full_size * sizeof(tuple_type);
checkCuda(cudaMalloc((void **)&tuple_full_buf,
tuple_full_buf_mem_size));
checkCuda(
cudaMemset(tuple_full_buf, 0, tuple_full_buf_mem_size));
checkCuda(cudaDeviceSynchronize());

tuple_type *end_tuple_full_buf = thrust::merge(
thrust::device, rel->tuple_full,
rel->tuple_full + rel->current_full_size, rel->newt->tuples,
rel->newt->tuples + rel->newt->tuple_counts, tuple_full_buf,
tuple_indexed_less(rel->full->index_column_size,
rel->full->arity -
rel->dependent_column_size,
rel->dep_pred));
checkCuda(cudaDeviceSynchronize());
// after merge all tuple need aggregation has been gatered
// together a full aggregation will be reduce them, but we need
// monotonicity, so we use in place unique operation, which
// whill keep the first occurence (smallest/largest) of
// aggregated tuples sharing the same non-dependent columns
tuple_type *deduplicated_tuple_full_buf_end = thrust::unique(
thrust::device, tuple_full_buf, end_tuple_full_buf,
t_equal(rel->full->arity - rel->dependent_column_size));
tuple_size_t deduplicated_tuple_full_buf_size =
deduplicated_tuple_full_buf_end - tuple_full_buf;
// then propagate the delta by set difference new and old full
tuple_type *propogated_delta_tuples;
checkCuda(
cudaMalloc((void **)&propogated_delta_tuples,
rel->newt->tuple_counts * sizeof(tuple_type)));
tuple_type *propogated_delta_tuples_end =
thrust::set_difference(
thrust::device, tuple_full_buf,
deduplicated_tuple_full_buf_end, rel->tuple_full,
rel->tuple_full + rel->current_full_size,
propogated_delta_tuples,
tuple_indexed_less(rel->full->index_column_size,
rel->full->arity));
column_type *propogated_delta_raw;
tuple_size_t propogated_delta_size =
propogated_delta_tuples_end - propogated_delta_tuples;
u64 propogated_delta_raw_mem_size = propogated_delta_size *
rel->full->arity *
sizeof(column_type);
checkCuda(cudaMalloc((void **)&propogated_delta_raw,
propogated_delta_raw_mem_size));
flatten_tuples_raw_data<<<grid_size, block_size>>>(
propogated_delta_tuples, propogated_delta_raw,
propogated_delta_size, rel->full->arity);
checkCuda(cudaGetLastError());
checkCuda(cudaDeviceSynchronize());
checkCuda(cudaFree(propogated_delta_tuples));
rel->delta =
new GHashRelContainer(rel->arity, rel->index_column_size,
rel->dependent_column_size);
load_relation_container(
rel->delta, rel->full->arity, propogated_delta_raw,
propogated_delta_size, rel->full->index_column_size,
rel->full->dependent_column_size,
rel->full->index_map_load_factor, grid_size, block_size,
true, true, true);
rel->buffered_delta_vectors.push_back(rel->delta);

// reload full, since merge will cause tuple inside newt
// inserted into full if don't reload full, can't free newt
// this operation need huge buffer for new full
rel->current_full_size = deduplicated_tuple_full_buf_size;
checkCuda(cudaFree(rel->tuple_full));
column_type *new_full_raw_data;
u64 new_full_raw_data_mem_size = rel->current_full_size *
rel->full->arity *
sizeof(column_type);
checkCuda(cudaMalloc((void **)&new_full_raw_data,
new_full_raw_data_mem_size));
checkCuda(cudaMemset(new_full_raw_data, 0,
new_full_raw_data_mem_size));
flatten_tuples_raw_data<<<grid_size, block_size>>>(
rel->tuple_full, new_full_raw_data, rel->current_full_size,
rel->full->arity);
checkCuda(cudaGetLastError());
checkCuda(cudaDeviceSynchronize());
free_relation_container(rel->newt);
load_relation_container(
rel->full, rel->full->arity, new_full_raw_data,
rel->current_full_size, rel->full->index_column_size,
rel->full->dependent_column_size,
rel->full->index_map_load_factor, grid_size, block_size,
true, true, true);
rel->tuple_full = rel->full->tuples;
rel->current_full_size = rel->full->tuple_counts;
}
}
checkCuda(cudaDeviceSynchronize());
std::cout << "Iteration " << iteration_counter << " finish populating"
Expand Down
Loading