diff --git a/cases/function/function/test_udaf_function.yaml b/cases/function/function/test_udaf_function.yaml index 7f858025bf3..41357464b69 100644 --- a/cases/function/function/test_udaf_function.yaml +++ b/cases/function/function/test_udaf_function.yaml @@ -1918,6 +1918,8 @@ cases: - id: 46 desc: window lag functions sqlDialect: ["HybridSQL"] + # FIXME(ace): cluster & batch-request mode + mode: cluster-unsupport, batch-request-unsupport inputs: - columns : ["id int","pk bigint","c1 string","c2 int","c3 bigint","c4 float", @@ -1946,26 +1948,25 @@ cases: lag(c7, 0) OVER w1 as m13, lag(c7, 2) OVER w1 as m14, lag(c8, 0) OVER w1 as m15, - lag(c8, 2) OVER w1 as m16, - lag(pk, -1) OVER w1 as m17 + lag(c8, 2) OVER w1 as m16 FROM {0} WINDOW w1 AS (PARTITION BY {0}.pk ORDER BY {0}.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW); expect: order: id columns: ["id int","m1 string", "m2 string", "m3 int", "m4 int", "m5 bigint", "m6 bigint", "m7 float", "m8 float", "m9 double", "m10 double", - "m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool", "m17 bigint"] + "m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool"] rows: - [1, "a", NULL, 1, NULL, 30, NULL, 1.1, NULL, 2.1, NULL, - 1590738990000, NULL, "2020-05-01", NULL, true, NULL, NULL] + 1590738990000, NULL, "2020-05-01", NULL, true, NULL] - [2, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, NULL, - 1590738991000, NULL, "2020-05-03", NULL, false, NULL, NULL] + 1590738991000, NULL, "2020-05-03", NULL, false, NULL] - [3, "b", "a", 3, 1, 32, 30, 1.3, 1.1, 2.3, 2.1, - 1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true, NULL] + 1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true] - [4, NULL, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, - 1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false, NULL] + 1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false] - [5, "d", "b", 5, 3, 35, 32, 1.5, 1.3, 2.5, 2.3, - 1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true, NULL] + 1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true] - id: 47 desc: count where value equals first value @@ -2303,6 +2304,8 @@ cases: - [4,"aa",1,1,30,1.1,2.1,1590738990000,"2020-05-01","a"] - id: 56 desc: window at functions, at is synonym to lag + # FIXME(ace): cluster & batch-request mode + mode: cluster-unsupport, batch-request-unsupport sqlDialect: ["HybridSQL"] inputs: - @@ -2310,11 +2313,11 @@ cases: "c5 double","c6 timestamp","c7 date","c8 bool"] indexs: ["index1:pk:c6"] rows: - - [1, 1, "a", 1, 30, 1.1, 2.1, 1590738990000, "2020-05-01", true] - - [2, 1, "c", 4, 33, 1.4, 2.4, 1590738991000, "2020-05-03", false] - - [3, 1, "b", 3, 32, 1.3, 2.3, 1590738992000, "2020-05-02", true,] + - [1, 1, "a", 1, 30, 1.1, 2.1, 1590738990000, "2020-05-01", true] + - [2, 1, "c", 4, 33, 1.4, 2.4, 1590738991000, "2020-05-03", false] + - [3, 1, "b", 3, 32, 1.3, 2.3, 1590738992000, "2020-05-02", true,] - [4, 1, NULL, NULL, NULL, NULL, NULL, 1590738993000, NULL, NULL] - - [5, 1, "d", 5, 35, 1.5, 2.5, 1590738994000, "2020-05-04", false] + - [5, 1, "d", 5, 35, 1.5, 2.5, 1590738994000, "2020-05-04", false] sql: | SELECT {0}.id, at(c1, 0) OVER w1 as m1, @@ -2332,24 +2335,103 @@ cases: at(c7, 0) OVER w1 as m13, at(c7, 2) OVER w1 as m14, at(c8, 0) OVER w1 as m15, - at(c8, 2) OVER w1 as m16, - at(pk, -1) OVER w1 as m17 + at(c8, 2) OVER w1 as m16 FROM {0} WINDOW w1 AS (PARTITION BY {0}.pk ORDER BY {0}.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW); expect: order: id columns: ["id int","m1 string", "m2 string", "m3 int", "m4 int", "m5 bigint", "m6 bigint", "m7 float", "m8 float", "m9 double", "m10 double", - "m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool", "m17 bigint"] + "m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool"] rows: - [1, "a", NULL, 1, NULL, 30, NULL, 1.1, NULL, 2.1, NULL, - 1590738990000, NULL, "2020-05-01", NULL, true, NULL, NULL] + 1590738990000, NULL, "2020-05-01", NULL, true, NULL] - [2, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, NULL, - 1590738991000, NULL, "2020-05-03", NULL, false, NULL, NULL] + 1590738991000, NULL, "2020-05-03", NULL, false, NULL] - [3, "b", "a", 3, 1, 32, 30, 1.3, 1.1, 2.3, 2.1, - 1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true, NULL] + 1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true] - [4, NULL, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, - 1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false, NULL] + 1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false] - [5, "d", "b", 5, 3, 35, 32, 1.5, 1.3, 2.5, 2.3, - 1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true, NULL] + 1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true] + - id: 57 + desc: | + correctness for at/lag when offset out-of-range window frame bound. + keynote, lag returns value evaluated at the row that is offset rows before the current row within the partition. + refer https://github.com/4paradigm/OpenMLDB/issues/1554 + inputs: + - columns: [ "id int","ts timestamp","group1 string","val1 int" ] + indexs: [ "index1:group1:ts" ] + name: t1 + data: | + 1, 1612130400000, g1, 1 + 2, 1612130401000, g1, 2 + 3, 1612130402000, g1, 3 + 4, 1612130403000, g1, 4 + 5, 1612130404000, g1, 5 + 6, 1612130404000, g2, 4 + 7, 1612130405000, g2, 3 + 8, 1612130406000, g2, 2 + sql: | + select + `id`, + `val1`, + lag(val1, 0) over w1 as agg1, + lag(val1, 1) over w1 as agg2, + lag(val1, 3) over w1 as agg3 + from `t1` WINDOW + w1 as (partition by `group1` order by `ts` rows_range between 2s preceding and 1s preceding MAXSIZE 10); + expect: + columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"] + order: id + data: | + 1, 1, 1, NULL, NULL + 2, 2, 2, 1, NULL + 3, 3, 3, 2, NULL + 4, 4, 4, 3, 1 + 5, 5, 5, 4, 2 + 6, 4, 4, NULL, NULL + 7, 3, 3, 4, NULL + 8, 2, 2, 3, NULL + + - id: 58 + desc: | + correctness for at/lag when offset out-of-range window frame bound, together with other window function. + refer https://github.com/4paradigm/OpenMLDB/issues/1554 + # FIXME(ace): request/cluster run do not meet the expects + mode: request-unsupport,cluster-unsupport + inputs: + - columns: [ "id int","ts timestamp","group1 string","val1 int" ] + indexs: [ "index1:group1:ts" ] + name: t1 + data: | + 1, 1612130400000, g1, 1 + 2, 1612130401000, g1, 2 + 3, 1612130402000, g1, 3 + 4, 1612130403000, g1, 4 + 5, 1612130404000, g1, 5 + 6, 1612130404000, g2, 4 + 7, 1612130405000, g2, 3 + 8, 1612130406000, g2, 2 + sql: | + select + `id`, + `val1`, + lag(val1, 0) over w1 as agg1, + lag(val1, 3) over w1 as agg2, + first_value(val1) over w1 as agg3 + from `t1` WINDOW + w1 as (partition by `group1` order by `ts` rows_range between 2s preceding and 1s preceding MAXSIZE 10); + expect: + columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"] + order: id + data: | + 1, 1, 1, NULL, NULL + 2, 2, 2, NULL, 1 + 3, 3, 3, NULL, 2 + 4, 4, 4, 1, 3 + 5, 5, 5, 2, 4 + 6, 4, 4, NULL, NULL + 7, 3, 3, NULL, 4 + 8, 2, 2, NULL, 3 diff --git a/hybridse/include/codec/list_iterator_codec.h b/hybridse/include/codec/list_iterator_codec.h index 055807b2356..1f2c85fb0a5 100644 --- a/hybridse/include/codec/list_iterator_codec.h +++ b/hybridse/include/codec/list_iterator_codec.h @@ -49,7 +49,7 @@ template class WrapListImpl : public ListV { public: WrapListImpl() : ListV() {} - ~WrapListImpl() {} + ~WrapListImpl() override {} virtual const V GetFieldUnsafe(const R &row) const = 0; virtual void GetField(const R &row, V *, bool *) const = 0; virtual const bool IsNull(const R &row) const = 0; @@ -67,7 +67,7 @@ class ColumnImpl : public WrapListImpl { col_idx_(col_idx), offset_(offset) {} - ~ColumnImpl() {} + ~ColumnImpl() override {} const V GetFieldUnsafe(const Row &row) const override { V value; diff --git a/hybridse/include/node/node_manager.h b/hybridse/include/node/node_manager.h index 6929cb8fe91..a7bd59e7fa9 100644 --- a/hybridse/include/node/node_manager.h +++ b/hybridse/include/node/node_manager.h @@ -57,7 +57,7 @@ class NodeManager { PlanNode *MakeMultiPlanNode(const PlanType &type); PlanNode *MakeMergeNode(int column_size); WindowPlanNode *MakeWindowPlanNode(int w_id); - ProjectListNode *MakeProjectListPlanNode(const WindowPlanNode *w, + ProjectListNode *MakeProjectListPlanNode(WindowPlanNode *w, const bool need_agg); FilterPlanNode *MakeFilterPlanNode(PlanNode *node, const ExprNode *condition); diff --git a/hybridse/include/node/plan_node.h b/hybridse/include/node/plan_node.h index 743d8fb95a5..748dbe72bd0 100644 --- a/hybridse/include/node/plan_node.h +++ b/hybridse/include/node/plan_node.h @@ -17,14 +17,16 @@ #ifndef HYBRIDSE_INCLUDE_NODE_PLAN_NODE_H_ #define HYBRIDSE_INCLUDE_NODE_PLAN_NODE_H_ -#include #include #include #include #include #include + +#include "glog/logging.h" #include "node/node_enum.h" #include "node/sql_node.h" + namespace hybridse { namespace node { @@ -232,7 +234,7 @@ class ProjectNode : public LeafPlanNode { node::ExprNode *GetExpression() const { return expression_; } void SetExpression(node::ExprNode *expr) { expression_ = expr; } node::FrameNode *frame() const { return frame_; } - void set_frame(node::FrameNode *frame) { frame_ = frame; } + void set_frame(FrameNode* frame) { frame_ = frame; } virtual bool Equals(const PlanNode *node) const; const bool IsAgg() const { return is_aggregation_; } @@ -248,10 +250,10 @@ class WindowPlanNode : public LeafPlanNode { public: explicit WindowPlanNode(int id) : LeafPlanNode(kPlanTypeWindow), - id(id), + id_(id), exclude_current_time_(false), instance_not_in_window_(false), - name(""), + name_(""), keys_(nullptr), orders_(nullptr) {} ~WindowPlanNode() {} @@ -264,9 +266,10 @@ class WindowPlanNode : public LeafPlanNode { const OrderByNode *GetOrders() const { return orders_; } void SetKeys(ExprListNode *keys) { keys_ = keys; } void SetOrders(OrderByNode *orders) { orders_ = orders; } - const std::string &GetName() const { return name; } - void SetName(const std::string &name) { WindowPlanNode::name = name; } - const int GetId() const { return id; } + const std::string &GetName() const { return name_; } + void SetName(const std::string &name) { name_ = name; } + int GetId() const { return id_; } + void SetId(int id) { id_ = id; } void AddUnionTable(PlanNode *node) { return union_tables_.push_back(node); } const PlanNodeList &union_tables() const { return union_tables_; } const bool instance_not_in_window() const { return instance_not_in_window_; } @@ -276,10 +279,10 @@ class WindowPlanNode : public LeafPlanNode { virtual bool Equals(const PlanNode *node) const; private: - int id; + int id_; bool exclude_current_time_; bool instance_not_in_window_; - std::string name; + std::string name_; FrameNode *frame_node_; ExprListNode *keys_; OrderByNode *orders_; @@ -294,47 +297,51 @@ class ProjectListNode : public LeafPlanNode { has_agg_project_(false), w_ptr_(nullptr), having_condition_(nullptr), - projects({}) {} - ProjectListNode(const WindowPlanNode *w_ptr, const bool has_agg) + projects_({}) {} + ProjectListNode(WindowPlanNode *w_ptr, const bool has_agg) : LeafPlanNode(kProjectList), has_row_project_(false), has_agg_project_(has_agg), w_ptr_(w_ptr), having_condition_(nullptr), - projects({}) {} + projects_({}) {} ~ProjectListNode() {} + void Print(std::ostream &output, const std::string &org_tab) const; - const PlanNodeList &GetProjects() const { return projects; } + const PlanNodeList &GetProjects() const { return projects_; } + void AddProject(ProjectNode *project) { - projects.push_back(project); + projects_.push_back(project); if (project->IsAgg()) { has_agg_project_ = true; } else { has_row_project_ = true; } } - const WindowPlanNode *GetW() const { return w_ptr_; } + + WindowPlanNode *GetW() const { return w_ptr_; } + void SetW(WindowPlanNode* window) { w_ptr_ = window; } const ExprNode* GetHavingCondition() const { return having_condition_;} void SetHavingCondition(const node::ExprNode* having_condition) { this->having_condition_ = having_condition; } - const bool HasRowProject() const { return has_row_project_; } - const bool HasAggProject() const { return has_agg_project_; } - const bool IsWindowProject() const { return nullptr != w_ptr_; } + bool HasRowProject() const { return has_row_project_; } + bool HasAggProject() const { return has_agg_project_; } + bool IsWindowProject() const { return nullptr != w_ptr_; } virtual bool Equals(const PlanNode *node) const; static bool MergeProjectList(node::ProjectListNode *project_list1, node::ProjectListNode *project_list2, node::ProjectListNode *merged_project); - bool has_row_project_; - bool has_agg_project_; - const WindowPlanNode *w_ptr_; bool IsSimpleProjectList(); private: + bool has_row_project_; + bool has_agg_project_; + WindowPlanNode *w_ptr_; const ExprNode* having_condition_; - PlanNodeList projects; + PlanNodeList projects_; }; class ProjectPlanNode : public UnaryPlanNode { diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index 3015b5b2957..6bea3ae293c 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -1179,12 +1179,16 @@ class FrameExtent : public SqlNode { FrameExtent(FrameBound *start, FrameBound *end) : SqlNode(kFrameExtent, 0, 0), start_(start), end_(end) {} - ~FrameExtent() {} + ~FrameExtent() override {} void Print(std::ostream &output, const std::string &org_tab) const; virtual bool Equals(const SqlNode *node) const; + FrameBound *start() const { return start_; } FrameBound *end() const { return end_; } + void SetStart(FrameBound* start) { start_ = start; } + void SetEnd(FrameBound* end) { end_ = end; } + const std::string GetExprString() const { std::string str = "["; if (nullptr == start_) { @@ -1203,6 +1207,8 @@ class FrameExtent : public SqlNode { return str; } + FrameExtent* ShadowCopy(NodeManager* nm) const override; + private: FrameBound *start_; FrameBound *end_; @@ -1220,6 +1226,9 @@ class FrameNode : public SqlNode { void set_frame_type(FrameType frame_type) { frame_type_ = frame_type; } FrameExtent *frame_range() const { return frame_range_; } FrameExtent *frame_rows() const { return frame_rows_; } + void SetFrameRange(FrameExtent* ext) { frame_range_ = ext; } + void SetFrameRows(FrameExtent* ext) { frame_rows_ = ext; } + int64_t frame_maxsize() const { return frame_maxsize_; } int64_t GetHistoryRangeStart() const { if (nullptr == frame_rows_ && nullptr == frame_range_) { @@ -1311,6 +1320,8 @@ class FrameNode : public SqlNode { return false; } + FrameNode* ShadowCopy(node::NodeManager* nm) const override; + private: FrameType frame_type_; FrameExtent *frame_range_; @@ -1348,14 +1359,20 @@ class WindowDefNode : public SqlNode { SqlNodeList *union_tables() const { return union_tables_; } void set_union_tables(SqlNodeList *union_table) { union_tables_ = union_table; } + bool PermitWindowMerge() const { return permit_window_merge_; } + void SetPermitWindowMerge(bool permit) { permit_window_merge_ = permit; } + const bool instance_not_in_window() const { return instance_not_in_window_; } void set_instance_not_in_window(bool instance_not_in_window) { instance_not_in_window_ = instance_not_in_window; } const bool exclude_current_time() const { return exclude_current_time_; } void set_exclude_current_time(bool exclude_current_time) { exclude_current_time_ = exclude_current_time; } void Print(std::ostream &output, const std::string &org_tab) const; - virtual bool Equals(const SqlNode *that) const; + bool Equals(const SqlNode *that) const override; bool CanMergeWith(const WindowDefNode *that, const bool enable_window_maxsize_merged = true) const; + // shadow copy all fields except window_name_ + WindowDefNode* ShadowCopy(NodeManager* nm) const override; + private: bool exclude_current_time_; bool instance_not_in_window_; @@ -1364,6 +1381,10 @@ class WindowDefNode : public SqlNode { SqlNodeList *union_tables_; /* union other table in window */ ExprListNode *partitions_; /* PARTITION BY expression list */ OrderByNode *orders_; /* ORDER BY (list of SortBy) */ + + // whether allow two window merged into single + // one can explicitly set this to false to disable window merge over this window + bool permit_window_merge_ = true; }; class AllNode : public ExprNode { diff --git a/hybridse/src/node/node_manager.cc b/hybridse/src/node/node_manager.cc index 68b6ebd9c3e..165539579e2 100644 --- a/hybridse/src/node/node_manager.cc +++ b/hybridse/src/node/node_manager.cc @@ -566,7 +566,7 @@ WindowPlanNode *NodeManager::MakeWindowPlanNode(int w_id) { return node_ptr; } -ProjectListNode *NodeManager::MakeProjectListPlanNode(const WindowPlanNode *w_ptr, const bool need_agg) { +ProjectListNode *NodeManager::MakeProjectListPlanNode(WindowPlanNode *w_ptr, const bool need_agg) { ProjectListNode *node_ptr = new ProjectListNode(w_ptr, need_agg); RegisterNode(node_ptr); return node_ptr; diff --git a/hybridse/src/node/plan_node.cc b/hybridse/src/node/plan_node.cc index 149cf10e5e8..c05b0bc6730 100644 --- a/hybridse/src/node/plan_node.cc +++ b/hybridse/src/node/plan_node.cc @@ -279,18 +279,15 @@ void ProjectListNode::Print(std::ostream &output, PlanNode::Print(output, org_tab); if (nullptr == w_ptr_) { output << "\n"; - PrintPlanVector(output, org_tab + INDENT, projects, - "projects on table ", nullptr == this->having_condition_); + PrintPlanVector(output, org_tab + INDENT, projects_, "projects on table ", nullptr == this->having_condition_); if (nullptr != this->having_condition_) { - PrintSqlNode(output, org_tab + INDENT, having_condition_, - "having condition: ", true); + PrintSqlNode(output, org_tab + INDENT, having_condition_, "having condition: ", true); } } else { output << "\n"; PrintPlanNode(output, org_tab + INDENT, (w_ptr_), "", false); output << "\n"; - PrintPlanVector(output, org_tab + INDENT, projects, - "projects on window ", true); + PrintPlanVector(output, org_tab + INDENT, projects_, "projects on window ", true); } } @@ -347,7 +344,7 @@ bool ProjectListNode::Equals(const PlanNode *node) const { return false; } const ProjectListNode *that = dynamic_cast(node); - if (this->projects.size() != that->projects.size()) { + if (this->projects_.size() != that->projects_.size()) { return false; } @@ -355,17 +352,17 @@ bool ProjectListNode::Equals(const PlanNode *node) const { this->has_agg_project_ == that->has_agg_project_ && node::ExprEquals(this->having_condition_, that->having_condition_) && node::PlanEquals(this->w_ptr_, that->w_ptr_) && - PlanListEquals(this->projects, that->projects) && + PlanListEquals(this->projects_, that->projects_) && LeafPlanNode::Equals(node); } bool ProjectListNode::IsSimpleProjectList() { if (has_agg_project_) { return false; } - if (projects.empty()) { + if (projects_.empty()) { return false; } - for (auto item : projects) { + for (auto item : projects_) { auto expr = dynamic_cast(item)->GetExpression(); if (!node::ExprIsSimple(expr)) { return false; @@ -530,7 +527,7 @@ void WindowPlanNode::Print(std::ostream &output, const std::string &org_tab) const { PlanNode::Print(output, org_tab); output << "\n"; - PrintValue(output, org_tab, name, "window_name", true); + PrintValue(output, org_tab, name_, "window_name", true); } bool WindowPlanNode::Equals(const PlanNode *node) const { if (nullptr == node) { @@ -545,7 +542,7 @@ bool WindowPlanNode::Equals(const PlanNode *node) const { return false; } const WindowPlanNode *that = dynamic_cast(node); - return this->name == that->name && + return this->name_ == that->name_ && this->instance_not_in_window() == that->instance_not_in_window() && this->exclude_current_time() == that->exclude_current_time() && SqlEquals(this->frame_node_, that->frame_node_) && diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index a93bb80ce59..fc90d0b8690 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -630,6 +630,11 @@ bool FrameExtent::Equals(const SqlNode *node) const { const FrameExtent *that = dynamic_cast(node); return SqlEquals(this->start_, that->start_) && SqlEquals(this->end_, that->end_); } + +FrameExtent* FrameExtent::ShadowCopy(NodeManager* nm) const { + return dynamic_cast(nm->MakeFrameExtent(start(), end())); +} + bool FrameNode::Equals(const SqlNode *node) const { if (!SqlNode::Equals(node)) { return false; @@ -653,6 +658,11 @@ const std::string FrameNode::GetExprString() const { } return str; } + +FrameNode *FrameNode::ShadowCopy(NodeManager *nm) const { + return dynamic_cast(nm->MakeFrameNode(frame_type(), frame_range(), frame_rows(), frame_maxsize())); +} + bool FrameNode::CanMergeWith(const FrameNode *that, const bool enbale_merge_with_maxsize) const { if (Equals(that)) { return true; @@ -683,7 +693,7 @@ bool FrameNode::CanMergeWith(const FrameNode *that, const bool enbale_merge_with return false; } - // Handle RowsRange-like frame with MAXSIZE and RowsFrame + // Handle RowsRange-like frame with MAXSIZE and RowsFrame if (this->IsRowsRangeLikeMaxSizeFrame() && kFrameRows == that->frame_type_) { // Pure History RowRangeLike Frame with maxsize can't be merged with // Rows frame @@ -875,6 +885,15 @@ void WindowDefNode::Print(std::ostream &output, const std::string &org_tab) cons PrintSqlNode(output, tab, frame_ptr_, "frame", true); } +// test if two window can be merged into single one +// besides the two windows is the same one, two can also merged when all of those condition meet: +// - flag permit_window_merge_ is true +// - union table equal +// - exclude current time equal +// - instance not in window equal +// - order equal +// - partion equal +// - window frame can be merged bool WindowDefNode::CanMergeWith(const WindowDefNode *that, const bool enable_window_maxsize_merged) const { if (nullptr == that) { return false; @@ -882,12 +901,18 @@ bool WindowDefNode::CanMergeWith(const WindowDefNode *that, const bool enable_wi if (Equals(that)) { return true; } - return SqlListEquals(this->union_tables_, that->union_tables_) && + return PermitWindowMerge() && SqlListEquals(this->union_tables_, that->union_tables_) && this->exclude_current_time_ == that->exclude_current_time_ && this->instance_not_in_window_ == that->instance_not_in_window_ && ExprEquals(this->orders_, that->orders_) && ExprEquals(this->partitions_, that->partitions_) && nullptr != frame_ptr_ && this->frame_ptr_->CanMergeWith(that->frame_ptr_, enable_window_maxsize_merged); } + +WindowDefNode* WindowDefNode::ShadowCopy(NodeManager *nm) const { + return dynamic_cast(nm->MakeWindowDefNode(union_tables_, GetPartitions(), GetOrders(), GetFrame(), + exclude_current_time_, instance_not_in_window_)); +} + bool WindowDefNode::Equals(const SqlNode *node) const { if (!SqlNode::Equals(node)) { return false; diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index 042a40db8ac..d3e1d1990fe 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -24,6 +24,7 @@ #include #include +#include "absl/strings/match.h" #include "plan/plan_api.h" #include "proto/fe_common.pb.h" #include "udf/default_udf_library.h" @@ -123,10 +124,6 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P CHECK_TRUE(nullptr != root->GetSelectList() && !root->GetSelectList()->GetList().empty(), common::kPlanError, "fail to create select query plan: select expr list is null or empty") - const udf::UdfLibrary *lib = udf::DefaultUdfLibrary::get(); - // prepare window list - std::map window_project_list_map; - node::ProjectListNode *table_project_list = node_manager_->MakeProjectListPlanNode(nullptr, false); // prepare window def int w_id = 1; std::map windows; @@ -140,6 +137,10 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P } } + std::map window_project_list_map; + node::ProjectListNode *table_project_list = node_manager_->MakeProjectListPlanNode(nullptr, false); + const udf::UdfLibrary *lib = udf::DefaultUdfLibrary::get(); + const node::NodePointVector &select_expr_list = root->GetSelectList()->GetList(); for (uint32_t pos = 0u; pos < select_expr_list.size(); pos++) { auto expr = select_expr_list[pos]; @@ -161,6 +162,7 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P } } + // get the window for the expr const node::WindowDefNode *w_ptr = nullptr; CHECK_TRUE(node::WindowOfExpression(windows, project_expr, &w_ptr), common::kPlanError, "fail to resolved window") @@ -168,17 +170,21 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P // deal with row project / table aggregation project if (w_ptr == nullptr) { if (node::IsAggregationExpression(lib, project_expr)) { + // table aggregation project table_project_list->AddProject( node_manager_->MakeAggProjectNode(pos, project_name, project_expr, nullptr)); } else { + // row project table_project_list->AddProject(node_manager_->MakeRowProjectNode(pos, project_name, project_expr)); } continue; } // deal with window project if (window_project_list_map.find(w_ptr) == window_project_list_map.end()) { + // save the newly found window to (window -> project list) map node::WindowPlanNode *w_node_ptr = node_manager_->MakeWindowPlanNode(w_id++); CHECK_STATUS(CreateWindowPlanNode(w_ptr, w_node_ptr)) + // and create initial project list node for that new window window_project_list_map[w_ptr] = node_manager_->MakeProjectListPlanNode(w_node_ptr, true); } window_project_list_map[w_ptr]->AddProject( @@ -198,11 +204,11 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P // Add table projects into project map beforehand // Thus we can merge project list based on window frame when it is necessary. if (!table_project_list->GetProjects().empty()) { + // (nullptr, project list) pair contains all projects that is not window related table_project_list->SetHavingCondition(root->having_clause_ptr_); window_project_list_map[nullptr] = table_project_list; } // merge window map - std::map merged_project_list_map; bool long_window_exist = false; // only support long-window optimization for request-mode if (!is_batch_mode_ && !long_windows_.empty()) { @@ -217,18 +223,21 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, P } } + std::map merged_project_list_map; if (long_window_exist) { merged_project_list_map = window_project_list_map; } else { - CHECK_STATUS(MergeProjectMap(window_project_list_map, &merged_project_list_map)) + CHECK_STATUS(MergeProjectMap(window_project_list_map, &merged_project_list_map, &w_id)) } // add MergeNode if multi ProjectionLists exist PlanNodeList project_list_vec(w_id); for (auto &v : merged_project_list_map) { - node::ProjectListNode *project_list = v.second; - int pos = nullptr == project_list->GetW() ? 0 : project_list->GetW()->GetId(); - project_list_vec[pos] = project_list; + if (v.second->GetW() == nullptr) { + project_list_vec[0] = v.second; + } else { + project_list_vec[v.second->GetW()->GetId()] = v.second; + } } // merge simple project with 1st window project @@ -884,12 +893,11 @@ bool Planner::MergeWindows(const std::map> window_id_pairs; for (auto it = map.begin(); it != map.end(); it++) { - window_id_pairs.push_back( - std::make_pair(it->first, nullptr == it->second->GetW() ? 0 : it->second->GetW()->GetId())); + window_id_pairs.emplace_back(it->first, nullptr == it->second->GetW() ? 0 : it->second->GetW()->GetId()); } std::sort(window_id_pairs.begin(), window_id_pairs.end(), [](const std::pair &p1, @@ -901,12 +909,12 @@ bool Planner::MergeWindows(const std::mapfirst); + if (windows_ptr->empty()) { + windows_ptr->push_back(iter->first); continue; } bool can_be_merged = false; - for (auto iter_w = windows.begin(); iter_w != windows.end(); iter_w++) { + for (auto iter_w = windows_ptr->begin(); iter_w != windows_ptr->end(); iter_w++) { if (node::SqlEquals(iter->first, *iter_w)) { can_be_merged = true; has_window_merged = true; @@ -924,7 +932,7 @@ bool Planner::MergeWindows(const std::mapfirst); + windows_ptr->push_back(iter->first); } } @@ -933,12 +941,12 @@ bool Planner::MergeWindows(const std::mapfirst); + if (windows_ptr->empty()) { + windows_ptr->push_back(iter->first); continue; } bool can_be_merged = false; - for (auto iter_w = windows.begin(); iter_w != windows.end(); iter_w++) { + for (auto iter_w = windows_ptr->begin(); iter_w != windows_ptr->end(); iter_w++) { if (node::SqlEquals(iter->first, *iter_w)) { can_be_merged = true; has_window_merged = true; @@ -956,15 +964,17 @@ bool Planner::MergeWindows(const std::mapfirst); + windows_ptr->push_back(iter->first); } } return has_window_merged; } +// win_id passed in for the purpose of creating possible new WindowPlanNode base::Status Planner::MergeProjectMap(const std::map &map, - std::map *output) { + std::map *output, + int *win_id) { if (map.empty()) { DLOG(INFO) << "Nothing to merge, project list map is empty"; *output = map; @@ -975,28 +985,33 @@ base::Status Planner::MergeProjectMap(const std::mapproject list) map, with empty project list + std::map merged_out; for (auto iter = windows.cbegin(); iter != windows.cend(); iter++) { if (nullptr == *iter) { - output->insert(std::make_pair(nullptr, node_manager_->MakeProjectListPlanNode(nullptr, false))); + // table project list or row project list + merged_out.emplace(nullptr, node_manager_->MakeProjectListPlanNode(nullptr, false)); continue; } node::WindowPlanNode *w_node_ptr = node_manager_->MakeWindowPlanNode(w_id++); CHECK_STATUS(CreateWindowPlanNode(*iter, w_node_ptr)) - output->insert(std::make_pair(*iter, node_manager_->MakeProjectListPlanNode(w_node_ptr, true))); + merged_out.emplace(*iter, node_manager_->MakeProjectListPlanNode(w_node_ptr, true)); } + // add project nodes from map to merged_out, based on whether two window can merged for (auto map_iter = map.cbegin(); map_iter != map.cend(); map_iter++) { bool merge_ok = false; - for (auto iter = output->begin(); iter != output->end(); iter++) { + for (auto iter = merged_out.begin(); iter != merged_out.end(); iter++) { if (node::SqlEquals(map_iter->first, iter->first) || (nullptr != map_iter->first && map_iter->first->CanMergeWith(iter->first))) { - auto frame = iter->second->GetW(); - node::ProjectListNode *merged_project = node_manager_->MakeProjectListPlanNode(frame, frame != nullptr); + auto window_plan_node = iter->second->GetW(); + node::ProjectListNode *merged_project = + node_manager_->MakeProjectListPlanNode(window_plan_node, window_plan_node != nullptr); node::ProjectListNode::MergeProjectList(iter->second, map_iter->second, merged_project); iter->second = merged_project; merge_ok = true; @@ -1006,16 +1021,17 @@ base::Status Planner::MergeProjectMap(const std::map *windows_ptr) { if (nullptr == windows_ptr) { return false; } - auto &windows = *windows_ptr; bool has_window_expand = false; // merge big history window with current history window - for (auto iter = windows.begin(); iter != windows.end(); iter++) { + for (auto iter = windows_ptr->begin(); iter != windows_ptr->end(); iter++) { const node::WindowDefNode *w_ptr = *iter; if (nullptr != w_ptr && nullptr != w_ptr->GetFrame() && !w_ptr->GetFrame()->IsRowsRangeLikeFrame() && w_ptr->GetFrame()->IsPureHistoryFrame()) { @@ -1128,6 +1144,130 @@ base::Status Planner::TransformTableDef(const std::string &table_name, const Nod return base::Status::OK(); } +std::map Planner::FilterWindowFrameBoundNonRelativeProjects( + const std::map &in, int *win_id) { + std::map splited_map; + + for (auto& kv : in) { + if (kv.first == nullptr) { + splited_map.emplace(kv.first, kv.second); + continue; + } + + node::ProjectListNode* window_bound_relative_projects = nullptr; + node::ProjectListNode* window_bound_non_relative_projects = nullptr; + SplitProjectList( + kv.second, + [](const node::ProjectNode *n) { + /* lag functions actually do not rely on window frame bound, the predicate will filter those out */ + if (n == nullptr) { + return true; + } + + if (n->GetExpression()->GetExprType() != node::kExprCall) { + return true; + } + + auto *call_expr = dynamic_cast(n->GetExpression()); + if (call_expr == nullptr) { + return true; + } + const auto &fn_name = call_expr->GetFnDef()->GetName(); + return !absl::EqualsIgnoreCase("lag", fn_name) && !absl::EqualsIgnoreCase("at", fn_name) && + !absl::EqualsIgnoreCase("lead", fn_name); + }, + &window_bound_relative_projects, &window_bound_non_relative_projects); + + if (window_bound_non_relative_projects->GetProjects().empty()) { + // there is no lag style functions in current project list node, keep it as-is + splited_map.emplace(kv.first, kv.second); + } else { + // found lag/at style functions + + bool preserve_old_window = !window_bound_relative_projects->GetProjects().empty(); + + if (preserve_old_window) { + splited_map.emplace(kv.first, window_bound_relative_projects); + } + + auto* window_bound_non_relative_window = CreateWindowUnboundPrecedingAndCurrent(kv.first); + // prohibit window merge on the window + window_bound_non_relative_window->SetPermitWindowMerge(false); + + // doing some update for fields in the window_bound_non_relative_projects + // 1. update window plan node + // 2. update frame for each project node inside + + // have to keep all window plan nodes in the final map unique and increasing by 1 strictly + // so the new window id, is needed, should be + // 1. old window id of kv input if there is only window_bound_non_relative_projects + // 2. *win_id + 1 if we want to keep both + // newly added window_bound_non_relative_projects always located right of the original one (if both exists) + int new_win_id = preserve_old_window ? *win_id : kv.second->GetW()->GetId(); + auto* new_win_plan = node_manager_->MakeWindowPlanNode(new_win_id); + if (preserve_old_window) { + *win_id += 1; + } + + auto s = CreateWindowPlanNode(window_bound_non_relative_window, new_win_plan); + if (!s.isOK()) { + LOG(ERROR) << "creating seperate window plan for lag/at failed: " << s.GetMsg(); + } + window_bound_non_relative_projects->SetW(new_win_plan); + for (auto &node : window_bound_non_relative_projects->GetProjects()) { + auto *project = dynamic_cast(node); + if (project != nullptr) { + project->set_frame(window_bound_non_relative_window->GetFrame()); + } + } + + splited_map.emplace(window_bound_non_relative_window, window_bound_non_relative_projects); + } + } + + return splited_map; +} + +void Planner::SplitProjectList(const node::ProjectListNode *in, + std::function predicate, node::ProjectListNode **trues, + node::ProjectListNode **falses) const { + auto* win = in->GetW(); + *trues = node_manager_->MakeProjectListPlanNode(win, false); + *falses = node_manager_->MakeProjectListPlanNode(win, false); + + for (auto node : in->GetProjects()) { + auto pn = dynamic_cast(node); + if (predicate(pn)) { + (*trues)->AddProject(pn); + } else { + (*falses)->AddProject(pn); + } + } +} + +node::WindowDefNode *Planner::CreateWindowUnboundPrecedingAndCurrent(const node::WindowDefNode *window) const { + auto *new_frame = window->GetFrame()->ShadowCopy(node_manager_); + + if (new_frame->frame_rows() != nullptr) { + auto *frame_ext = new_frame->frame_rows()->ShadowCopy(node_manager_); + frame_ext->SetStart( + dynamic_cast(node_manager_->MakeFrameBound(node::BoundType::kPrecedingUnbound))); + frame_ext->SetEnd(dynamic_cast(node_manager_->MakeFrameBound(node::BoundType::kCurrent))); + new_frame->SetFrameRows(frame_ext); + } + if (new_frame->frame_range() != nullptr) { + auto *frame_ext = new_frame->frame_range()->ShadowCopy(node_manager_); + frame_ext->SetStart( + dynamic_cast(node_manager_->MakeFrameBound(node::BoundType::kPrecedingUnbound))); + frame_ext->SetEnd(dynamic_cast(node_manager_->MakeFrameBound(node::BoundType::kCurrent))); + new_frame->SetFrameRange(frame_ext); + } + + // create the new window based on the newly created frame + auto *new_win = window->ShadowCopy(node_manager_); + new_win->SetFrame(new_frame); + return new_win; +} } // namespace plan } // namespace hybridse diff --git a/hybridse/src/plan/planner.h b/hybridse/src/plan/planner.h index df3ab00732d..7f24545a16f 100644 --- a/hybridse/src/plan/planner.h +++ b/hybridse/src/plan/planner.h @@ -86,9 +86,23 @@ class Planner { std::string MakeTableName(const PlanNode *node) const; base::Status MergeProjectMap(const std::map &map, - std::map *output); + std::map *output, int* win_id); private: + // iterate the {in} map and make a new map from that, but sperate projects into two ProjectListNode if there are any + // lag/at in the project list list + // it is used to sperate project list nodes which is at/lag/lead call expr, see #1554 + std::map FilterWindowFrameBoundNonRelativeProjects( + const std::map &in, int* win_id); + + // split the give ProjectListNode {in} into two ProjectListNode with the given {condition}, + // true results into {trues}, false results into {falses} + void SplitProjectList(const node::ProjectListNode *in, std::function predicate, + node::ProjectListNode **trues, node::ProjectListNode **falses) const; + + // create a copy (shallow) of {in} where it's frame is [unbound preceding, current row] + node::WindowDefNode* CreateWindowUnboundPrecedingAndCurrent(const node::WindowDefNode* in) const; + const std::unordered_map* extra_options_ = nullptr; std::set long_windows_; }; diff --git a/hybridse/src/udf/default_defs/window_functions_def.cc b/hybridse/src/udf/default_defs/window_functions_def.cc index 2c5eab93374..03aa9c39015 100644 --- a/hybridse/src/udf/default_defs/window_functions_def.cc +++ b/hybridse/src/udf/default_defs/window_functions_def.cc @@ -39,7 +39,7 @@ void AtList(::hybridse::codec::ListRef* list_ref, int64_t pos, V* v, *v = V(DataTypeTrait::zero_value()); return; } - auto list = (codec::ListV*)(list_ref->list); + auto list = reinterpret_cast*>(list_ref->list); auto column = dynamic_cast*>(list); if (column != nullptr) { auto row = column->root()->At(pos); @@ -95,22 +95,29 @@ template void RegisterBaseListAt(UdfLibrary* lib) { lib->RegisterExternal("at") .doc(R"( - @brief Returns the value of expression from the offset-th row of the ordered partition. + @brief Returns value evaluated at the row that is offset rows before the current row within the partition. Offset is evaluated with respect to the current row - @param offset The number of rows forward from the current row from which to obtain the value. + @param offset The number of rows forwarded from the current row, must not negative Example: - |value| - |--| - |0| - |1| - |2| - |3| - |4| + |c1|c2| + |--|--| + |0 | 1| + |1 | 1| + |2 | 2| + |3 | 2| + |4 | 2| @code{.sql} - SELECT at(value, 3) OVER w; - -- output 3 + SELECT at(c1, 1) as co OVER w from t1 window (order by c1 partition by c2); + -- output + -- | co | + -- |----| + -- |NULL| + -- |0 | + -- |NULL| + -- |2 | + -- |3 | @endcode )") .args, int64_t>(reinterpret_cast(AtList)) diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index 43df2793683..d217d066cc3 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -454,7 +454,7 @@ Status BatchModeTransformer::TransformProjectPlanOpWindowSerial( dynamic_cast( node->project_list_vec_[0]); auto project_list = node_manager_->MakeProjectListPlanNode( - first_project_list->w_ptr_, first_project_list->has_agg_project_); + first_project_list->GetW(), first_project_list->HasAggProject()); uint32_t pos = 0; for (auto iter = node->pos_mapping_.cbegin(); iter != node->pos_mapping_.cend(); iter++) { @@ -1205,14 +1205,12 @@ Status BatchModeTransformer::CreatePhysicalProjectNode( CHECK_TRUE(nullptr == having_condition, kPlanError, "Can't support having clause and window clause simultaneously") CHECK_STATUS(CreateOp( - &window_agg_op, depend, column_projects, - WindowOp(project_list->w_ptr_), - project_list->w_ptr_->instance_not_in_window(), append_input, - project_list->w_ptr_->exclude_current_time())); - if (!project_list->w_ptr_->union_tables().empty()) { - for (auto iter = project_list->w_ptr_->union_tables().cbegin(); - iter != project_list->w_ptr_->union_tables().cend(); - iter++) { + &window_agg_op, depend, column_projects, WindowOp(project_list->GetW()), + project_list->GetW()->instance_not_in_window(), append_input, + project_list->GetW()->exclude_current_time())); + if (!project_list->GetW()->union_tables().empty()) { + for (auto iter = project_list->GetW()->union_tables().cbegin(); + iter != project_list->GetW()->union_tables().cend(); iter++) { PhysicalOpNode* union_table_op; CHECK_STATUS(TransformPlanOp(*iter, &union_table_op)); PhysicalRenameNode* rename_union_op = nullptr; @@ -1261,7 +1259,7 @@ Status BatchModeTransformer::TransformProjectOp(node::ProjectListNode* project_l if (project_list->HasAggProject()) { if (project_list->IsWindowProject()) { CHECK_STATUS( - CheckWindow(project_list->w_ptr_, depend->schemas_ctx())); + CheckWindow(project_list->GetW(), depend->schemas_ctx())); return CreatePhysicalProjectNode(kWindowAggregation, depend, project_list, append_input, output); @@ -2253,13 +2251,13 @@ Status RequestModeTransformer::TransformProjectOp( node::ProjectListNode* project_list, PhysicalOpNode* depend, bool append_input, PhysicalOpNode** output) { PhysicalOpNode* new_depend = depend; - if (nullptr != project_list->w_ptr_) { + if (nullptr != project_list->GetW()) { CHECK_STATUS( - TransformWindowOp(depend, project_list->w_ptr_, &new_depend)); + TransformWindowOp(depend, project_list->GetW(), &new_depend)); } switch (new_depend->GetOutputType()) { case kSchemaTypeRow: - CHECK_TRUE(!project_list->has_agg_project_, kPlanError, "Non-support aggregation project on request row") + CHECK_TRUE(!project_list->HasAggProject(), kPlanError, "Non-support aggregation project on request row") return CreatePhysicalProjectNode( kRowProject, new_depend, project_list, append_input, output); case kSchemaTypeGroup: @@ -2267,7 +2265,7 @@ Status RequestModeTransformer::TransformProjectOp( project_list, append_input, output); case kSchemaTypeTable: - if (project_list->has_agg_project_) { + if (project_list->HasAggProject()) { return CreatePhysicalProjectNode(kAggregation, new_depend, project_list, append_input, output);