Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(long window): support *_where in sql engine #2255

Merged
merged 40 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ed55dee
feat(plan): long window optimize count_where
aceforeverd Jul 21, 2022
435eeea
update constructor for request agg union node
aceforeverd Jul 22, 2022
c1e9b01
feat(runner): support count_where
aceforeverd Jul 24, 2022
d40d00d
test(long window): test execute count_where
aceforeverd Jul 24, 2022
3fe70a1
refactor
aceforeverd Jul 26, 2022
89c436a
fix: ConstNode::GetAs
aceforeverd Jul 26, 2022
d1f6382
refactor eval
aceforeverd Jul 29, 2022
0d6394c
fix: cond over agg row
aceforeverd Jul 29, 2022
79e8d7f
fix compile
aceforeverd Jul 29, 2022
75d400d
fix: tablet_catalog_test compile
aceforeverd Jul 29, 2022
8fc6707
style: improve code style
aceforeverd Jul 30, 2022
9b4c6b3
improve style
aceforeverd Jul 30, 2022
177d0a4
fix long_window_optimize
aceforeverd Jul 30, 2022
415c340
fix style
aceforeverd Jul 30, 2022
342ee6f
fix: cpplint
aceforeverd Aug 1, 2022
bcc2613
fix: compare over different type
aceforeverd Aug 2, 2022
25166e4
fix: eval expr with agg row
aceforeverd Aug 3, 2022
cf04d78
fix: sql_cmd_test for count_where(long_window)
aceforeverd Aug 4, 2022
3793b10
test: add case for long window count_where for range buckets
aceforeverd Aug 5, 2022
877589f
fix(runner window): avoid ts overflow if start_base = 0
aceforeverd Aug 5, 2022
b56007a
fix: agg rows not aligned which lead to incorrect results
aceforeverd Aug 6, 2022
1ca0bbf
fix: compile on mac
aceforeverd Aug 6, 2022
bfce512
Merge remote-tracking branch 'upstream/main' into feat-count-where
aceforeverd Aug 6, 2022
9bf8438
disable *_where op for rows window
aceforeverd Aug 7, 2022
530a56f
min/max/avg/sum_where op for long window
aceforeverd Aug 7, 2022
dbbc468
test(long window): add case for min/max/avg/sum_where
aceforeverd Aug 7, 2022
c3a6d62
test count_where(*)
aceforeverd Aug 7, 2022
17fe51a
fix: ddl_parser_test
aceforeverd Aug 8, 2022
ec6e4f5
fix: aggregator_test
aceforeverd Aug 8, 2022
f8244f8
fix(runner): correct handling dup key rows in long window
aceforeverd Aug 8, 2022
d6cf97d
ops add limit to *_where ops in long window
aceforeverd Aug 8, 2022
b758249
fix: sql_cluster_router
aceforeverd Aug 8, 2022
49662c8
update sql_cmd_test
aceforeverd Aug 9, 2022
1503847
Merge remote-tracking branch 'upstream/main' into feat-count-where
aceforeverd Aug 9, 2022
7d6796b
test(sql_cmd_test): update LongWindowSumWhere
aceforeverd Aug 9, 2022
63f6097
feat(runner): check max_size for pre agg rows
aceforeverd Aug 9, 2022
937771b
test(long_window): add fail tests for *_where
aceforeverd Aug 9, 2022
fcb33e9
add test for *_where over hdd table
aceforeverd Aug 9, 2022
52c027a
chore: rm wrong comments
aceforeverd Aug 9, 2022
2555d63
fix comment
aceforeverd Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions hybridse/examples/toydb/src/tablet/tablet_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,18 @@ class TabletCatalog : public vm::Catalog {

bool AddTable(std::shared_ptr<TabletTableHandler> table);

std::shared_ptr<type::Database> GetDatabase(const std::string& db);
std::shared_ptr<type::Database> GetDatabase(const std::string& db) override;

std::shared_ptr<vm::TableHandler> GetTable(const std::string& db, const std::string& table_name) override;

std::shared_ptr<vm::TableHandler> GetTable(const std::string& db,
const std::string& table_name);
bool IndexSupport() override;

std::vector<vm::AggrTableInfo> GetAggrTables(
const std::string& base_db,
const std::string& base_table,
const std::string& aggr_func,
const std::string& aggr_col,
const std::string& partition_cols,
const std::string& order_col) override {
vm::AggrTableInfo info = {"aggr_" + base_table, "aggr_db", base_db, base_table,
aggr_func, aggr_col, partition_cols, order_col, "1000"};
std::vector<vm::AggrTableInfo> GetAggrTables(const std::string& base_db, const std::string& base_table,
const std::string& aggr_func, const std::string& aggr_col,
const std::string& partition_cols, const std::string& order_col,
const std::string& filter_col) override {
vm::AggrTableInfo info = {"aggr_" + base_table, "aggr_db", base_db, base_table, aggr_func, aggr_col,
partition_cols, order_col, "1000", filter_col};
return {info};
}

Expand Down
30 changes: 29 additions & 1 deletion hybridse/include/node/sql_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_map>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "boost/algorithm/string.hpp"
Expand Down Expand Up @@ -1074,6 +1075,33 @@ class ConstNode : public ExprNode {
}
}

// Nullability is reported through `std::optional` instead of the Nullable
// wrapper from 'udf/literal_traits.h': including that header here would lead
// to a recursive include.
//
// Reads this constant as the C++ type T.
//   - constant is NULL  -> OK status holding std::nullopt
//   - supported T       -> OK status holding the result of the matching
//                          accessor (GetBool, GetAsInt16, GetAsInt32,
//                          GetAsInt64, GetAsFloat, GetAsDouble, GetAsString)
//   - any other T       -> InvalidArgument status
template <typename T>
absl::StatusOr<std::optional<T>> GetAs() const {
    // Compile-time selectors; exactly one (or none) is true for a given T.
    constexpr bool kWantsBool = std::is_same_v<T, bool>;
    constexpr bool kWantsInt16 = std::is_same_v<T, int16_t>;
    constexpr bool kWantsInt32 = std::is_same_v<T, int32_t>;
    constexpr bool kWantsInt64 = std::is_same_v<T, int64_t>;
    constexpr bool kWantsFloat = std::is_same_v<T, float>;
    constexpr bool kWantsDouble = std::is_same_v<T, double>;
    constexpr bool kWantsString = std::is_same_v<T, std::string>;

    // The NULL check comes first, so a NULL constant yields an empty optional
    // even for a T that has no accessor below.
    if (IsNull()) {
        return std::nullopt;
    }

    // The else-chain is required: only the selected branch is instantiated,
    // so accessors whose result does not convert to T are never compiled.
    if constexpr (kWantsBool) {
        return GetBool();
    } else if constexpr (kWantsInt16) {
        return GetAsInt16();
    } else if constexpr (kWantsInt32) {
        return GetAsInt32();
    } else if constexpr (kWantsInt64) {
        return GetAsInt64();
    } else if constexpr (kWantsFloat) {
        return GetAsFloat();
    } else if constexpr (kWantsDouble) {
        return GetAsDouble();
    } else if constexpr (kWantsString) {
        return GetAsString();
    } else {
        return absl::InvalidArgumentError("can't cast as T");
    }
}

Status InferAttr(ExprAnalysisContext *ctx) override;
static ConstNode *CastFrom(ExprNode *node);

Expand Down Expand Up @@ -1618,7 +1646,7 @@ class ColumnRefNode : public ExprNode {

// Replaces the stored relation name with a copy of `relation_name`.
void SetRelationName(const std::string &relation_name) { relation_name_ = relation_name; }

std::string GetColumnName() const { return column_name_; }
// Returns the column name by const reference to the member (no copy is made);
// callers that need the value beyond this node's lifetime must copy it.
const std::string &GetColumnName() const { return column_name_; }

// Replaces the stored column name with a copy of `column_name`.
void SetColumnName(const std::string &column_name) { column_name_ = column_name; }

Expand Down
15 changes: 7 additions & 8 deletions hybridse/include/vm/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ struct AggrTableInfo {
std::string partition_cols;
std::string order_by_col;
std::string bucket_size;
std::string filter_col;

bool operator==(const AggrTableInfo& rhs) const {
return aggr_table == rhs.aggr_table &&
Expand All @@ -481,7 +482,8 @@ struct AggrTableInfo {
aggr_col == rhs.aggr_col &&
partition_cols == rhs.partition_cols &&
order_by_col == rhs.order_by_col &&
bucket_size == rhs.bucket_size;
bucket_size == rhs.bucket_size &&
filter_col == rhs.filter_col;
}
};

Expand Down Expand Up @@ -514,13 +516,10 @@ class Catalog {
return nullptr;
}

virtual std::vector<AggrTableInfo> GetAggrTables(
const std::string& base_db,
const std::string& base_table,
const std::string& aggr_func,
const std::string& aggr_col,
const std::string& partition_cols,
const std::string& order_col) {
virtual std::vector<AggrTableInfo> GetAggrTables(const std::string& base_db, const std::string& base_table,
const std::string& aggr_func, const std::string& aggr_col,
const std::string& partition_cols, const std::string& order_col,
const std::string& filter_col) {
return std::vector<AggrTableInfo>();
}
};
Expand Down
40 changes: 20 additions & 20 deletions hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ class PhysicalReduceAggregationNode : public PhysicalProjectNode {
}
virtual ~PhysicalReduceAggregationNode() {}
base::Status InitSchema(PhysicalPlanContext *) override;
virtual void Print(std::ostream &output, const std::string &tab) const;
void Print(std::ostream &output, const std::string &tab) const override;
ConditionFilter having_condition_;
const PhysicalAggregationNode* orig_aggr_ = nullptr;
};
Expand Down Expand Up @@ -1500,26 +1500,25 @@ class PhysicalRequestAggUnionNode : public PhysicalOpNode {
PhysicalRequestAggUnionNode(PhysicalOpNode *request, PhysicalOpNode *raw, PhysicalOpNode *aggr,
const RequestWindowOp &window, const RequestWindowOp &aggr_window,
bool instance_not_in_window, bool exclude_current_time, bool output_request_row,
const node::FnDefNode *func, const node::ExprNode* agg_col)
const node::CallExprNode *project)
: PhysicalOpNode(kPhysicalOpRequestAggUnion, true),
window_(window),
agg_window_(aggr_window),
func_(func),
agg_col_(agg_col),
project_(project),
instance_not_in_window_(instance_not_in_window),
exclude_current_time_(exclude_current_time),
output_request_row_(output_request_row) {
output_type_ = kSchemaTypeTable;

fn_infos_.push_back(&window_.partition_.fn_info());
fn_infos_.push_back(&window_.sort_.fn_info());
fn_infos_.push_back(&window_.range_.fn_info());
fn_infos_.push_back(&window_.index_key_.fn_info());
AddFnInfo(&window_.partition_.fn_info());
AddFnInfo(&window_.sort_.fn_info());
AddFnInfo(&window_.range_.fn_info());
AddFnInfo(&window_.index_key_.fn_info());

fn_infos_.push_back(&agg_window_.partition_.fn_info());
fn_infos_.push_back(&agg_window_.sort_.fn_info());
fn_infos_.push_back(&agg_window_.range_.fn_info());
fn_infos_.push_back(&agg_window_.index_key_.fn_info());
AddFnInfo(&agg_window_.partition_.fn_info());
AddFnInfo(&agg_window_.sort_.fn_info());
AddFnInfo(&agg_window_.range_.fn_info());
AddFnInfo(&agg_window_.index_key_.fn_info());

AddProducers(request, raw, aggr);
}
Expand Down Expand Up @@ -1547,11 +1546,18 @@ class PhysicalRequestAggUnionNode : public PhysicalOpNode {

RequestWindowOp window_;
RequestWindowOp agg_window_;
const node::FnDefNode* func_ = nullptr;
const node::ExprNode* agg_col_;

// for long window, each node has only one projection node
const node::CallExprNode* project_;
const SchemasContext* parent_schema_context_ = nullptr;

private:
// Registers the three inputs of this node as producers, always in the
// order: request, raw, aggr.
void AddProducers(PhysicalOpNode *request, PhysicalOpNode *raw, PhysicalOpNode *aggr) {
    PhysicalOpNode *const inputs[] = {request, raw, aggr};
    for (PhysicalOpNode *input : inputs) {
        AddProducer(input);
    }
}

const bool instance_not_in_window_;
const bool exclude_current_time_;

Expand All @@ -1563,12 +1569,6 @@ class PhysicalRequestAggUnionNode : public PhysicalOpNode {
// `EXCLUDE CURRENT_ROW`
bool output_request_row_;

void AddProducers(PhysicalOpNode *request, PhysicalOpNode *raw, PhysicalOpNode *aggr) {
AddProducer(request);
AddProducer(raw);
AddProducer(aggr);
}

Schema agg_schema_;
};

Expand Down
11 changes: 4 additions & 7 deletions hybridse/include/vm/simple_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,10 @@ class SimpleCatalog : public Catalog {
bool InsertRows(const std::string &db, const std::string &table,
const std::vector<Row> &row);

std::vector<AggrTableInfo> GetAggrTables(
const std::string& base_db,
const std::string& base_table,
const std::string& aggr_func,
const std::string& aggr_col,
const std::string& partition_cols,
const std::string& order_col) override;
std::vector<AggrTableInfo> GetAggrTables(const std::string &base_db, const std::string &base_table,
const std::string &aggr_func, const std::string &aggr_col,
const std::string &partition_cols, const std::string &order_col,
const std::string &filter_col) override;

private:
bool enable_index_;
Expand Down
Loading