Backend support for lattice in slog #33

Open · wants to merge 39 commits into base: feature/aggregation
Commits (39):
58d176b - add negative number support (Oct 3, 2022)
acc7b36 - Merge pull request #30 from harp-lab/feature/negative_number (kmicinski, Oct 6, 2022)
a0f26d2 - add already compiled check in client (Nov 2, 2022)
4c9c255 - remove recompile message (Nov 2, 2022)
e73f130 - Merge pull request #32 from harp-lab/repl_compile_twice (kmicinski, Nov 2, 2022)
5342dd5 - Merge branch 'master' into manual_rec_agg (Nov 8, 2022)
9b81caf - single threaded (Nov 11, 2022)
357c7c4 - use real sssp (Nov 14, 2022)
d2c7ed6 - add loop (Nov 14, 2022)
649a9d7 - w (Nov 14, 2022)
80f2a14 - add loop script to run sssp (Nov 14, 2022)
a707d53 - w (Nov 15, 2022)
058f1dc - fix reorder (Nov 15, 2022)
329e941 - change to compute multi sssp (Nov 16, 2022)
f155e41 - delete copy rule (Nov 20, 2022)
61b9eba - add debug (Nov 21, 2022)
a11ac84 - add support for multi dependent column (Nov 23, 2022)
336e1a1 - add page rank test (Nov 25, 2022)
a3efb3a - fix dependent index (Nov 27, 2022)
ec9250c - stage (Nov 29, 2022)
ab530e6 - finish impl page rank (Dec 2, 2022)
b88f779 - add CC (Dec 5, 2022)
5ea4961 - add debug info (Dec 9, 2022)
fa9941b - freez (Dec 12, 2022)
ba89394 - local (Jan 5, 2023)
f043516 - theta gcc + mpich (Dec 28, 2022)
13a0a41 - add more log (Dec 28, 2022)
3c6221e - more stat (Dec 30, 2022)
6a68658 - more hash function (Jan 4, 2023)
61a4d7f - add manual sub rank split (Jan 6, 2023)
0370c18 - fix bucket in join (Jan 17, 2023)
62b721c - add sssp opt (Jan 17, 2023)
94043c5 - try change insert (Jan 26, 2023)
3b929d1 - theta gcc + mpich (Dec 28, 2022)
08dce3c - add more log (Dec 28, 2022)
2b34dcb - more stat (Dec 30, 2022)
271e9c4 - more hash function (Jan 4, 2023)
1c8f027 - stage change (Jan 5, 2023)
b4d1b7f - add manual sub rank split (Jan 6, 2023)
1 change: 1 addition & 0 deletions .gitignore
@@ -21,3 +21,4 @@ temp-out/
test-input
souffle-out
local/
evaluation
4 changes: 2 additions & 2 deletions backend/CMakeLists.txt
@@ -12,13 +12,13 @@ find_package(MPI REQUIRED)
# set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
# endif()

set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++17 -lstdc++fs -Werror=class-memaccess -fpermissive")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -std=c++20 -lstdc++fs -Werror=class-memaccess -fpermissive")
set (source_dir "${PROJECT_SOURCE_DIR}/src")
set (tests_dir "${PROJECT_SOURCE_DIR}/tests")
set (data_dir "${PROJECT_SOURCE_DIR}/data")
set (utility_dir "${PROJECT_SOURCE_DIR}/utility")

file (GLOB source_files_parallel_RA "${source_dir}/parallel_RA_inc.h" "${source_dir}/log/logger.cpp" "${source_dir}/hash/hash.h" "${source_dir}/buffer/vector_buffer.cpp" "${source_dir}/comm/comm.cpp" "${source_dir}/relation/shmap_relation_exp.cpp" "${source_dir}/comm/all_to_all_comm.cpp" "${source_dir}/comm/all_to_allv_comm.cpp" "${source_dir}/IO/parallel_io.cpp" "${source_dir}/RA/parallel_join.cpp" "${source_dir}/RA/parallel_agg.cpp" "${source_dir}/comm/intra_bucket_comm.cpp" "${source_dir}/RA/parallel_copy.cpp" "${source_dir}/RA/parallel_copy_filter.cpp" "${source_dir}/RA/parallel_copy_generate.cpp" "${source_dir}/RA/parallel_RA.h" "${source_dir}/RA/parallel_acopy.cpp" "${source_dir}/relation/balanced_hash_relation.cpp" "${source_dir}/relation/relation_load_balancer.cpp" "${source_dir}/RAM/RA_tasks.cpp" "${source_dir}/lie/lie.cpp")
file (GLOB source_files_parallel_RA "${source_dir}/parallel_RA_inc.h" "${source_dir}/log/logger.cpp" "${source_dir}/hash/hash.h" "${source_dir}/hash/hash.cpp" "${source_dir}/hash/xxhash.cpp" "${source_dir}/hash/spooky-c.cpp" "${source_dir}/hash/fashhash.cpp" "${source_dir}/buffer/vector_buffer.cpp" "${source_dir}/comm/comm.cpp" "${source_dir}/relation/shmap_relation_exp.cpp" "${source_dir}/comm/all_to_all_comm.cpp" "${source_dir}/comm/all_to_allv_comm.cpp" "${source_dir}/IO/parallel_io.cpp" "${source_dir}/RA/parallel_join.cpp" "${source_dir}/RA/parallel_agg.cpp" "${source_dir}/comm/intra_bucket_comm.cpp" "${source_dir}/RA/parallel_copy.cpp" "${source_dir}/RA/parallel_copy_filter.cpp" "${source_dir}/RA/parallel_copy_generate.cpp" "${source_dir}/RA/parallel_RA.h" "${source_dir}/RA/parallel_acopy.cpp" "${source_dir}/relation/balanced_hash_relation.cpp" "${source_dir}/relation/relation_load_balancer.cpp" "${source_dir}/RAM/RA_tasks.cpp" "${source_dir}/lie/lie.cpp")
file (GLOB source_files_ata "${tests_dir}/all_to_all_benchmark.cpp")
file (GLOB source_files_tc "${tests_dir}/transitive_closure.cpp")
#file (GLOB source_files_builtin "${tests_dir}/builtin.cpp")
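For context (a sketch, not part of the PR's diff): besides bumping the standard from C++17 to C++20, the glob now also compiles several hash implementations (hash.cpp, xxhash.cpp, spooky-c.cpp, fashhash.cpp). The lines below are a rough, hedged illustration of how a tuple hash could delegate to one such backend, here xxHash's XXH64; the repository's actual tuple_hash is declared in src/hash/hash.h and may look different.

// Hypothetical sketch only: hashes the first prefix_len columns of a tuple by
// delegating to xxHash's XXH64 (a real xxHash API). Header path and helper name
// are assumptions, not the backend's code.
#include <cstdint>
#include "xxhash.h"   // xxHash library header

using u64 = uint64_t;

inline u64 tuple_hash_xx(const u64* tuple, u64 prefix_len)
{
    // Treat the tuple prefix as a contiguous byte string.
    return XXH64(tuple, prefix_len * sizeof(u64), /*seed=*/0);
}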
2 changes: 1 addition & 1 deletion backend/src/IO/parallel_io.cpp
@@ -158,7 +158,7 @@ void parallel_io::parallel_read_input_relation_from_file_to_local_buffer(u32 ari

/* Read all data in parallel */
uint64_t read_offset;
read_offset = ceil((float)global_row_count / nprocs) * rank;
read_offset = (int)ceil((float)global_row_count / nprocs) * rank;

if (read_offset > (uint64_t)global_row_count)
{
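For context (a sketch, not part of the PR's diff): the one changed line above truncates the ceil result to int before scaling by the rank, so each rank's starting row is an integer multiple of the per-rank block size rather than a floating-point product rounded at assignment. A minimal stand-alone sketch of that slicing follows; the helper name and the clamping are assumptions, not the backend's code.

// Hypothetical helper mirroring the per-rank slicing above: each rank reads a
// contiguous block of roughly ceil(global_row_count / nprocs) rows.
#include <cmath>
#include <cstdint>

static uint64_t rank_read_offset(uint64_t global_row_count, int nprocs, int rank)
{
    uint64_t rows_per_rank = (uint64_t)std::ceil((double)global_row_count / nprocs);
    uint64_t offset = rows_per_rank * (uint64_t)rank;   // integer product, no float rounding
    // Ranks past the end of the data read nothing (mirrors the guard in the diff).
    return offset > global_row_count ? global_row_count : offset;
}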
34 changes: 21 additions & 13 deletions backend/src/RA/parallel_agg.cpp
@@ -80,8 +80,8 @@ void parallel_join_aggregate::local_aggregate(

u32* output_sub_bucket_count = output->get_sub_bucket_per_bucket_count();
u32** output_sub_bucket_rank = output->get_sub_bucket_rank();
int real_join_count = output->get_join_column_count() - 1;
agg_buffer.width[ra_counter] = real_join_count + 1;
u32 real_join_count = output->get_join_column_count();
agg_buffer.width[ra_counter] = output->get_arity();

shmap_relation* agg_target;
if (*(target->get_sub_bucket_per_bucket_count()) == 1) {
@@ -93,11 +93,10 @@
agg_target->insert_tuple_from_array(input0_buffer+k1, target->get_arity()+1);
}
}

btree::btree_map<std::vector<u64>, u64, shmap_relation::t_comparator> res_map;
for (int bucket=0; bucket < buckets; bucket ++) {
for (u32 bucket=0; bucket < buckets; bucket ++) {
for (auto tuple: input->get_full()[bucket]) {
std::vector<u64> data_v(tuple.begin(), tuple.begin()+target->get_join_column_count());
std::vector<u64> data_v(tuple.begin(), tuple.begin()+input->get_join_column_count());
// std::cout << "On rank " << mcomm.get_rank() << " bucket " << *(target->get_sub_bucket_per_bucket_count()) << std::endl;
auto joined_range = agg_target->prefix_range(data_v);
auto agg_data = local_func(joined_range);
@@ -110,20 +109,29 @@
}
}
}

// std::cout << ">>>>>>>>>>>>>>>>>>>>> " << input->get_full()[0].size() << std::endl;

for (int bucket=0; bucket < buckets; bucket ++) {
for (u32 bucket=0; bucket < buckets; bucket ++) {
for (auto input_tuple: input->get_full()[bucket]) {
std::vector<u64> joined_input_tuple(input_tuple.begin(), input_tuple.begin()+input->get_join_column_count());
auto agg_res = res_map[joined_input_tuple];
std::vector<u64> tuple(reorder_mapping.size(), 0);
std::vector<u64> tuple(output->get_arity(), 0);
// std::cout << "wwwwwwwwwwwwwwwwwwwwwwww " << output->get_arity() << std::endl;
int reorder_agg_index = input->get_arity() + 1;
for (int j = 0; j < reorder_mapping.size(); j++) {
if (reorder_mapping[j] == reorder_agg_index) {
tuple[j] = agg_res;
} else {
tuple[j] = input_tuple[reorder_mapping[j]];
}
for (long unsigned int j = 0; j < reorder_mapping.size(); j++) {
// std::cout << reorder_mapping[j] << " " << reorder_agg_index << std::endl;
if (reorder_mapping[j] == reorder_agg_index) {
tuple[j] = agg_res;
} else {
tuple[j] = input_tuple[reorder_mapping[j]];
}
}
// std::cout << "aggregated tuple <<<" << reorder_mapping.size() << " >>> ";
// for (auto c: tuple) {
// std::cout << c << " ";
// }
// std::cout << std::endl;

uint64_t bucket_id = tuple_hash(tuple.data(), output->get_join_column_count()) % buckets;
uint64_t sub_bucket_id = 0;
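For context (a sketch, not part of the PR's diff): in the reorder step above, each slot of the output tuple is either copied from the joined input tuple or, when the mapping entry equals the sentinel index input->get_arity() + 1, filled with the aggregate computed for that tuple's join-column prefix. The free function below isolates just that step under assumed names; it is not the backend's API.

// Illustrative helper: builds one output tuple from a joined input tuple by
// following reorder_mapping. The sentinel index input_arity + 1 marks the slot
// that receives the aggregate value; every other slot is a plain column copy.
#include <cstddef>
#include <cstdint>
#include <vector>

using u64 = uint64_t;

std::vector<u64> apply_reorder(const std::vector<u64>& input_tuple,
                               const std::vector<int>& reorder_mapping,
                               int input_arity, u64 agg_res, int output_arity)
{
    std::vector<u64> out(output_arity, 0);
    const int agg_sentinel = input_arity + 1;
    for (size_t j = 0; j < reorder_mapping.size(); j++) {
        if (reorder_mapping[j] == agg_sentinel)
            out[j] = agg_res;                          // aggregated value
        else
            out[j] = input_tuple[reorder_mapping[j]];  // copied column
    }
    return out;
}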
2 changes: 1 addition & 1 deletion backend/src/RA/parallel_agg.h
@@ -79,7 +79,7 @@ class parallel_join_aggregate : public parallel_RA
local_agg_func_t local_func;
reduce_agg_func_t reduce_func;
global_agg_func_t global_func;
std::vector<u64> reorder_mapping;
std::vector<int> reorder_mapping;

parallel_join_aggregate(relation* output, relation* target_rel, relation* input,
int t_type, local_agg_func_t local_agg_func,
1 change: 1 addition & 0 deletions backend/src/RA/parallel_copy.cpp
@@ -6,6 +6,7 @@


#include "../parallel_RA_inc.h"
#include <iostream>

#ifdef GOOGLE_MAP
void parallel_copy::local_copy(u32 buckets, google_relation* input, u32* input_bucket_map, relation* output, std::vector<int> reorder_map, u32 arity, u32 join_column_count, all_to_allv_buffer& copy_buffer, int ra_counter)
160 changes: 134 additions & 26 deletions backend/src/RA/parallel_join.cpp
@@ -6,30 +6,49 @@


#include "../parallel_RA_inc.h"
#include "mpi.h"
#include <cstddef>
#include <iostream>
#include <vector>


bool parallel_join::local_join(int threshold, int* offset,
int join_order,
u32 buckets,
shmap_relation *input0,
int input0_buffer_size, int input0_buffer_width, u64 *input0_buffer,
shmap_relation *input1, u32 i1_size, int input1_buffer_width,
std::vector<int> reorder_map_array,
relation* output,
relation* input0_rel,
relation* input1_rel,
all_to_allv_buffer& join_buffer,
int counter,
int join_column_count,
u32* global_join_duplicates,
u32* global_join_inserts)
u32* global_join_inserts,
std::vector<double>& time_stat)
{
join_buffer.width[counter] = reorder_map_array.size();

shmap_relation deduplicate(join_column_count, false);
auto out_dep_cols = output->get_dependent_column();
if (out_dep_cols.size() != 0) {
for (size_t i = 0; i < out_dep_cols.size() - 1; i++) {
deduplicate.dependent_column_indices.push_back(out_dep_cols[i]);
}
deduplicate.update_compare_func = output->get_update_compare_func();
}
u32* output_sub_bucket_count = output->get_sub_bucket_per_bucket_count();
u32** output_sub_bucket_rank = output->get_sub_bucket_rank();
// std::cout << "wwwwwwwww " << input0_buffer_size << " " << input0_buffer_size << " " << i1_size << std::endl;

if (*offset > input0_buffer_size || input0_buffer_size == 0 || i1_size == 0)
if (*offset > input0_buffer_size || input0_buffer_size == 0 || i1_size == 0) {
time_stat.push_back(0);
return true;
}

double join_time_total = 0;
int local_join_count=0;
if (join_order == LEFT)
{
@@ -42,18 +61,24 @@
//std::cout << "PREFIX " << input0_buffer[k1 + jc] << std::endl;
}

u64 bucket_id = tuple_hash(input0_buffer + k1, join_column_count) % buckets;

input1[bucket_id].as_all_to_allv_left_join_buffer(
prefix, join_buffer,
input0_buffer + k1,input0_buffer_width,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts, output->get_join_column_count(),
output->get_is_canonical());
// u64 bucket_id = tuple_hash(input0_buffer + k1, join_column_count) % buckets;

auto before_actual_join = MPI_Wtime();
for (u32 bucket_id = 0; bucket_id < buckets; bucket_id++) {
input1[bucket_id].as_all_to_allv_left_join_buffer(
prefix, join_buffer,
input0_buffer + k1,input0_buffer_width,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts, output->get_join_column_count(),
output->get_is_canonical(),
generator_mode, generator_func);
}
auto after_actual_join = MPI_Wtime();
join_time_total += after_actual_join - before_actual_join;

// std::cout << "local_join_count " << local_join_count << " Threshold " << threshold << " k1 " << k1 << " offset " << *offset << " " << input0_buffer_width << std::endl;
if (local_join_count > threshold)
@@ -67,24 +92,104 @@

else if (join_order == RIGHT)
{
if (input0->dependent_column_indices.size() > 0 && generator_mode) {
// right lattice join
std::vector<std::vector<u64>> input_ts;
std::vector<u64> prev_non_dependent_columns;
for (int k1 = *offset; k1 < input0_buffer_size; k1 = k1 + input0_buffer_width) {
std::vector<u64> cur_non_dependent_columns(
input0_buffer+k1,
input0_buffer+k1+input0_buffer_width-input0->dependent_column_indices.size()
);
// std::vector<u64> prefix;
// for (int jc=0; jc < join_column_count; jc++)
// prefix.push_back(input0_buffer[k1 + jc]);

std::vector<u64> input_t(input0_buffer+k1, input0_buffer+k1+input0_buffer_width);
// std::cout << "LT >>> ";
// for (auto c: input_t) {
// std::cout << c << " ";
// }
// std::cout << std::endl;
if (cur_non_dependent_columns == prev_non_dependent_columns) {
input_ts.push_back(input_t);
} else {
if (input_ts.size() != 0) {
auto before_actual_join = MPI_Wtime();
// u64 bucket_id = tuple_hash(input0_buffer + k1, join_column_count) % buckets;
for (u32 bucket_id = 0; bucket_id < buckets; bucket_id++) {
input1[bucket_id].as_all_to_allv_right_join_buffer(
std::vector<u64>(prev_non_dependent_columns.begin(),
prev_non_dependent_columns.begin()+join_column_count),
join_buffer,
input_ts,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts,
output->get_join_column_count(),output->get_is_canonical(),
generator_mode, generator_func);
}
auto after_actual_join = MPI_Wtime();
join_time_total += after_actual_join - before_actual_join;
input_ts.clear();
}
prev_non_dependent_columns = cur_non_dependent_columns;
input_ts.push_back(input_t);
}
}
if (input_ts.size() != 0) {
// u64 bucket_id = tuple_hash(prev_non_dependent_columns.data(), join_column_count) % buckets;
auto before_actual_join = MPI_Wtime();
for (u32 bucket_id = 0; bucket_id < buckets; bucket_id++) {
input1[bucket_id].as_all_to_allv_right_join_buffer(
std::vector<u64>(prev_non_dependent_columns.begin(),
prev_non_dependent_columns.begin()+join_column_count),
join_buffer,
input_ts,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts,
output->get_join_column_count(),output->get_is_canonical(),
generator_mode, generator_func);
}
auto after_actual_join = MPI_Wtime();
join_time_total += after_actual_join - before_actual_join;
input_ts.clear();
}
} else {
// original code
for (int k1 = *offset; k1 < input0_buffer_size; k1 = k1 + input0_buffer_width)
{
std::vector<u64> prefix;
for (int jc=0; jc < join_column_count; jc++)
prefix.push_back(input0_buffer[k1 + jc]);

u64 bucket_id = tuple_hash(input0_buffer + k1, join_column_count) % buckets;

input1[bucket_id].as_all_to_allv_right_join_buffer(
prefix, join_buffer,
input0_buffer + k1, input0_buffer_width,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts,
output->get_join_column_count(),output->get_is_canonical());
// u64 bucket_id = tuple_hash(input0_buffer + k1, join_column_count) % buckets;
std::vector<std::vector<u64>> input_ts;
input_ts.push_back(std::vector<u64>(input0_buffer+k1, input0_buffer+k1+input0_buffer_width));
auto before_actual_join = MPI_Wtime();
for (u32 bucket_id = 0; bucket_id < buckets; bucket_id++) {
input1[bucket_id].as_all_to_allv_right_join_buffer(
prefix, join_buffer,
// input0_buffer + k1, input0_buffer_width,
input_ts,
input1_buffer_width, counter,
buckets, output_sub_bucket_count,
output_sub_bucket_rank, reorder_map_array,
join_column_count, deduplicate,
&local_join_count, global_join_duplicates,
global_join_inserts,
output->get_join_column_count(),output->get_is_canonical(),
generator_mode, generator_func);
}
auto after_actual_join = MPI_Wtime();
join_time_total += after_actual_join - before_actual_join;

// std::cout << "local_join_count " << local_join_count << " Threshold " << threshold << " k1 " << k1 << " offset " << *offset << " " << input0_buffer_width << std::endl;
if (local_join_count > threshold)
@@ -95,8 +200,11 @@
return false;
}
}

}
}

time_stat.push_back(join_time_total);
deduplicate.remove_tuple();
return true;
}
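For context (a sketch, not part of the PR's diff): the new RIGHT branch adds a lattice-aware path. When the relation has dependent (lattice) columns and generator mode is on, tuples that sit next to each other in the input buffer and agree on their non-dependent columns are batched into input_ts and passed to as_all_to_allv_right_join_buffer as one group; both branches now also loop over every bucket instead of hashing the prefix to a single bucket_id, and MPI_Wtime is sampled around each batch so the accumulated join time can be pushed into time_stat. The lines below isolate only the grouping pattern, with assumed types and a hypothetical flush callback rather than the backend's interface.

// Stand-alone sketch of the batching in the lattice RIGHT branch above.
// Tuples arrive width columns at a time in buf; consecutive tuples whose
// non-dependent prefix matches are collected into one group, and each group
// is handed to the join exactly once.
#include <cstdint>
#include <functional>
#include <vector>

using u64 = uint64_t;
using tuple_t = std::vector<u64>;

void group_by_non_dependent(const std::vector<u64>& buf, size_t width, size_t n_dependent,
                            const std::function<void(const tuple_t&, const std::vector<tuple_t>&)>& flush)
{
    std::vector<tuple_t> group;
    tuple_t prev_key;
    for (size_t k = 0; k + width <= buf.size(); k += width) {
        tuple_t key(buf.begin() + k, buf.begin() + k + width - n_dependent);  // non-dependent columns
        tuple_t full(buf.begin() + k, buf.begin() + k + width);               // full tuple incl. lattice columns
        if (key != prev_key && !group.empty()) {
            flush(prev_key, group);   // join one batch sharing the same key
            group.clear();
        }
        prev_key = std::move(key);
        group.push_back(std::move(full));
    }
    if (!group.empty())
        flush(prev_key, group);       // flush the final batch
}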