Skip to content

Commit

Permalink
fix(4paradigm#1554): lag results always evaluated with respect to current row
Browse files Browse the repository at this point in the history

- logical plan: for each lag/at projection, a new `ProjectListNode` is created
  with its window frame bound to [unbounded, current row]
- the fix may not work in batch-request mode or in a cluster environment
  • Loading branch information
aceforeverd committed May 5, 2022
1 parent 3275a1b commit df949cb
Show file tree
Hide file tree
Showing 12 changed files with 409 additions and 118 deletions.
122 changes: 102 additions & 20 deletions cases/function/function/test_udaf_function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,8 @@ cases:
- id: 46
desc: window lag functions
sqlDialect: ["HybridSQL"]
# FIXME(ace): not yet supported in cluster & batch-request mode
mode: cluster-unsupport, batch-request-unsupport
inputs:
-
columns : ["id int","pk bigint","c1 string","c2 int","c3 bigint","c4 float",
Expand Down Expand Up @@ -1946,26 +1948,25 @@ cases:
lag(c7, 0) OVER w1 as m13,
lag(c7, 2) OVER w1 as m14,
lag(c8, 0) OVER w1 as m15,
lag(c8, 2) OVER w1 as m16,
lag(pk, -1) OVER w1 as m17
lag(c8, 2) OVER w1 as m16
FROM {0} WINDOW
w1 AS (PARTITION BY {0}.pk ORDER BY {0}.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
expect:
order: id
columns: ["id int","m1 string", "m2 string", "m3 int", "m4 int", "m5 bigint", "m6 bigint",
"m7 float", "m8 float", "m9 double", "m10 double",
"m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool", "m17 bigint"]
"m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool"]
rows:
- [1, "a", NULL, 1, NULL, 30, NULL, 1.1, NULL, 2.1, NULL,
1590738990000, NULL, "2020-05-01", NULL, true, NULL, NULL]
1590738990000, NULL, "2020-05-01", NULL, true, NULL]
- [2, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, NULL,
1590738991000, NULL, "2020-05-03", NULL, false, NULL, NULL]
1590738991000, NULL, "2020-05-03", NULL, false, NULL]
- [3, "b", "a", 3, 1, 32, 30, 1.3, 1.1, 2.3, 2.1,
1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true, NULL]
1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true]
- [4, NULL, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4,
1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false, NULL]
1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false]
- [5, "d", "b", 5, 3, 35, 32, 1.5, 1.3, 2.5, 2.3,
1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true, NULL]
1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true]

- id: 47
desc: count where value equals first value
Expand Down Expand Up @@ -2303,18 +2304,20 @@ cases:
- [4,"aa",1,1,30,1.1,2.1,1590738990000,"2020-05-01","a"]
- id: 56
desc: window at functions, at is synonym to lag
# FIXME(ace): not yet supported in cluster & batch-request mode
mode: cluster-unsupport, batch-request-unsupport
sqlDialect: ["HybridSQL"]
inputs:
-
columns : ["id int","pk bigint","c1 string","c2 int","c3 bigint","c4 float",
"c5 double","c6 timestamp","c7 date","c8 bool"]
indexs: ["index1:pk:c6"]
rows:
- [1, 1, "a", 1, 30, 1.1, 2.1, 1590738990000, "2020-05-01", true]
- [2, 1, "c", 4, 33, 1.4, 2.4, 1590738991000, "2020-05-03", false]
- [3, 1, "b", 3, 32, 1.3, 2.3, 1590738992000, "2020-05-02", true,]
- [1, 1, "a", 1, 30, 1.1, 2.1, 1590738990000, "2020-05-01", true]
- [2, 1, "c", 4, 33, 1.4, 2.4, 1590738991000, "2020-05-03", false]
- [3, 1, "b", 3, 32, 1.3, 2.3, 1590738992000, "2020-05-02", true,]
- [4, 1, NULL, NULL, NULL, NULL, NULL, 1590738993000, NULL, NULL]
- [5, 1, "d", 5, 35, 1.5, 2.5, 1590738994000, "2020-05-04", false]
- [5, 1, "d", 5, 35, 1.5, 2.5, 1590738994000, "2020-05-04", false]
sql: |
SELECT {0}.id,
at(c1, 0) OVER w1 as m1,
Expand All @@ -2332,24 +2335,103 @@ cases:
at(c7, 0) OVER w1 as m13,
at(c7, 2) OVER w1 as m14,
at(c8, 0) OVER w1 as m15,
at(c8, 2) OVER w1 as m16,
at(pk, -1) OVER w1 as m17
at(c8, 2) OVER w1 as m16
FROM {0} WINDOW
w1 AS (PARTITION BY {0}.pk ORDER BY {0}.c6 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
expect:
order: id
columns: ["id int","m1 string", "m2 string", "m3 int", "m4 int", "m5 bigint", "m6 bigint",
"m7 float", "m8 float", "m9 double", "m10 double",
"m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool", "m17 bigint"]
"m11 timestamp", "m12 timestamp", "m13 date", "m14 date", "m15 bool", "m16 bool"]
rows:
- [1, "a", NULL, 1, NULL, 30, NULL, 1.1, NULL, 2.1, NULL,
1590738990000, NULL, "2020-05-01", NULL, true, NULL, NULL]
1590738990000, NULL, "2020-05-01", NULL, true, NULL]
- [2, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4, NULL,
1590738991000, NULL, "2020-05-03", NULL, false, NULL, NULL]
1590738991000, NULL, "2020-05-03", NULL, false, NULL]
- [3, "b", "a", 3, 1, 32, 30, 1.3, 1.1, 2.3, 2.1,
1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true, NULL]
1590738992000, 1590738990000, "2020-05-02", "2020-05-01", true, true]
- [4, NULL, "c", NULL, 4, NULL, 33, NULL, 1.4, NULL, 2.4,
1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false, NULL]
1590738993000, 1590738991000, NULL, "2020-05-03", NULL, false]
- [5, "d", "b", 5, 3, 35, 32, 1.5, 1.3, 2.5, 2.3,
1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true, NULL]
1590738994000, 1590738992000, "2020-05-04", "2020-05-02", false, true]

- id: 57
desc: |
Correctness of at/lag when the offset falls outside the window frame bound.
Key point: lag returns the value evaluated at the row that is offset rows before the current row within the partition.
refer https://github.com/4paradigm/OpenMLDB/issues/1554
inputs:
- columns: [ "id int","ts timestamp","group1 string","val1 int" ]
indexs: [ "index1:group1:ts" ]
name: t1
data: |
1, 1612130400000, g1, 1
2, 1612130401000, g1, 2
3, 1612130402000, g1, 3
4, 1612130403000, g1, 4
5, 1612130404000, g1, 5
6, 1612130404000, g2, 4
7, 1612130405000, g2, 3
8, 1612130406000, g2, 2
sql: |
select
`id`,
`val1`,
lag(val1, 0) over w1 as agg1,
lag(val1, 1) over w1 as agg2,
lag(val1, 3) over w1 as agg3
from `t1` WINDOW
w1 as (partition by `group1` order by `ts` rows_range between 2s preceding and 1s preceding MAXSIZE 10);
expect:
columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"]
order: id
data: |
1, 1, 1, NULL, NULL
2, 2, 2, 1, NULL
3, 3, 3, 2, NULL
4, 4, 4, 3, 1
5, 5, 5, 4, 2
6, 4, 4, NULL, NULL
7, 3, 3, 4, NULL
8, 2, 2, 3, NULL
- id: 58
desc: |
Correctness of at/lag when the offset falls outside the window frame bound, used together with another window function.
refer https://github.com/4paradigm/OpenMLDB/issues/1554
# FIXME(ace): request/cluster runs do not produce the expected results
mode: request-unsupport,cluster-unsupport
inputs:
- columns: [ "id int","ts timestamp","group1 string","val1 int" ]
indexs: [ "index1:group1:ts" ]
name: t1
data: |
1, 1612130400000, g1, 1
2, 1612130401000, g1, 2
3, 1612130402000, g1, 3
4, 1612130403000, g1, 4
5, 1612130404000, g1, 5
6, 1612130404000, g2, 4
7, 1612130405000, g2, 3
8, 1612130406000, g2, 2
sql: |
select
`id`,
`val1`,
lag(val1, 0) over w1 as agg1,
lag(val1, 3) over w1 as agg2,
first_value(val1) over w1 as agg3
from `t1` WINDOW
w1 as (partition by `group1` order by `ts` rows_range between 2s preceding and 1s preceding MAXSIZE 10);
expect:
columns: ["id int", "val1 int", "agg1 int", "agg2 int", "agg3 int"]
order: id
data: |
1, 1, 1, NULL, NULL
2, 2, 2, NULL, 1
3, 3, 3, NULL, 2
4, 4, 4, 1, 3
5, 5, 5, 2, 4
6, 4, 4, NULL, NULL
7, 3, 3, NULL, 4
8, 2, 2, NULL, 3
4 changes: 2 additions & 2 deletions hybridse/include/codec/list_iterator_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ template <class V, class R>
class WrapListImpl : public ListV<V> {
public:
WrapListImpl() : ListV<V>() {}
~WrapListImpl() {}
~WrapListImpl() override {}
virtual const V GetFieldUnsafe(const R &row) const = 0;
virtual void GetField(const R &row, V *, bool *) const = 0;
virtual const bool IsNull(const R &row) const = 0;
Expand All @@ -67,7 +67,7 @@ class ColumnImpl : public WrapListImpl<V, Row> {
col_idx_(col_idx),
offset_(offset) {}

~ColumnImpl() {}
~ColumnImpl() override {}

const V GetFieldUnsafe(const Row &row) const override {
V value;
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/node/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class NodeManager {
PlanNode *MakeMultiPlanNode(const PlanType &type);
PlanNode *MakeMergeNode(int column_size);
WindowPlanNode *MakeWindowPlanNode(int w_id);
ProjectListNode *MakeProjectListPlanNode(const WindowPlanNode *w,
ProjectListNode *MakeProjectListPlanNode(WindowPlanNode *w,
const bool need_agg);
FilterPlanNode *MakeFilterPlanNode(PlanNode *node,
const ExprNode *condition);
Expand Down
51 changes: 29 additions & 22 deletions hybridse/include/node/plan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
#ifndef HYBRIDSE_INCLUDE_NODE_PLAN_NODE_H_
#define HYBRIDSE_INCLUDE_NODE_PLAN_NODE_H_

#include <glog/logging.h>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "glog/logging.h"
#include "node/node_enum.h"
#include "node/sql_node.h"

namespace hybridse {
namespace node {

Expand Down Expand Up @@ -232,7 +234,7 @@ class ProjectNode : public LeafPlanNode {
node::ExprNode *GetExpression() const { return expression_; }
void SetExpression(node::ExprNode *expr) { expression_ = expr; }
node::FrameNode *frame() const { return frame_; }
void set_frame(node::FrameNode *frame) { frame_ = frame; }
void set_frame(FrameNode* frame) { frame_ = frame; }
virtual bool Equals(const PlanNode *node) const;
const bool IsAgg() const { return is_aggregation_; }

Expand All @@ -248,10 +250,10 @@ class WindowPlanNode : public LeafPlanNode {
public:
explicit WindowPlanNode(int id)
: LeafPlanNode(kPlanTypeWindow),
id(id),
id_(id),
exclude_current_time_(false),
instance_not_in_window_(false),
name(""),
name_(""),
keys_(nullptr),
orders_(nullptr) {}
~WindowPlanNode() {}
Expand All @@ -264,9 +266,10 @@ class WindowPlanNode : public LeafPlanNode {
const OrderByNode *GetOrders() const { return orders_; }
void SetKeys(ExprListNode *keys) { keys_ = keys; }
void SetOrders(OrderByNode *orders) { orders_ = orders; }
const std::string &GetName() const { return name; }
void SetName(const std::string &name) { WindowPlanNode::name = name; }
const int GetId() const { return id; }
const std::string &GetName() const { return name_; }
void SetName(const std::string &name) { name_ = name; }
int GetId() const { return id_; }
void SetId(int id) { id_ = id; }
void AddUnionTable(PlanNode *node) { return union_tables_.push_back(node); }
const PlanNodeList &union_tables() const { return union_tables_; }
const bool instance_not_in_window() const { return instance_not_in_window_; }
Expand All @@ -276,10 +279,10 @@ class WindowPlanNode : public LeafPlanNode {
virtual bool Equals(const PlanNode *node) const;

private:
int id;
int id_;
bool exclude_current_time_;
bool instance_not_in_window_;
std::string name;
std::string name_;
FrameNode *frame_node_;
ExprListNode *keys_;
OrderByNode *orders_;
Expand All @@ -294,47 +297,51 @@ class ProjectListNode : public LeafPlanNode {
has_agg_project_(false),
w_ptr_(nullptr),
having_condition_(nullptr),
projects({}) {}
ProjectListNode(const WindowPlanNode *w_ptr, const bool has_agg)
projects_({}) {}
ProjectListNode(WindowPlanNode *w_ptr, const bool has_agg)
: LeafPlanNode(kProjectList),
has_row_project_(false),
has_agg_project_(has_agg),
w_ptr_(w_ptr),
having_condition_(nullptr),
projects({}) {}
projects_({}) {}
~ProjectListNode() {}

void Print(std::ostream &output, const std::string &org_tab) const;

const PlanNodeList &GetProjects() const { return projects; }
const PlanNodeList &GetProjects() const { return projects_; }

void AddProject(ProjectNode *project) {
projects.push_back(project);
projects_.push_back(project);
if (project->IsAgg()) {
has_agg_project_ = true;
} else {
has_row_project_ = true;
}
}
const WindowPlanNode *GetW() const { return w_ptr_; }

WindowPlanNode *GetW() const { return w_ptr_; }
void SetW(WindowPlanNode* window) { w_ptr_ = window; }
const ExprNode* GetHavingCondition() const { return having_condition_;}
void SetHavingCondition(const node::ExprNode* having_condition) {
this->having_condition_ = having_condition;
}
const bool HasRowProject() const { return has_row_project_; }
const bool HasAggProject() const { return has_agg_project_; }
const bool IsWindowProject() const { return nullptr != w_ptr_; }
bool HasRowProject() const { return has_row_project_; }
bool HasAggProject() const { return has_agg_project_; }
bool IsWindowProject() const { return nullptr != w_ptr_; }
virtual bool Equals(const PlanNode *node) const;

static bool MergeProjectList(node::ProjectListNode *project_list1, node::ProjectListNode *project_list2,
node::ProjectListNode *merged_project);
bool has_row_project_;
bool has_agg_project_;
const WindowPlanNode *w_ptr_;

bool IsSimpleProjectList();

private:
bool has_row_project_;
bool has_agg_project_;
WindowPlanNode *w_ptr_;
const ExprNode* having_condition_;
PlanNodeList projects;
PlanNodeList projects_;
};

class ProjectPlanNode : public UnaryPlanNode {
Expand Down
Loading

0 comments on commit df949cb

Please sign in to comment.