Skip to content

Commit

Permalink
fix: reset aggregator (#1784)
Browse files Browse the repository at this point in the history
* bugfix: reset aggregator

* add ut
  • Loading branch information
zhanghaohit authored May 10, 2022
1 parent 0e94f70 commit f105a59
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 3 deletions.
5 changes: 4 additions & 1 deletion hybridse/src/vm/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class Aggregator : public BaseAggregator {
}

Row Output() override {
return OutputInternal();
auto row = OutputInternal();
// reset the aggregator
Reset();
return row;
}

void Reset() override {
Expand Down
2 changes: 0 additions & 2 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3051,8 +3051,6 @@ std::shared_ptr<TableHandler> RequestAggUnionRunner::RequestUnionWindow(

window_table->AddRow(start, aggregator_->Output());
DLOG(INFO) << "REQUEST AGG UNION cnt = " << window_table->GetCount();
// reset the aggregator
aggregator_->Reset();
return window_table;
}

Expand Down
100 changes: 100 additions & 0 deletions src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,106 @@ void PrepareRequestRowForLongWindow(const std::string& base_db, const std::strin
ASSERT_TRUE(req->Build());
}

TEST_P(DBSDKTest, DeployLongWindowsEmpty) {
auto cli = GetParam();
cs = cli->cs;
sr = cli->sr;
::hybridse::sdk::Status status;
sr->ExecuteSQL("SET @@execute_mode='online';", &status);
std::string base_table = "t_lw" + GenRand();
std::string base_db = "d_lw" + GenRand();
bool ok;
std::string msg;
CreateDBTableForLongWindow(base_db, base_table);

std::string deploy_sql = "deploy test_aggr options(LONG_WINDOWS='w1:2') select col1, col2,"
" sum(i64_col) over w1 as w1_sum_i64_col,"
" sum(i16_col) over w1 as w1_sum_i16_col,"
" sum(i32_col) over w1 as w1_sum_i32_col,"
" sum(f_col) over w1 as w1_sum_f_col,"
" sum(d_col) over w1 as w1_sum_d_col,"
" sum(t_col) over w1 as w1_sum_t_col,"
" sum(col3) over w2 as w2_sum_col3"
" from " + base_table +
" WINDOW w1 AS (PARTITION BY " + base_table + ".col1," + base_table + ".col2 ORDER BY col3"
" ROWS_RANGE BETWEEN 5 PRECEDING AND CURRENT ROW), "
" w2 AS (PARTITION BY col1,col2 ORDER BY i64_col"
" ROWS BETWEEN 6 PRECEDING AND CURRENT ROW);";
sr->ExecuteSQL(base_db, "use " + base_db + ";", &status);
ASSERT_TRUE(status.IsOK()) << status.msg;
sr->ExecuteSQL(base_db, deploy_sql, &status);
ASSERT_TRUE(status.IsOK()) << status.msg;

std::string pre_aggr_db = openmldb::nameserver::PRE_AGG_DB;
std::string result_sql = "select * from pre_test_aggr_w1_sum_i64_col;";
auto rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

result_sql = "select * from pre_test_aggr_w1_sum_i16_col;";
rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

result_sql = "select * from pre_test_aggr_w1_sum_i32_col;";
rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

result_sql = "select * from pre_test_aggr_w1_sum_f_col;";
rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

result_sql = "select * from pre_test_aggr_w1_sum_d_col;";
rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

result_sql = "select * from pre_test_aggr_w1_sum_t_col;";
rs = sr->ExecuteSQL(pre_aggr_db, result_sql, &status);
ASSERT_EQ(0, rs->Size());

int req_num = 2;
for (int i = 0; i < req_num; i++) {
std::shared_ptr<sdk::SQLRequestRow> req;
PrepareRequestRowForLongWindow(base_db, "test_aggr", req);
auto res = sr->CallProcedure(base_db, "test_aggr", req, &status);
ASSERT_TRUE(status.IsOK());
ASSERT_EQ(1, res->Size());
ASSERT_TRUE(res->Next());
ASSERT_EQ("str1", res->GetStringUnsafe(0));
ASSERT_EQ("str2", res->GetStringUnsafe(1));
int64_t exp = 11;
ASSERT_EQ(exp, res->GetInt64Unsafe(2));
ASSERT_EQ(exp, res->GetInt16Unsafe(3));
ASSERT_EQ(exp, res->GetInt32Unsafe(4));
ASSERT_EQ(exp, res->GetFloatUnsafe(5));
ASSERT_EQ(exp, res->GetDoubleUnsafe(6));
ASSERT_EQ(exp, res->GetTimeUnsafe(7));
ASSERT_EQ(exp, res->GetInt64Unsafe(8));
}

ASSERT_TRUE(cs->GetNsClient()->DropProcedure(base_db, "test_aggr", msg));
std::string pre_aggr_table = "pre_test_aggr_w1_sum_i64_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
pre_aggr_table = "pre_test_aggr_w1_sum_i16_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
pre_aggr_table = "pre_test_aggr_w1_sum_i32_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
pre_aggr_table = "pre_test_aggr_w1_sum_f_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
pre_aggr_table = "pre_test_aggr_w1_sum_d_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
pre_aggr_table = "pre_test_aggr_w1_sum_t_col";
ok = sr->ExecuteDDL(pre_aggr_db, "drop table " + pre_aggr_table + ";", &status);
ASSERT_TRUE(ok);
ok = sr->ExecuteDDL(base_db, "drop table " + base_table + ";", &status);
ASSERT_TRUE(ok);
ok = sr->DropDB(base_db, &status);
ASSERT_TRUE(ok);
}

TEST_P(DBSDKTest, DeployLongWindowsExecuteSum) {
auto cli = GetParam();
cs = cli->cs;
Expand Down

0 comments on commit f105a59

Please sign in to comment.