Skip to content

Commit

Permalink
window aggregation -> window function; change formatter (#634)
Browse files Browse the repository at this point in the history
* window aggregation -> window function; change formatter

* fix

* format

* add row_number
  • Loading branch information
AveryQi115 authored Oct 25, 2023
1 parent f396b84 commit 94a3105
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 103 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ set(P3_FILES
"src/include/execution/executors/sort_executor.h"
"src/include/execution/executors/topn_executor.h"
"src/include/execution/executors/update_executor.h"
"src/include/execution/executors/window_aggregate_executor.h"
"src/include/execution/executors/window_function_executor.h"
"src/execution/aggregation_executor.cpp"
"src/execution/window_aggregate_executor.cpp"
"src/execution/window_function_executor.cpp"
"src/execution/delete_executor.cpp"
"src/execution/filter_executor.cpp"
"src/execution/hash_join_executor.cpp"
Expand Down
6 changes: 3 additions & 3 deletions src/binder/bind_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,9 +564,9 @@ auto Binder::BindFuncCall(duckdb_libpgquery::PGFuncCall *root) -> std::unique_pt
}

if (function_name == "min" || function_name == "max" || function_name == "first" || function_name == "last" ||
function_name == "sum" || function_name == "count") {
// Rewrite count(*) to count_star().
if (function_name == "count" && children.empty()) {
function_name == "sum" || function_name == "count" || function_name == "rank" || function_name == "row_number") {
// Rewrite row_number()/count(*) to count_star().
if ((function_name == "count" && children.empty()) || function_name == "row_number") {
function_name = "count_star";
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ add_library(
topn_check_executor.cpp
update_executor.cpp
values_executor.cpp
window_aggregate_executor.cpp
window_function_executor.cpp
)

set(ALL_OBJECT_FILES
Expand Down
6 changes: 3 additions & 3 deletions src/execution/executor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "execution/executors/topn_executor.h"
#include "execution/executors/update_executor.h"
#include "execution/executors/values_executor.h"
#include "execution/executors/window_aggregate_executor.h"
#include "execution/executors/window_function_executor.h"
#include "execution/plans/filter_plan.h"
#include "execution/plans/mock_scan_plan.h"
#include "execution/plans/projection_plan.h"
Expand Down Expand Up @@ -96,9 +96,9 @@ auto ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPl
}

case PlanType::Window: {
auto window_plan = dynamic_cast<const WindowAggregationPlanNode *>(plan.get());
auto window_plan = dynamic_cast<const WindowFunctionPlanNode *>(plan.get());
auto child_executor = ExecutorFactory::CreateExecutor(exec_ctx, window_plan->GetChildPlan());
return std::make_unique<WindowAggregationExecutor>(exec_ctx, window_plan, std::move(child_executor));
return std::make_unique<WindowFunctionExecutor>(exec_ctx, window_plan, std::move(child_executor));
}

// Create a new nested-loop join executor
Expand Down
15 changes: 13 additions & 2 deletions src/execution/fmt_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <type_traits>
#include "execution/expressions/column_value_expression.h"
#include "execution/plans/update_plan.h"
#include "execution/plans/window_plan.h"
#include "fmt/format.h"
Expand Down Expand Up @@ -37,13 +38,23 @@ auto AggregationPlanNode::PlanNodeToString() const -> std::string {
return fmt::format("Agg {{ types={}, aggregates={}, group_by={} }}", agg_types_, aggregates_, group_bys_);
}

auto WindowAggregationPlanNode::PlanNodeToString() const -> std::string {
auto WindowFunctionPlanNode::PlanNodeToString() const -> std::string {
std::string columns_str;
for (const auto &col : columns_) {
const auto &col_val = dynamic_cast<const ColumnValueExpression &>(*col);
if (col_val.GetColIdx() == static_cast<uint32_t>(-1)) {
columns_str += "placeholder, ";
continue;
}
columns_str += col->ToString() + ", ";
}

std::vector<std::string> map_content;
map_content.reserve(window_functions_.size());
for (const auto &[k, v] : window_functions_) {
map_content.emplace_back(fmt::format(" {}=>{}", k, v));
}
return fmt::format("WindowAgg {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_,
return fmt::format("WindowFunc {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_str,
fmt::join(map_content, ",\n"));
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/plan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ auto AggregationPlanNode::InferAggSchema(const std::vector<AbstractExpressionRef
return Schema(output);
}

auto WindowAggregationPlanNode::InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema {
auto WindowFunctionPlanNode::InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema {
std::vector<Column> output;
// TODO(avery): correctly infer window call return type
for (const auto &column : columns) {
Expand Down
16 changes: 0 additions & 16 deletions src/execution/window_aggregate_executor.cpp

This file was deleted.

14 changes: 14 additions & 0 deletions src/execution/window_function_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "execution/executors/window_function_executor.h"
#include "execution/plans/window_plan.h"
#include "storage/table/tuple.h"

namespace bustub {

WindowFunctionExecutor::WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor)
: AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

/** Stub: unconditionally throws NotImplementedException. */
void WindowFunctionExecutor::Init() {
  throw NotImplementedException("WindowFunctionExecutor is not implemented");
}

/**
 * Stub: produces no tuples.
 * @param tuple Not written by this stub
 * @param rid Not written by this stub
 * @return Always `false`
 */
auto WindowFunctionExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  constexpr bool has_more_tuples = false;
  return has_more_tuples;
}
} // namespace bustub
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
//
// BusTub
//
// window_aggregate_executor.h
// window_function_executor.h
//
// Identification: src/include/execution/executors/projection_executor.h
// Identification: src/include/execution/executors/window_function_executor.h
//
// Copyright (c) 2015-2022, Carnegie Mellon University Database Group
//
Expand All @@ -23,52 +23,52 @@
namespace bustub {

/**
* The WindowAggregationExecutor executor executes a window aggregation for columns using window function.
* The WindowFunctionExecutor evaluates the window functions described by a WindowFunctionPlanNode.
*
* Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows,
* and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both
* normal selected columns and placeholder columns for window aggregations.
* A window function differs from a normal aggregation in that it outputs one row for each input row,
* and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contain both
* normal selected columns and placeholder columns for window functions.
*
* For example, if we have a query like:
* SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY 0.2,0.3)
* FROM table;
*
* The WindowAggregationPlanNode contains following structure:
* The WindowFunctionPlanNode contains following structure:
* columns: std::vector<AbstractExpressionRef>{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)}
* window_functions_: {
* 3: {
* partition_by: std::vector<AbstractExpressionRef>{0.2}
* order_by: std::vector<AbstractExpressionRef>{0.3}
* aggregate: std::vector<AbstractExpressionRef>{0.3}
* window_agg_type: WindowAggregationType::SumAggregate
* functions: std::vector<AbstractExpressionRef>{0.3}
* window_func_type: WindowFunctionType::SumAggregate
* }
* 4: {
* partition_by: std::vector<AbstractExpressionRef>{0.1}
* order_by: std::vector<AbstractExpressionRef>{0.2,0.3}
* aggregate: std::vector<AbstractExpressionRef>{0.4}
* window_agg_type: WindowAggregationType::SumAggregate
* functions: std::vector<AbstractExpressionRef>{0.4}
* window_func_type: WindowFunctionType::SumAggregate
* }
* }
*
* Your executor should use child executor and exprs in columns to produce selected columns except for window
* aggregation columns, and use window_agg_indexes, partition_bys, order_bys, aggregates and window_agg_types to
* generate window aggregation columns results. Directly use placeholders for window aggregation columns in columns is
* function columns, and use window_func_indexes, partition_bys, order_bys, functions and window_func_types to
* generate the window function column results. Directly using the placeholders in columns for window function
* columns is not allowed, as they contain an invalid column id.
*
* Your windowAggregationExecutor does not need to support specified window frames (eg: 1 preceding and 1 following).
* Your WindowFunctionExecutor does not need to support specified window frames (eg: 1 preceding and 1 following).
* You can assume that all window frames are UNBOUNDED PRECEDING AND CURRENT ROW when there is an ORDER BY clause, and
* UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING when there is no ORDER BY clause.
*
*/
class WindowAggregationExecutor : public AbstractExecutor {
class WindowFunctionExecutor : public AbstractExecutor {
public:
/**
* Construct a new WindowAggregationExecutor instance.
* Construct a new WindowFunctionExecutor instance.
* @param exec_ctx The executor context
* @param plan The window function plan to be executed
* @param child_executor The child executor from which tuples are obtained
*/
WindowAggregationExecutor(ExecutorContext *exec_ctx, const WindowAggregationPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor);
WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan,
std::unique_ptr<AbstractExecutor> &&child_executor);

/** Initialize the window aggregation */
void Init() override;
Expand All @@ -86,7 +86,7 @@ class WindowAggregationExecutor : public AbstractExecutor {

private:
/** The window aggregation plan node to be executed */
const WindowAggregationPlanNode *plan_;
const WindowFunctionPlanNode *plan_;

/** The child executor from which tuples are obtained */
std::unique_ptr<AbstractExecutor> child_executor_;
Expand Down
79 changes: 41 additions & 38 deletions src/include/execution/plans/window_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,47 @@

namespace bustub {

/** WindowAggregationType enumerates all the possible window aggregation functions in our system */
enum class WindowAggregationType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate };
/** WindowFunctionType enumerates all the possible window functions in our system */
enum class WindowFunctionType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate, Rank };

class WindowAggregationPlanNode : public AbstractPlanNode {
class WindowFunctionPlanNode : public AbstractPlanNode {
public:
/**
* Construct a new WindowAggregationPlanNode.
* Construct a new WindowFunctionPlanNode.
* @param output_schema The output format of this plan node
* @param child The child plan to aggregate data over
* @param window_agg_indexes The indexes of the window aggregation functions
* @param columns All columns include the placeholder for window aggregation functions
* @param partition_bys The partition by clause of the window aggregations
* @param order_bys The order by clause of the window aggregations
* @param aggregates The expressions that we are aggregating
* @param window_agg_types The types that we are aggregating
* @param window_func_indexes The indexes of the window functions
* @param columns All columns include the placeholder for window functions
* @param partition_bys The partition by clause of the window functions
* @param order_bys The order by clause of the window functions
* @param functions The expressions that we are aggregating
* @param window_func_types The types that we are aggregating
*
* Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows,
* and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both
* and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contains both
* normal selected columns and placeholder columns for window aggregations.
*
* For example, if we have a query like:
* SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY
* 0.2,0.3) FROM table;
*
* The WindowAggregationPlanNode should contains following structure:
* The WindowFunctionPlanNode should contain the following structure:
* columns: std::vector<AbstractExpressionRef>{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)}
* partition_bys: std::vector<std::vector<AbstractExpressionRef>>{{0.2}, {0.1}}
* order_bys: std::vector<std::vector<AbstractExpressionRef>>{{0.3}, {0.2,0.3}}
* aggregates: std::vector<AbstractExpressionRef>{0.3, 0.4}
* window_agg_types: std::vector<WindowAggregationType>{SumAggregate, SumAggregate}
* functions: std::vector<AbstractExpressionRef>{0.3, 0.4}
* window_func_types: std::vector<WindowFunctionType>{SumAggregate, SumAggregate}
*/
WindowAggregationPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child,
std::vector<uint32_t> window_agg_indexes, std::vector<AbstractExpressionRef> columns,
std::vector<std::vector<AbstractExpressionRef>> partition_bys,
std::vector<std::vector<std::pair<OrderByType, AbstractExpressionRef>>> order_bys,
std::vector<AbstractExpressionRef> aggregates,
std::vector<WindowAggregationType> window_agg_types)
WindowFunctionPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child, std::vector<uint32_t> window_func_indexes,
std::vector<AbstractExpressionRef> columns,
std::vector<std::vector<AbstractExpressionRef>> partition_bys,
std::vector<std::vector<std::pair<OrderByType, AbstractExpressionRef>>> order_bys,
std::vector<AbstractExpressionRef> functions,
std::vector<WindowFunctionType> window_func_types)
: AbstractPlanNode(std::move(output_schema), {std::move(child)}), columns_(std::move(columns)) {
for (uint32_t i = 0; i < window_agg_indexes.size(); i++) {
window_functions_[window_agg_indexes[i]] =
WindowFunction{aggregates[i], window_agg_types[i], partition_bys[i], order_bys[i]};
for (uint32_t i = 0; i < window_func_indexes.size(); i++) {
window_functions_[window_func_indexes[i]] =
WindowFunction{functions[i], window_func_types[i], partition_bys[i], order_bys[i]};
}
}

Expand All @@ -82,11 +82,11 @@ class WindowAggregationPlanNode : public AbstractPlanNode {

static auto InferWindowSchema(const std::vector<AbstractExpressionRef> &columns) -> Schema;

BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowAggregationPlanNode);
BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowFunctionPlanNode);

struct WindowFunction {
AbstractExpressionRef aggregate_;
WindowAggregationType type_;
AbstractExpressionRef function_;
WindowFunctionType type_;
std::vector<AbstractExpressionRef> partition_by_;
std::vector<std::pair<OrderByType, AbstractExpressionRef>> order_by_;
};
Expand All @@ -103,37 +103,40 @@ class WindowAggregationPlanNode : public AbstractPlanNode {
} // namespace bustub

template <>
struct fmt::formatter<bustub::WindowAggregationPlanNode::WindowFunction> : formatter<std::string> {
struct fmt::formatter<bustub::WindowFunctionPlanNode::WindowFunction> : formatter<std::string> {
template <typename FormatContext>
auto format(const bustub::WindowAggregationPlanNode::WindowFunction &x, FormatContext &ctx) const {
return formatter<std::string>::format(fmt::format("{{ aggregate={}, type={}, partition_by={}, order_by={} }}",
x.aggregate_, x.type_, x.partition_by_, x.order_by_),
auto format(const bustub::WindowFunctionPlanNode::WindowFunction &x, FormatContext &ctx) const {
return formatter<std::string>::format(fmt::format("{{ function_arg={}, type={}, partition_by={}, order_by={} }}",
x.function_, x.type_, x.partition_by_, x.order_by_),
ctx);
}
};

template <>
struct fmt::formatter<bustub::WindowAggregationType> : formatter<std::string> {
struct fmt::formatter<bustub::WindowFunctionType> : formatter<std::string> {
template <typename FormatContext>
auto format(bustub::WindowAggregationType c, FormatContext &ctx) const {
using bustub::WindowAggregationType;
auto format(bustub::WindowFunctionType c, FormatContext &ctx) const {
using bustub::WindowFunctionType;
std::string name = "unknown";
switch (c) {
case WindowAggregationType::CountStarAggregate:
case WindowFunctionType::CountStarAggregate:
name = "count_star";
break;
case WindowAggregationType::CountAggregate:
case WindowFunctionType::CountAggregate:
name = "count";
break;
case WindowAggregationType::SumAggregate:
case WindowFunctionType::SumAggregate:
name = "sum";
break;
case WindowAggregationType::MinAggregate:
case WindowFunctionType::MinAggregate:
name = "min";
break;
case WindowAggregationType::MaxAggregate:
case WindowFunctionType::MaxAggregate:
name = "max";
break;
case WindowFunctionType::Rank:
name = "rank";
break;
}
return formatter<std::string>::format(name, ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Planner {
-> std::tuple<AggregationType, std::vector<AbstractExpressionRef>>;

auto GetWindowAggCallFromFactory(const std::string &func_name, std::vector<AbstractExpressionRef> args)
-> std::tuple<WindowAggregationType, std::vector<AbstractExpressionRef>>;
-> std::tuple<WindowFunctionType, std::vector<AbstractExpressionRef>>;

auto GetBinaryExpressionFromFactory(const std::string &op_name, AbstractExpressionRef left,
AbstractExpressionRef right) -> AbstractExpressionRef;
Expand Down
2 changes: 1 addition & 1 deletion src/planner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ add_library(
plan_insert.cpp
plan_table_ref.cpp
plan_select.cpp
plan_window_aggregation.cpp
plan_window_function.cpp
planner.cpp)

set(ALL_OBJECT_FILES
Expand Down
Loading

0 comments on commit 94a3105

Please sign in to comment.