Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
it will create a new rows window for lag like functions
  • Loading branch information
aceforeverd committed May 5, 2022
1 parent 63f4ff4 commit 54874fc
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 262 deletions.
89 changes: 84 additions & 5 deletions cases/function/function/test_udaf_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ cases:

- id: 57
desc: |
correctness for at/lag when offset out-of-range window frame bound.
correctness for at/lag when offset out-of-range rows_range window frame bound.
keynote, lag returns value evaluated at the row that is offset rows before the current row within the partition.
refer https://github.com/4paradigm/OpenMLDB/issues/1554
inputs:
Expand Down Expand Up @@ -2393,7 +2393,7 @@ cases:
- id: 58
desc: |
correctness for at/lag when offset out-of-range window frame bound, together with other window function.
correctness for at/lag when offset out-of-range rows_range window frame bound, together with other window function.
refer https://github.com/4paradigm/OpenMLDB/issues/1554
# FIXME(ace): request/cluster run do not meet the expects
mode: request-unsupport,cluster-unsupport
Expand All @@ -2407,9 +2407,9 @@ cases:
3, 1612130402000, g1, 3
4, 1612130403000, g1, 4
5, 1612130404000, g1, 5
6, 1612130404000, g2, 4
7, 1612130405000, g2, 3
8, 1612130406000, g2, 2
6, 1612130405000, g2, 4
7, 1612130406000, g2, 3
8, 1612130407000, g2, 2
sql: |
select
`id`,
Expand All @@ -2431,3 +2431,82 @@ cases:
6, 4, 4, NULL, NULL
7, 3, 3, NULL, 4
8, 2, 2, NULL, 3
- id: 59
desc: |
correctness for at/lag when offset out-of-range window frame bound.
keynote, lag returns value evaluated at the row that is offset rows before the current row within the partition.
refer https://github.com/4paradigm/OpenMLDB/issues/1554
inputs:
- columns: [ "id int","ts timestamp","group1 string","val1 int" ]
indexs: [ "index1:group1:ts" ]
name: t1
data: |
1, 1612130400000, g1, 1
2, 1612130401000, g1, 2
3, 1612130402000, g1, 3
4, 1612130403000, g1, 4
5, 1612130404000, g1, 5
6, 1612130405000, g2, 4
7, 1612130406000, g2, 3
8, 1612130407000, g2, 2
sql: |
select
`id`,
`val1`,
lag(val1, 0) over w1 as agg1,
lag(val1, 1) over w1 as agg2,
lag(val1, 3) over w1 as agg3
from `t1` WINDOW
w1 as (partition by `group1` order by `ts` rows between 2 preceding and 1 preceding);
expect:
columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"]
order: id
data: |
1, 1, 1, NULL, NULL
2, 2, 2, 1, NULL
3, 3, 3, 2, NULL
4, 4, 4, 3, 1
5, 5, 5, 4, 2
6, 4, 4, NULL, NULL
7, 3, 3, 4, NULL
8, 2, 2, 3, NULL
- id: 60
desc: |
correctness for at/lag when offset out-of-range rows window frame bound
refer https://github.com/4paradigm/OpenMLDB/issues/1554
inputs:
- columns: [ "id int","ts timestamp","group1 string","val1 int" ]
indexs: [ "index1:group1:ts" ]
name: t1
data: |
1, 1612130400000, g1, 1
2, 1612130401000, g1, 2
3, 1612130402000, g1, 3
4, 1612130403000, g1, 4
5, 1612130404000, g1, 5
6, 1612130405000, g2, 4
7, 1612130406000, g2, 3
8, 1612130407000, g2, 2
sql: |
select
`id`,
`val1`,
lag(val1, 0) over w1 as agg1,
lag(val1, 3) over w1 as agg2,
first_value(val1) over w1 as agg3
from `t1` WINDOW
w1 as (partition by `group1` order by `ts` rows between 2 preceding and 1 preceding);
expect:
columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"]
order: id
data: |
1, 1, 1, NULL, NULL
2, 2, 2, NULL, 1
3, 3, 3, NULL, 2
4, 4, 4, 1, 3
5, 5, 5, 2, 4
6, 4, 4, NULL, NULL
7, 3, 3, NULL, 4
8, 2, 2, NULL, 3
1 change: 1 addition & 0 deletions hybridse/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ list(
absl::synchronization
absl::time
absl::status
absl::statusor
)

find_package(ICU COMPONENTS i18n io uc data)
Expand Down
5 changes: 2 additions & 3 deletions hybridse/include/node/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class NodeManager {
PlanNode *MakeMultiPlanNode(const PlanType &type);
PlanNode *MakeMergeNode(int column_size);
WindowPlanNode *MakeWindowPlanNode(int w_id);
ProjectListNode *MakeProjectListPlanNode(WindowPlanNode *w,
const bool need_agg);
ProjectListNode *MakeProjectListPlanNode(const WindowPlanNode *w, const bool need_agg);
FilterPlanNode *MakeFilterPlanNode(PlanNode *node,
const ExprNode *condition);

Expand Down Expand Up @@ -128,7 +127,7 @@ class NodeManager {
const WindowDefNode *w2);
OrderExpression* MakeOrderExpression(const ExprNode* expr, const bool is_asc);
OrderByNode *MakeOrderByNode(const ExprListNode *order_expressions);
SqlNode *MakeFrameExtent(SqlNode *start, SqlNode *end);
FrameExtent *MakeFrameExtent(SqlNode *start, SqlNode *end);
SqlNode *MakeFrameBound(BoundType bound_type);
SqlNode *MakeFrameBound(BoundType bound_type, ExprNode *offset);
SqlNode *MakeFrameBound(BoundType bound_type, int64_t offset);
Expand Down
8 changes: 3 additions & 5 deletions hybridse/include/node/plan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <utility>
#include <vector>

#include "glog/logging.h"
#include "node/node_enum.h"
#include "node/sql_node.h"

Expand Down Expand Up @@ -298,7 +297,7 @@ class ProjectListNode : public LeafPlanNode {
w_ptr_(nullptr),
having_condition_(nullptr),
projects_({}) {}
ProjectListNode(WindowPlanNode *w_ptr, const bool has_agg)
ProjectListNode(const WindowPlanNode *w_ptr, const bool has_agg)
: LeafPlanNode(kProjectList),
has_row_project_(false),
has_agg_project_(has_agg),
Expand All @@ -320,8 +319,7 @@ class ProjectListNode : public LeafPlanNode {
}
}

WindowPlanNode *GetW() const { return w_ptr_; }
void SetW(WindowPlanNode* window) { w_ptr_ = window; }
const WindowPlanNode *GetW() const { return w_ptr_; }
const ExprNode* GetHavingCondition() const { return having_condition_;}
void SetHavingCondition(const node::ExprNode* having_condition) {
this->having_condition_ = having_condition;
Expand All @@ -339,7 +337,7 @@ class ProjectListNode : public LeafPlanNode {
private:
bool has_row_project_;
bool has_agg_project_;
WindowPlanNode *w_ptr_;
const WindowPlanNode *w_ptr_;
const ExprNode* having_condition_;
PlanNodeList projects_;
};
Expand Down
13 changes: 6 additions & 7 deletions hybridse/include/node/sql_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ class FrameNode : public SqlNode {
void SetFrameRows(FrameExtent* ext) { frame_rows_ = ext; }

int64_t frame_maxsize() const { return frame_maxsize_; }
void set_frame_maxsize(int64_t s) { frame_maxsize_ = s; }
int64_t GetHistoryRangeStart() const {
if (nullptr == frame_rows_ && nullptr == frame_range_) {
return INT64_MIN;
Expand Down Expand Up @@ -1359,9 +1360,6 @@ class WindowDefNode : public SqlNode {
SqlNodeList *union_tables() const { return union_tables_; }
void set_union_tables(SqlNodeList *union_table) { union_tables_ = union_table; }

bool PermitWindowMerge() const { return permit_window_merge_; }
void SetPermitWindowMerge(bool permit) { permit_window_merge_ = permit; }

const bool instance_not_in_window() const { return instance_not_in_window_; }
void set_instance_not_in_window(bool instance_not_in_window) { instance_not_in_window_ = instance_not_in_window; }
const bool exclude_current_time() const { return exclude_current_time_; }
Expand All @@ -1381,10 +1379,6 @@ class WindowDefNode : public SqlNode {
SqlNodeList *union_tables_; /* union other table in window */
ExprListNode *partitions_; /* PARTITION BY expression list */
OrderByNode *orders_; /* ORDER BY (list of SortBy) */

// whether allow two window merged into single
// one can explicitly set this to false to disable window merge over this window
bool permit_window_merge_ = true;
};

class AllNode : public ExprNode {
Expand Down Expand Up @@ -2773,8 +2767,13 @@ bool SqlListEquals(const SqlNodeList *left, const SqlNodeList *right);
bool ExprEquals(const ExprNode *left, const ExprNode *right);
bool FnDefEquals(const FnDefNode *left, const FnDefNode *right);
bool TypeEquals(const TypeNode *left, const TypeNode *right);

// retrieve the `WindowDefNode` for the `ExprNode`, which is either from
// `ExprNode` itself inside if it is an anonymous window e.g `fn() over (window)`
// or find in `windows` map by window name
bool WindowOfExpression(const std::map<std::string, const WindowDefNode *>& windows, ExprNode *node_ptr,
const WindowDefNode **output);

bool IsAggregationExpression(const udf::UdfLibrary* lib, const node::ExprNode* node_ptr);
void ColumnOfExpression(const ExprNode *node_ptr,
std::vector<const node::ExprNode *> *columns); // NOLINT
Expand Down
4 changes: 2 additions & 2 deletions hybridse/src/node/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ SqlNode *NodeManager::MakeFrameBound(BoundType bound_type, int64_t offset) {
FrameBound *node_ptr = new FrameBound(bound_type, offset, false);
return RegisterNode(node_ptr);
}
SqlNode *NodeManager::MakeFrameExtent(SqlNode *start, SqlNode *end) {
FrameExtent *NodeManager::MakeFrameExtent(SqlNode *start, SqlNode *end) {
FrameExtent *node_ptr = new FrameExtent(dynamic_cast<FrameBound *>(start), dynamic_cast<FrameBound *>(end));
return RegisterNode(node_ptr);
}
Expand Down Expand Up @@ -566,7 +566,7 @@ WindowPlanNode *NodeManager::MakeWindowPlanNode(int w_id) {
return node_ptr;
}

ProjectListNode *NodeManager::MakeProjectListPlanNode(WindowPlanNode *w_ptr, const bool need_agg) {
ProjectListNode *NodeManager::MakeProjectListPlanNode(const WindowPlanNode *w_ptr, const bool need_agg) {
ProjectListNode *node_ptr = new ProjectListNode(w_ptr, need_agg);
RegisterNode(node_ptr);
return node_ptr;
Expand Down
7 changes: 4 additions & 3 deletions hybridse/src/node/sql_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ bool FrameExtent::Equals(const SqlNode *node) const {
}

FrameExtent* FrameExtent::ShadowCopy(NodeManager* nm) const {
return dynamic_cast<FrameExtent *>(nm->MakeFrameExtent(start(), end()));
return nm->MakeFrameExtent(start(), end());
}

bool FrameNode::Equals(const SqlNode *node) const {
Expand Down Expand Up @@ -887,7 +887,6 @@ void WindowDefNode::Print(std::ostream &output, const std::string &org_tab) cons

// test if two window can be merged into single one
// besides the two windows is the same one, two can also merged when all of those condition meet:
// - flag permit_window_merge_ is true
// - union table equal
// - exclude current time equal
// - instance not in window equal
Expand All @@ -901,7 +900,7 @@ bool WindowDefNode::CanMergeWith(const WindowDefNode *that, const bool enable_wi
if (Equals(that)) {
return true;
}
return PermitWindowMerge() && SqlListEquals(this->union_tables_, that->union_tables_) &&
return SqlListEquals(this->union_tables_, that->union_tables_) &&
this->exclude_current_time_ == that->exclude_current_time_ &&
this->instance_not_in_window_ == that->instance_not_in_window_ && ExprEquals(this->orders_, that->orders_) &&
ExprEquals(this->partitions_, that->partitions_) && nullptr != frame_ptr_ &&
Expand Down Expand Up @@ -1252,6 +1251,7 @@ bool IsAggregationExpression(const udf::UdfLibrary *lib, const ExprNode *node_pt
}
return false;
}

bool WindowOfExpression(const std::map<std::string, const WindowDefNode *> &windows, ExprNode *node_ptr,
const WindowDefNode **output) {
// try to resolved window ptr from expression like: call(args...) over
Expand All @@ -1260,6 +1260,7 @@ bool WindowOfExpression(const std::map<std::string, const WindowDefNode *> &wind
CallExprNode *func_node_ptr = dynamic_cast<CallExprNode *>(node_ptr);
if (nullptr != func_node_ptr->GetOver()) {
if (func_node_ptr->GetOver()->GetName().empty()) {
// anonymous over
*output = func_node_ptr->GetOver();
} else {
auto iter = windows.find(func_node_ptr->GetOver()->GetName());
Expand Down
Loading

0 comments on commit 54874fc

Please sign in to comment.