Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

window aggregation -> window function; change formatter #634

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ set(P3_FILES
"src/include/execution/executors/sort_executor.h"
"src/include/execution/executors/topn_executor.h"
"src/include/execution/executors/update_executor.h"
"src/include/execution/executors/window_aggregate_executor.h"
"src/include/execution/executors/window_function_executor.h"
"src/execution/aggregation_executor.cpp"
"src/execution/window_aggregate_executor.cpp"
"src/execution/window_function_executor.cpp"
"src/execution/delete_executor.cpp"
"src/execution/filter_executor.cpp"
"src/execution/hash_join_executor.cpp"
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ auto Binder::BindFuncCall(duckdb_libpgquery::PGFuncCall *root) -> std::unique_pt
}

if (function_name == "min" || function_name == "max" || function_name == "first" || function_name == "last" ||
function_name == "sum" || function_name == "count") {
function_name == "sum" || function_name == "count" || function_name == "rank") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need to have ROW_NUMBER here, but rewriting it to count(1) somewhere in planner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to WindowFunctionExecutor because I think rank and row_number are not aggregations and in postgres docs, it is also named as window function.

About Row_number, I'll add the row_number function and change it to count(1) in planner.

// Rewrite count(*) to count_star().
if (function_name == "count" && children.empty()) {
function_name = "count_star";
Expand Down
2 changes: 1 addition & 1 deletion src/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ add_library(
topn_check_executor.cpp
update_executor.cpp
values_executor.cpp
window_aggregate_executor.cpp
window_function_executor.cpp
)

set(ALL_OBJECT_FILES
Expand Down
6 changes: 3 additions & 3 deletions src/execution/executor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "execution/executors/topn_executor.h"
#include "execution/executors/update_executor.h"
#include "execution/executors/values_executor.h"
#include "execution/executors/window_aggregate_executor.h"
#include "execution/executors/window_function_executor.h"
#include "execution/plans/filter_plan.h"
#include "execution/plans/mock_scan_plan.h"
#include "execution/plans/projection_plan.h"
Expand Down Expand Up @@ -96,9 +96,9 @@ auto ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPl
}

case PlanType::Window: {
auto window_plan = dynamic_cast<const WindowAggregationPlanNode *>(plan.get());
auto window_plan = dynamic_cast<const WindowFunctionPlanNode *>(plan.get());
auto child_executor = ExecutorFactory::CreateExecutor(exec_ctx, window_plan->GetChildPlan());
return std::make_unique<WindowAggregationExecutor>(exec_ctx, window_plan, std::move(child_executor));
return std::make_unique<WindowFunctionExecutor>(exec_ctx, window_plan, std::move(child_executor));
}

// Create a new nested-loop join executor
Expand Down
15 changes: 13 additions & 2 deletions src/execution/fmt_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <type_traits>
#include "execution/expressions/column_value_expression.h"
#include "execution/plans/update_plan.h"
#include "execution/plans/window_plan.h"
#include "fmt/format.h"
Expand Down Expand Up @@ -37,13 +38,23 @@ auto AggregationPlanNode::PlanNodeToString() const -> std::string {
return fmt::format("Agg {{ types={}, aggregates={}, group_by={} }}", agg_types_, aggregates_, group_bys_);
}

auto WindowAggregationPlanNode::PlanNodeToString() const -> std::string {
auto WindowFunctionPlanNode::PlanNodeToString() const -> std::string {
std::string columns_str;
for (const auto &col : columns_) {
const auto &col_val = dynamic_cast<const ColumnValueExpression &>(*col);
if (col_val.GetColIdx() == static_cast<uint32_t>(-1)) {
columns_str += "placeholder, ";
continue;
}
columns_str += col->ToString() + ", ";
}

std::vector<std::string> map_content;
map_content.reserve(window_functions_.size());
for (const auto &[k, v] : window_functions_) {
map_content.emplace_back(fmt::format(" {}=>{}", k, v));
}
return fmt::format("WindowAgg {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_,
return fmt::format("WindowFunc {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_str,
fmt::join(map_content, ",\n"));
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/plan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ auto AggregationPlanNode::InferAggSchema(const std::vector<AbstractExpressionRef
return Schema(output);
}

auto WindowAggregationPlanNode::InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema {
auto WindowFunctionPlanNode::InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema {
std::vector<Column> output;
// TODO(avery): correctly infer window call return type
for (const auto &column : columns) {
Expand Down
16 changes: 0 additions & 16 deletions src/execution/window_aggregate_executor.cpp

This file was deleted.

14 changes: 14 additions & 0 deletions src/execution/window_function_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "execution/executors/window_function_executor.h"
#include "execution/plans/window_plan.h"
#include "storage/table/tuple.h"

namespace bustub {

WindowFunctionExecutor::WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor)
: AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

void WindowFunctionExecutor::Init() { throw NotImplementedException("WindowFunctionExecutor is not implemented"); }

auto WindowFunctionExecutor::Next(Tuple *tuple, RID *rid) -> bool { return false; }
} // namespace bustub
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
//
// BusTub
//
// window_aggregate_executor.h
// window_function_executor.h
//
// Identification: src/include/execution/executors/projection_executor.h
// Identification: src/include/execution/executors/window_function_executor.h
//
// Copyright (c) 2015-2022, Carnegie Mellon University Database Group
//
Expand All @@ -23,52 +23,52 @@
namespace bustub {

/**
* The WindowAggregationExecutor executor executes a window aggregation for columns using window function.
* The WindowFunctionExecutor executor executes a window function for columns using window function.
*
* Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows,
* and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both
* normal selected columns and placeholder columns for window aggregations.
* Window function is different from normal aggregation as it outputs one row for each inputing rows,
* and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contains both
* normal selected columns and placeholder columns for window functions.
*
* For example, if we have a query like:
* SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY 0.2,0.3)
* FROM table;
*
* The WindowAggregationPlanNode contains following structure:
* The WindowFunctionPlanNode contains following structure:
* columns: std::vector<AbstractExpressionRef>{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)}
* window_functions_: {
* 3: {
* partition_by: std::vector<AbstractExpressionRef>{0.2}
* order_by: std::vector<AbstractExpressionRef>{0.3}
* aggregate: std::vector<AbstractExpressionRef>{0.3}
* window_agg_type: WindowAggregationType::SumAggregate
* functions: std::vector<AbstractExpressionRef>{0.3}
* window_func_type: WindowFunctionType::SumAggregate
* }
* 4: {
* partition_by: std::vector<AbstractExpressionRef>{0.1}
* order_by: std::vector<AbstractExpressionRef>{0.2,0.3}
* aggregate: std::vector<AbstractExpressionRef>{0.4}
* window_agg_type: WindowAggregationType::SumAggregate
* functions: std::vector<AbstractExpressionRef>{0.4}
* window_func_type: WindowFunctionType::SumAggregate
* }
* }
*
* Your executor should use child executor and exprs in columns to produce selected columns except for window
* aggregation columns, and use window_agg_indexes, partition_bys, order_bys, aggregates and window_agg_types to
* generate window aggregation columns results. Directly use placeholders for window aggregation columns in columns is
* function columns, and use window_agg_indexes, partition_bys, order_bys, functionss and window_agg_types to
* generate window function columns results. Directly use placeholders for window function columns in columns is
* not allowed, as it contains invalid column id.
*
* Your windowAggregationExecutor does not need to support specified window frames (eg: 1 preceding and 1 following).
* Your WindowFunctionExecutor does not need to support specified window frames (eg: 1 preceding and 1 following).
* You can assume that all window frames are UNBOUNDED FOLLOWING AND CURRENT ROW when there is ORDER BY clause, and
* UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING when there is no ORDER BY clause.
*
*/
class WindowAggregationExecutor : public AbstractExecutor {
class WindowFunctionExecutor : public AbstractExecutor {
public:
/**
* Construct a new WindowAggregationExecutor instance.
* Construct a new WindowFunctionExecutor instance.
* @param exec_ctx The executor context
* @param plan The window aggregation plan to be executed
*/
WindowAggregationExecutor(ExecutorContext *exec_ctx, const WindowAggregationPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor);
WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor);

/** Initialize the window aggregation */
void Init() override;
Expand All @@ -86,7 +86,7 @@ class WindowAggregationExecutor : public AbstractExecutor {

private:
/** The window aggregation plan node to be executed */
const WindowAggregationPlanNode *plan_;
const WindowFunctionPlanNode *plan_;

/** The child executor from which tuples are obtained */
std::unique_ptr<AbstractExecutor> child_executor_;
Expand Down
79 changes: 41 additions & 38 deletions src/include/execution/plans/window_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,47 @@

namespace bustub {

/** WindowAggregationType enumerates all the possible window aggregation functions in our system */
enum class WindowAggregationType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate };
/** WindowFunctionType enumerates all the possible window functions in our system */
enum class WindowFunctionType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate, Rank };

class WindowAggregationPlanNode : public AbstractPlanNode {
class WindowFunctionPlanNode : public AbstractPlanNode {
public:
/**
* Construct a new WindowAggregationPlanNode.
* Construct a new WindowFunctionPlanNode.
* @param output_schema The output format of this plan node
* @param child The child plan to aggregate data over
* @param window_agg_indexes The indexes of the window aggregation functions
* @param columns All columns include the placeholder for window aggregation functions
* @param partition_bys The partition by clause of the window aggregations
* @param order_bys The order by clause of the window aggregations
* @param aggregates The expressions that we are aggregating
* @param window_agg_types The types that we are aggregating
* @param window_func_indexes The indexes of the window functions
* @param columns All columns include the placeholder for window functions
* @param partition_bys The partition by clause of the window functions
* @param order_bys The order by clause of the window functions
* @param funcions The expressions that we are aggregating
* @param window_func_types The types that we are aggregating
*
* Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows,
* and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both
* and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contains both
* normal selected columns and placeholder columns for window aggregations.
*
* For example, if we have a query like:
* SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY
* 0.2,0.3) FROM table;
*
* The WindowAggregationPlanNode should contains following structure:
* The WindowFunctionPlanNode should contains following structure:
* columns: std::vector<AbstractExpressionRef>{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)}
* partition_bys: std::vector<std::vector<AbstractExpressionRef>>{{0.2}, {0.1}}
* order_bys: std::vector<std::vector<AbstractExpressionRef>>{{0.3}, {0.2,0.3}}
* aggregates: std::vector<AbstractExpressionRef>{0.3, 0.4}
* window_agg_types: std::vector<WindowAggregationType>{SumAggregate, SumAggregate}
* functions: std::vector<AbstractExpressionRef>{0.3, 0.4}
* window_func_types: std::vector<WindowFunctionType>{SumAggregate, SumAggregate}
*/
WindowAggregationPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child,
std::vector<uint32_t> window_agg_indexes, std::vector<AbstractExpressionRef> columns,
std::vector<std::vector<AbstractExpressionRef>> partition_bys,
std::vector<std::vector<std::pair<OrderByType, AbstractExpressionRef>>> order_bys,
std::vector<AbstractExpressionRef> aggregates,
std::vector<WindowAggregationType> window_agg_types)
WindowFunctionPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child, std::vector<uint32_t> window_func_indexes,
std::vector<AbstractExpressionRef> columns,
std::vector<std::vector<AbstractExpressionRef>> partition_bys,
std::vector<std::vector<std::pair<OrderByType, AbstractExpressionRef>>> order_bys,
std::vector<AbstractExpressionRef> functions,
std::vector<WindowFunctionType> window_func_types)
: AbstractPlanNode(std::move(output_schema), {std::move(child)}), columns_(std::move(columns)) {
for (uint32_t i = 0; i < window_agg_indexes.size(); i++) {
window_functions_[window_agg_indexes[i]] =
WindowFunction{aggregates[i], window_agg_types[i], partition_bys[i], order_bys[i]};
for (uint32_t i = 0; i < window_func_indexes.size(); i++) {
window_functions_[window_func_indexes[i]] =
WindowFunction{functions[i], window_func_types[i], partition_bys[i], order_bys[i]};
}
}

Expand All @@ -82,11 +82,11 @@ class WindowAggregationPlanNode : public AbstractPlanNode {

static auto InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema;

BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowAggregationPlanNode);
BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowFunctionPlanNode);

struct WindowFunction {
AbstractExpressionRef aggregate_;
WindowAggregationType type_;
AbstractExpressionRef function_;
WindowFunctionType type_;
std::vector<AbstractExpressionRef> partition_by_;
std::vector<std::pair<OrderByType, AbstractExpressionRef>> order_by_;
};
Expand All @@ -103,37 +103,40 @@ class WindowAggregationPlanNode : public AbstractPlanNode {
} // namespace bustub

template <>
struct fmt::formatter<bustub::WindowAggregationPlanNode::WindowFunction> : formatter<std::string> {
struct fmt::formatter<bustub::WindowFunctionPlanNode::WindowFunction> : formatter<std::string> {
template <typename FormatContext>
auto format(const bustub::WindowAggregationPlanNode::WindowFunction &x, FormatContext &ctx) const {
return formatter<std::string>::format(fmt::format("{{ aggregate={}, type={}, partition_by={}, order_by={} }}",
x.aggregate_, x.type_, x.partition_by_, x.order_by_),
auto format(const bustub::WindowFunctionPlanNode::WindowFunction &x, FormatContext &ctx) const {
return formatter<std::string>::format(fmt::format("{{ function_arg={}, type={}, partition_by={}, order_by={} }}",
x.function_, x.type_, x.partition_by_, x.order_by_),
ctx);
}
};

template <>
struct fmt::formatter<bustub::WindowAggregationType> : formatter<std::string> {
struct fmt::formatter<bustub::WindowFunctionType> : formatter<std::string> {
template <typename FormatContext>
auto format(bustub::WindowAggregationType c, FormatContext &ctx) const {
using bustub::WindowAggregationType;
auto format(bustub::WindowFunctionType c, FormatContext &ctx) const {
using bustub::WindowFunctionType;
std::string name = "unknown";
switch (c) {
case WindowAggregationType::CountStarAggregate:
case WindowFunctionType::CountStarAggregate:
name = "count_star";
break;
case WindowAggregationType::CountAggregate:
case WindowFunctionType::CountAggregate:
name = "count";
break;
case WindowAggregationType::SumAggregate:
case WindowFunctionType::SumAggregate:
name = "sum";
break;
case WindowAggregationType::MinAggregate:
case WindowFunctionType::MinAggregate:
name = "min";
break;
case WindowAggregationType::MaxAggregate:
case WindowFunctionType::MaxAggregate:
name = "max";
break;
case WindowFunctionType::Rank:
name = "rank";
break;
}
return formatter<std::string>::format(name, ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Planner {
-> std::tuple<AggregationType, std::vector<AbstractExpressionRef>>;

auto GetWindowAggCallFromFactory(const std::string &func_name, std::vector<AbstractExpressionRef> args)
-> std::tuple<WindowAggregationType, std::vector<AbstractExpressionRef>>;
-> std::tuple<WindowFunctionType, std::vector<AbstractExpressionRef>>;

auto GetBinaryExpressionFromFactory(const std::string &op_name, AbstractExpressionRef left,
AbstractExpressionRef right) -> AbstractExpressionRef;
Expand Down
2 changes: 1 addition & 1 deletion src/planner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ add_library(
plan_insert.cpp
plan_table_ref.cpp
plan_select.cpp
plan_window_aggregation.cpp
plan_window_function.cpp
planner.cpp)

set(ALL_OBJECT_FILES
Expand Down
Loading
Loading