Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-417] fix sort spill on inplsace sort (#423)
Browse files Browse the repository at this point in the history
* fix sort spill on inplsace sort

Signed-off-by: Yuan Zhou <[email protected]>

* fix format

Signed-off-by: Yuan Zhou <[email protected]>

* adding unit test

Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan authored Jul 23, 2021
1 parent 4da9887 commit fe6c798
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ class SortArraysToIndicesVisitorImpl : public ExprVisitorImpl {
}

arrow::Status Spill(int64_t size, int64_t* spilled_size) override {
std::cout << "target size: " << size << std::endl;
RETURN_NOT_OK(kernel_->Spill(size, spilled_size));

if (*spilled_size != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,6 @@ extern "C" void MakeCodeGen(arrow::compute::ExecContext* ctx,
if (!spillablecachestore_) {
spillablecachestore_ = std::make_shared<SpillableCacheStore>(cached_in_, schema_);
}
std::cout << "call on: " << spillablecachestore_->GetSpillDir() << "|"
<< is_spilled_ << "\n";
if (is_spilled_) {
// TODO: this should be fixed when spill in sorting
*spilled_size = 0;
Expand Down Expand Up @@ -984,6 +982,13 @@ class SortInplaceKernel : public SortArraysToIndicesKernel::Impl {
return arrow::Status::OK();
}

arrow::Status Spill(int64_t size, int64_t* spilled_size) {
// inplace sort does not support spill
*spilled_size = 0;

return arrow::Status::OK();
}

// This function is used for float/double data without null value.
// If NaN_check_ is true, we need to do partition for NaN before sort.
template <typename TYPE>
Expand Down
95 changes: 91 additions & 4 deletions native-sql-engine/cpp/src/tests/arrow_compute_test_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,95 @@ TEST(TestArrowComputeSort, SortTestInplaceDesc) {
ASSERT_NOT_OK(Equals(*expected_result.get(), *result_batch.get()));
}
}

TEST(TestArrowComputeSort, SortTestInplaceDescWithSpill) {
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", float64());
auto arg_0 = TreeExprBuilder::MakeField(f0);
auto true_literal = TreeExprBuilder::MakeLiteral(true);
auto false_literal = TreeExprBuilder::MakeLiteral(false);

auto f_res = field("res", uint32());
auto indices_type = std::make_shared<FixedSizeBinaryType>(16);
auto f_indices = field("indices", indices_type);

auto n_key_func = TreeExprBuilder::MakeFunction("key_function", {arg_0}, uint32());
auto n_key_field = TreeExprBuilder::MakeFunction("key_field", {arg_0}, uint32());
auto n_dir =
TreeExprBuilder::MakeFunction("sort_directions", {false_literal}, uint32());
auto n_nulls_order =
TreeExprBuilder::MakeFunction("sort_nulls_order", {false_literal}, uint32());
auto NaN_check = TreeExprBuilder::MakeFunction("NaN_check", {true_literal}, uint32());
auto do_codegen = TreeExprBuilder::MakeFunction("codegen", {false_literal}, uint32());
auto n_sort_to_indices = TreeExprBuilder::MakeFunction(
"sortArraysToIndices",
{n_key_func, n_key_field, n_dir, n_nulls_order, NaN_check, do_codegen}, uint32());
auto n_sort =
TreeExprBuilder::MakeFunction("standalone", {n_sort_to_indices}, uint32());
auto sortArrays_expr = TreeExprBuilder::MakeExpression(n_sort, f_res);

auto sch = arrow::schema({f0});
std::vector<std::shared_ptr<Field>> ret_types = {f0};
///////////////////// Calculation //////////////////
std::shared_ptr<CodeGenerator> sort_expr;
arrow::compute::ExecContext ctx;
ASSERT_NOT_OK(CreateCodeGenerator(ctx.memory_pool(), sch, {sortArrays_expr}, ret_types,
&sort_expr, true));

std::shared_ptr<arrow::RecordBatch> input_batch;
std::vector<std::shared_ptr<arrow::RecordBatch>> input_batch_list;
std::vector<std::shared_ptr<arrow::RecordBatch>> dummy_result_batches;
std::shared_ptr<ResultIteratorBase> sort_result_iterator_base;

std::vector<std::string> input_data_string = {"[10, NaN, 4, 50, 52, 32, 11]"};
MakeInputBatch(input_data_string, sch, &input_batch);
input_batch_list.push_back(input_batch);

std::vector<std::string> input_data_string_2 = {"[1, 14, 43, 42, 6, 45, 2]"};
MakeInputBatch(input_data_string_2, sch, &input_batch);
input_batch_list.push_back(input_batch);

std::vector<std::string> input_data_string_3 = {"[3, 64, NaN, 7, 9, 19, 33]"};
MakeInputBatch(input_data_string_3, sch, &input_batch);
input_batch_list.push_back(input_batch);

std::vector<std::string> input_data_string_4 = {"[23, 17, 41, 18, 20, 35, 30]"};
MakeInputBatch(input_data_string_4, sch, &input_batch);
input_batch_list.push_back(input_batch);

std::vector<std::string> input_data_string_5 = {"[37, 12, 22, 13, 8, 59, 21]"};
MakeInputBatch(input_data_string_5, sch, &input_batch);
input_batch_list.push_back(input_batch);

////////////////////////////////// calculation
//////////////////////////////////////
std::shared_ptr<arrow::RecordBatch> expected_result;
std::vector<std::string> expected_result_string = {
"[NaN, NaN, 64, 59, 52, 50, 45, 43, 42, 41, 37, 35, 33, 32, 30, 23, "
"22, 21, 20, 19, 18, 17, 14, 13, 12, 11, 10, 9, 8, 7, 6, 4, 3, 2, 1]"};
MakeInputBatch(expected_result_string, sch, &expected_result);

for (auto batch : input_batch_list) {
ASSERT_NOT_OK(sort_expr->evaluate(batch, &dummy_result_batches));
}
ASSERT_NOT_OK(sort_expr->finish(&sort_result_iterator_base));

auto sort_result_iterator =
std::dynamic_pointer_cast<ResultIterator<arrow::RecordBatch>>(
sort_result_iterator_base);

std::shared_ptr<arrow::RecordBatch> dummy_result_batch;
std::shared_ptr<arrow::RecordBatch> result_batch;

int64_t size = -1;
if (sort_result_iterator->HasNext()) {
sort_expr->Spill(100, false, &size);
EXPECT_TRUE(size == 0);
ASSERT_NOT_OK(sort_result_iterator->Next(&result_batch));
ASSERT_NOT_OK(Equals(*expected_result.get(), *result_batch.get()));
}
}

TEST(TestArrowComputeSort, SortTestOnekeyNullsFirstAscWithSpill) {
////////////////////// prepare expr_vector ///////////////////////
auto f0 = field("f0", float64());
Expand Down Expand Up @@ -647,15 +736,13 @@ TEST(TestArrowComputeSort, SortTestOnekeyNullsFirstAscWithSpill) {
std::shared_ptr<arrow::RecordBatch> dummy_result_batch;
std::shared_ptr<arrow::RecordBatch> result_batch;
int64_t size;
bool firstspill = true;
auto result_iter = result_batch_list.begin();
while (sort_result_iterator->HasNext()) {
ASSERT_NOT_OK(sort_result_iterator->Next(&result_batch));
sort_expr->Spill(100, false, &size);
// should spill all record batches
EXPECT_TRUE(size == 960);

EXPECT_TRUE(size >= gap);

firstspill = false;
expected_result = *result_iter;
ASSERT_NOT_OK(Equals(*expected_result.get(), *result_batch.get()));
result_iter++;
Expand Down

0 comments on commit fe6c798

Please sign in to comment.