diff --git a/CMakeLists.txt b/CMakeLists.txt index 79c8bf707..9eb3dc378 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -328,9 +328,9 @@ set(P3_FILES "src/include/execution/executors/sort_executor.h" "src/include/execution/executors/topn_executor.h" "src/include/execution/executors/update_executor.h" - "src/include/execution/executors/window_aggregate_executor.h" + "src/include/execution/executors/window_function_executor.h" "src/execution/aggregation_executor.cpp" - "src/execution/window_aggregate_executor.cpp" + "src/execution/window_function_executor.cpp" "src/execution/delete_executor.cpp" "src/execution/filter_executor.cpp" "src/execution/hash_join_executor.cpp" diff --git a/src/binder/bind_select.cpp b/src/binder/bind_select.cpp index a24910c10..e08867c3f 100644 --- a/src/binder/bind_select.cpp +++ b/src/binder/bind_select.cpp @@ -564,9 +564,9 @@ auto Binder::BindFuncCall(duckdb_libpgquery::PGFuncCall *root) -> std::unique_pt } if (function_name == "min" || function_name == "max" || function_name == "first" || function_name == "last" || - function_name == "sum" || function_name == "count") { - // Rewrite count(*) to count_star(). - if (function_name == "count" && children.empty()) { + function_name == "sum" || function_name == "count" || function_name == "rank" || function_name == "row_number") { + // Rewrite row_number()/count(*) to count_star(). 
+ if ((function_name == "count" && children.empty()) || function_name == "row_number") { function_name = "count_star"; } diff --git a/src/execution/CMakeLists.txt b/src/execution/CMakeLists.txt index 4b3c8cdf5..cfffcf44c 100644 --- a/src/execution/CMakeLists.txt +++ b/src/execution/CMakeLists.txt @@ -22,7 +22,7 @@ add_library( topn_check_executor.cpp update_executor.cpp values_executor.cpp - window_aggregate_executor.cpp + window_function_executor.cpp ) set(ALL_OBJECT_FILES diff --git a/src/execution/executor_factory.cpp b/src/execution/executor_factory.cpp index f9981cf21..dbfff8b09 100644 --- a/src/execution/executor_factory.cpp +++ b/src/execution/executor_factory.cpp @@ -34,7 +34,7 @@ #include "execution/executors/topn_executor.h" #include "execution/executors/update_executor.h" #include "execution/executors/values_executor.h" -#include "execution/executors/window_aggregate_executor.h" +#include "execution/executors/window_function_executor.h" #include "execution/plans/filter_plan.h" #include "execution/plans/mock_scan_plan.h" #include "execution/plans/projection_plan.h" @@ -96,9 +96,9 @@ auto ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPl } case PlanType::Window: { - auto window_plan = dynamic_cast(plan.get()); + auto window_plan = dynamic_cast(plan.get()); auto child_executor = ExecutorFactory::CreateExecutor(exec_ctx, window_plan->GetChildPlan()); - return std::make_unique(exec_ctx, window_plan, std::move(child_executor)); + return std::make_unique(exec_ctx, window_plan, std::move(child_executor)); } // Create a new nested-loop join executor diff --git a/src/execution/fmt_impl.cpp b/src/execution/fmt_impl.cpp index ad56bd9eb..ea3eb0b14 100644 --- a/src/execution/fmt_impl.cpp +++ b/src/execution/fmt_impl.cpp @@ -1,4 +1,5 @@ #include +#include "execution/expressions/column_value_expression.h" #include "execution/plans/update_plan.h" #include "execution/plans/window_plan.h" #include "fmt/format.h" @@ -37,13 +38,23 @@ auto 
AggregationPlanNode::PlanNodeToString() const -> std::string { return fmt::format("Agg {{ types={}, aggregates={}, group_by={} }}", agg_types_, aggregates_, group_bys_); } -auto WindowAggregationPlanNode::PlanNodeToString() const -> std::string { +auto WindowFunctionPlanNode::PlanNodeToString() const -> std::string { + std::string columns_str; + for (const auto &col : columns_) { + const auto &col_val = dynamic_cast(*col); + if (col_val.GetColIdx() == static_cast(-1)) { + columns_str += "placeholder, "; + continue; + } + columns_str += col->ToString() + ", "; + } + std::vector map_content; map_content.reserve(window_functions_.size()); for (const auto &[k, v] : window_functions_) { map_content.emplace_back(fmt::format(" {}=>{}", k, v)); } - return fmt::format("WindowAgg {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_, + return fmt::format("WindowFunc {{\n columns={},\n window_functions={{\n{}\n }}\n}}", columns_str, fmt::join(map_content, ",\n")); } diff --git a/src/execution/plan_node.cpp b/src/execution/plan_node.cpp index 0f0043f63..47a0dfdff 100644 --- a/src/execution/plan_node.cpp +++ b/src/execution/plan_node.cpp @@ -80,7 +80,7 @@ auto AggregationPlanNode::InferAggSchema(const std::vector &columns) -> Schema { +auto WindowFunctionPlanNode::InferWindowSchema(const std::vector &columns) -> Schema { std::vector output; // TODO(avery): correctly infer window call return type for (const auto &column : columns) { diff --git a/src/execution/window_aggregate_executor.cpp b/src/execution/window_aggregate_executor.cpp deleted file mode 100644 index 123b109f8..000000000 --- a/src/execution/window_aggregate_executor.cpp +++ /dev/null @@ -1,16 +0,0 @@ -#include "execution/executors/window_aggregate_executor.h" -#include "execution/plans/window_plan.h" -#include "storage/table/tuple.h" - -namespace bustub { - -WindowAggregationExecutor::WindowAggregationExecutor(ExecutorContext *exec_ctx, const WindowAggregationPlanNode *plan, - std::unique_ptr 
&&child_executor) - : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {} - -void WindowAggregationExecutor::Init() { - throw NotImplementedException("WindowAggregationExecutor is not implemented"); -} - -auto WindowAggregationExecutor::Next(Tuple *tuple, RID *rid) -> bool { return false; } -} // namespace bustub diff --git a/src/execution/window_function_executor.cpp b/src/execution/window_function_executor.cpp new file mode 100644 index 000000000..768fa3d23 --- /dev/null +++ b/src/execution/window_function_executor.cpp @@ -0,0 +1,14 @@ +#include "execution/executors/window_function_executor.h" +#include "execution/plans/window_plan.h" +#include "storage/table/tuple.h" + +namespace bustub { + +WindowFunctionExecutor::WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan, + std::unique_ptr &&child_executor) + : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {} + +void WindowFunctionExecutor::Init() { throw NotImplementedException("WindowFunctionExecutor is not implemented"); } + +auto WindowFunctionExecutor::Next(Tuple *tuple, RID *rid) -> bool { return false; } +} // namespace bustub diff --git a/src/include/execution/executors/window_aggregate_executor.h b/src/include/execution/executors/window_function_executor.h similarity index 63% rename from src/include/execution/executors/window_aggregate_executor.h rename to src/include/execution/executors/window_function_executor.h index f9b4c7137..d8d033f7e 100644 --- a/src/include/execution/executors/window_aggregate_executor.h +++ b/src/include/execution/executors/window_function_executor.h @@ -2,9 +2,9 @@ // // BusTub // -// window_aggregate_executor.h +// window_function_executor.h // -// Identification: src/include/execution/executors/projection_executor.h +// Identification: src/include/execution/executors/window_function_executor.h // // Copyright (c) 2015-2022, Carnegie Mellon University Database Group // @@ 
-23,52 +23,52 @@ namespace bustub { /** - * The WindowAggregationExecutor executor executes a window aggregation for columns using window function. + * The WindowFunctionExecutor executes window functions for columns. * - * Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows, - * and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both - * normal selected columns and placeholder columns for window aggregations. + * Window function is different from normal aggregation as it outputs one row for each input row, + * and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contain both + * normal selected columns and placeholder columns for window functions. * * For example, if we have a query like: * SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY 0.2,0.3) * FROM table; * - * The WindowAggregationPlanNode contains following structure: + * The WindowFunctionPlanNode contains the following structure: * columns: std::vector{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)} * window_functions_: { * 3: { * partition_by: std::vector{0.2} * order_by: std::vector{0.3} - * aggregate: std::vector{0.3} - * window_agg_type: WindowAggregationType::SumAggregate + * functions: std::vector{0.3} + * window_func_type: WindowFunctionType::SumAggregate * } * 4: { * partition_by: std::vector{0.1} * order_by: std::vector{0.2,0.3} - * aggregate: std::vector{0.4} - * window_agg_type: WindowAggregationType::SumAggregate + * functions: std::vector{0.4} + * window_func_type: WindowFunctionType::SumAggregate * } * } * * Your executor should use child executor and exprs in columns to produce selected columns except for window - * aggregation columns, and use window_agg_indexes, partition_bys, order_bys, aggregates and window_agg_types to - * generate window aggregation columns results. 
Directly use placeholders for window aggregation columns in columns is + * function columns, and use window_func_indexes, partition_bys, order_bys, functions and window_func_types to + * generate window function columns results. Directly use placeholders for window function columns in columns is * not allowed, as it contains invalid column id. * - * Your windowAggregationExecutor does not need to support specified window frames (eg: 1 preceding and 1 following). + * Your WindowFunctionExecutor does not need to support specified window frames (eg: 1 preceding and 1 following). * You can assume that all window frames are UNBOUNDED FOLLOWING AND CURRENT ROW when there is ORDER BY clause, and * UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING when there is no ORDER BY clause. * */ -class WindowAggregationExecutor : public AbstractExecutor { +class WindowFunctionExecutor : public AbstractExecutor { public: /** - * Construct a new WindowAggregationExecutor instance. + * Construct a new WindowFunctionExecutor instance. 
* @param exec_ctx The executor context * @param plan The window aggregation plan to be executed */ - WindowAggregationExecutor(ExecutorContext *exec_ctx, const WindowAggregationPlanNode *plan, - std::unique_ptr &&child_executor); + WindowFunctionExecutor(ExecutorContext *exec_ctx, const WindowFunctionPlanNode *plan, + std::unique_ptr &&child_executor); /** Initialize the window aggregation */ void Init() override; @@ -86,7 +86,7 @@ class WindowAggregationExecutor : public AbstractExecutor { private: /** The window aggregation plan node to be executed */ - const WindowAggregationPlanNode *plan_; + const WindowFunctionPlanNode *plan_; /** The child executor from which tuples are obtained */ std::unique_ptr child_executor_; diff --git a/src/include/execution/plans/window_plan.h b/src/include/execution/plans/window_plan.h index 3ca0f602e..b67cf2380 100644 --- a/src/include/execution/plans/window_plan.h +++ b/src/include/execution/plans/window_plan.h @@ -27,47 +27,47 @@ namespace bustub { -/** WindowAggregationType enumerates all the possible window aggregation functions in our system */ -enum class WindowAggregationType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate }; +/** WindowFunctionType enumerates all the possible window functions in our system */ +enum class WindowFunctionType { CountStarAggregate, CountAggregate, SumAggregate, MinAggregate, MaxAggregate, Rank }; -class WindowAggregationPlanNode : public AbstractPlanNode { +class WindowFunctionPlanNode : public AbstractPlanNode { public: /** - * Construct a new WindowAggregationPlanNode. + * Construct a new WindowFunctionPlanNode. 
* @param output_schema The output format of this plan node * @param child The child plan to aggregate data over - * @param window_agg_indexes The indexes of the window aggregation functions - * @param columns All columns include the placeholder for window aggregation functions - * @param partition_bys The partition by clause of the window aggregations - * @param order_bys The order by clause of the window aggregations - * @param aggregates The expressions that we are aggregating - * @param window_agg_types The types that we are aggregating + * @param window_func_indexes The indexes of the window functions + * @param columns All columns include the placeholder for window functions + * @param partition_bys The partition by clause of the window functions + * @param order_bys The order by clause of the window functions + * @param functions The expressions that we are aggregating + * @param window_func_types The types that we are aggregating * * Window Aggregation is different from normal aggregation as it outputs one row for each inputing rows, - * and can be combined with normal selected columns. The columns in WindowAggregationPlanNode contains both + * and can be combined with normal selected columns. The columns in WindowFunctionPlanNode contain both * normal selected columns and placeholder columns for window aggregations. 
* * For example, if we have a query like: * SELECT 0.1, 0.2, SUM(0.3) OVER (PARTITION BY 0.2 ORDER BY 0.3), SUM(0.4) OVER (PARTITION BY 0.1 ORDER BY * 0.2,0.3) FROM table; * - * The WindowAggregationPlanNode should contains following structure: + * The WindowFunctionPlanNode should contains following structure: * columns: std::vector{0.1, 0.2, 0.-1(placeholder), 0.-1(placeholder)} * partition_bys: std::vector>{{0.2}, {0.1}} * order_bys: std::vector>{{0.3}, {0.2,0.3}} - * aggregates: std::vector{0.3, 0.4} - * window_agg_types: std::vector{SumAggregate, SumAggregate} + * functions: std::vector{0.3, 0.4} + * window_func_types: std::vector{SumAggregate, SumAggregate} */ - WindowAggregationPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child, - std::vector window_agg_indexes, std::vector columns, - std::vector> partition_bys, - std::vector>> order_bys, - std::vector aggregates, - std::vector window_agg_types) + WindowFunctionPlanNode(SchemaRef output_schema, AbstractPlanNodeRef child, std::vector window_func_indexes, + std::vector columns, + std::vector> partition_bys, + std::vector>> order_bys, + std::vector functions, + std::vector window_func_types) : AbstractPlanNode(std::move(output_schema), {std::move(child)}), columns_(std::move(columns)) { - for (uint32_t i = 0; i < window_agg_indexes.size(); i++) { - window_functions_[window_agg_indexes[i]] = - WindowFunction{aggregates[i], window_agg_types[i], partition_bys[i], order_bys[i]}; + for (uint32_t i = 0; i < window_func_indexes.size(); i++) { + window_functions_[window_func_indexes[i]] = + WindowFunction{functions[i], window_func_types[i], partition_bys[i], order_bys[i]}; } } @@ -82,11 +82,11 @@ class WindowAggregationPlanNode : public AbstractPlanNode { static auto InferWindowSchema(const std::vector &columns) -> Schema; - BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowAggregationPlanNode); + BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(WindowFunctionPlanNode); struct WindowFunction { - AbstractExpressionRef aggregate_; 
- WindowAggregationType type_; + AbstractExpressionRef function_; + WindowFunctionType type_; std::vector partition_by_; std::vector> order_by_; }; @@ -103,37 +103,40 @@ class WindowAggregationPlanNode : public AbstractPlanNode { } // namespace bustub template <> -struct fmt::formatter : formatter { +struct fmt::formatter : formatter { template - auto format(const bustub::WindowAggregationPlanNode::WindowFunction &x, FormatContext &ctx) const { - return formatter::format(fmt::format("{{ aggregate={}, type={}, partition_by={}, order_by={} }}", - x.aggregate_, x.type_, x.partition_by_, x.order_by_), + auto format(const bustub::WindowFunctionPlanNode::WindowFunction &x, FormatContext &ctx) const { + return formatter::format(fmt::format("{{ function_arg={}, type={}, partition_by={}, order_by={} }}", + x.function_, x.type_, x.partition_by_, x.order_by_), ctx); } }; template <> -struct fmt::formatter : formatter { +struct fmt::formatter : formatter { template - auto format(bustub::WindowAggregationType c, FormatContext &ctx) const { - using bustub::WindowAggregationType; + auto format(bustub::WindowFunctionType c, FormatContext &ctx) const { + using bustub::WindowFunctionType; std::string name = "unknown"; switch (c) { - case WindowAggregationType::CountStarAggregate: + case WindowFunctionType::CountStarAggregate: name = "count_star"; break; - case WindowAggregationType::CountAggregate: + case WindowFunctionType::CountAggregate: name = "count"; break; - case WindowAggregationType::SumAggregate: + case WindowFunctionType::SumAggregate: name = "sum"; break; - case WindowAggregationType::MinAggregate: + case WindowFunctionType::MinAggregate: name = "min"; break; - case WindowAggregationType::MaxAggregate: + case WindowFunctionType::MaxAggregate: name = "max"; break; + case WindowFunctionType::Rank: + name = "rank"; + break; } return formatter::format(name, ctx); } diff --git a/src/include/planner/planner.h b/src/include/planner/planner.h index 63f740cf9..ac6d82993 100644 
--- a/src/include/planner/planner.h +++ b/src/include/planner/planner.h @@ -140,7 +140,7 @@ class Planner { -> std::tuple>; auto GetWindowAggCallFromFactory(const std::string &func_name, std::vector args) - -> std::tuple>; + -> std::tuple>; auto GetBinaryExpressionFromFactory(const std::string &op_name, AbstractExpressionRef left, AbstractExpressionRef right) -> AbstractExpressionRef; diff --git a/src/planner/CMakeLists.txt b/src/planner/CMakeLists.txt index 68d73ef8b..3bf0e3d53 100644 --- a/src/planner/CMakeLists.txt +++ b/src/planner/CMakeLists.txt @@ -8,7 +8,7 @@ add_library( plan_insert.cpp plan_table_ref.cpp plan_select.cpp - plan_window_aggregation.cpp + plan_window_function.cpp planner.cpp) set(ALL_OBJECT_FILES diff --git a/src/planner/expression_factory.cpp b/src/planner/expression_factory.cpp index a44fdd242..6e93b2da0 100644 --- a/src/planner/expression_factory.cpp +++ b/src/planner/expression_factory.cpp @@ -6,6 +6,7 @@ #include "execution/expressions/comparison_expression.h" #include "execution/expressions/constant_value_expression.h" #include "execution/expressions/logic_expression.h" +#include "execution/plans/window_plan.h" #include "planner/planner.h" namespace bustub { @@ -37,25 +38,28 @@ auto Planner::GetAggCallFromFactory(const std::string &func_name, std::vector args) - -> std::tuple> { + -> std::tuple> { if (args.empty()) { if (func_name == "count_star") { - return {WindowAggregationType::CountStarAggregate, {}}; + return {WindowFunctionType::CountStarAggregate, {}}; + } + if (func_name == "rank") { + return {WindowFunctionType::Rank, {}}; } } if (args.size() == 1) { auto expr = std::move(args[0]); if (func_name == "min") { - return {WindowAggregationType::MinAggregate, {std::move(expr)}}; + return {WindowFunctionType::MinAggregate, {std::move(expr)}}; } if (func_name == "max") { - return {WindowAggregationType::MaxAggregate, {std::move(expr)}}; + return {WindowFunctionType::MaxAggregate, {std::move(expr)}}; } if (func_name == "sum") { - return 
{WindowAggregationType::SumAggregate, {std::move(expr)}}; + return {WindowFunctionType::SumAggregate, {std::move(expr)}}; } if (func_name == "count") { - return {WindowAggregationType::CountAggregate, {std::move(expr)}}; + return {WindowFunctionType::CountAggregate, {std::move(expr)}}; } } throw Exception(fmt::format("unsupported window_call {} with {} args", func_name, args.size())); diff --git a/src/planner/plan_window_aggregation.cpp b/src/planner/plan_window_function.cpp similarity index 84% rename from src/planner/plan_window_aggregation.cpp rename to src/planner/plan_window_function.cpp index 1678a3771..90fbd9905 100644 --- a/src/planner/plan_window_aggregation.cpp +++ b/src/planner/plan_window_function.cpp @@ -37,8 +37,8 @@ auto Planner::PlanSelectWindow(const SelectStatement &statement, AbstractPlanNod */ std::vector columns; std::vector column_names; - std::vector window_agg_indexes; - std::vector window_agg_types; + std::vector window_func_indexes; + std::vector window_func_types; std::vector> partition_by_exprs; std::vector>> order_by_exprs; std::vector arg_exprs; @@ -57,7 +57,7 @@ auto Planner::PlanSelectWindow(const SelectStatement &statement, AbstractPlanNod } // parse window function - window_agg_indexes.push_back(i); + window_func_indexes.push_back(i); // we assign a -1 here as a placeholder columns.emplace_back(std::make_shared(0, -1, TypeId::INTEGER)); @@ -98,14 +98,19 @@ auto Planner::PlanSelectWindow(const SelectStatement &statement, AbstractPlanNod auto [_, ret] = PlanExpression(*arg, {child}); raw_args.emplace_back(std::move(ret)); } - auto [window_agg_type, clean_args] = GetWindowAggCallFromFactory(window_call.func_name_, std::move(raw_args)); - window_agg_types.emplace_back(window_agg_type); + auto [window_func_type, clean_args] = GetWindowAggCallFromFactory(window_call.func_name_, std::move(raw_args)); + window_func_types.emplace_back(window_func_type); if (clean_args.size() > 1) { throw bustub::NotImplementedException("only agg call of 
zero/one arg is supported"); } if (clean_args.empty()) { - // Rewrite count(*) into count(1) - clean_arg = std::make_shared(ValueFactory::GetIntegerValue(1)); + if (window_func_type == WindowFunctionType::CountStarAggregate) { + // Rewrite count(*) into count(1) + clean_arg = std::make_shared(ValueFactory::GetIntegerValue(1)); + } else { + // rank, arg = nullptr + clean_arg = std::shared_ptr(nullptr); + } } else { clean_arg = std::move(clean_args[0]); } @@ -114,11 +119,11 @@ auto Planner::PlanSelectWindow(const SelectStatement &statement, AbstractPlanNod // we don't need window_agg_indexes here because we already use placeholders to infer the window agg column type is // Integer - auto window_output_schema = WindowAggregationPlanNode::InferWindowSchema(columns); + auto window_output_schema = WindowFunctionPlanNode::InferWindowSchema(columns); - auto plan = std::make_shared( + auto plan = std::make_shared( std::make_shared(ProjectionPlanNode::RenameSchema(window_output_schema, column_names)), child, - window_agg_indexes, columns, partition_by_exprs, order_by_exprs, arg_exprs, window_agg_types); + window_func_indexes, columns, partition_by_exprs, order_by_exprs, arg_exprs, window_func_types); return plan; }