Skip to content

Commit

Permalink
Merge pull request duckdb#10873 from Mytherin/limitcleanup
Browse files Browse the repository at this point in the history
LIMIT/OFFSET clean-up
  • Loading branch information
Mytherin authored Feb 29, 2024
2 parents b0f4d75 + 598e092 commit 403485d
Show file tree
Hide file tree
Showing 43 changed files with 592 additions and 446 deletions.
1 change: 1 addition & 0 deletions .github/config/out_of_tree_extensions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,6 @@ if (NOT WIN32)
LOAD_TESTS DONT_LINK
GIT_URL https://github.com/duckdb/substrait
GIT_TAG 870bab8725d1123905296bfb1f35ce737434e0b3
APPLY_PATCHES
)
endif()
38 changes: 38 additions & 0 deletions .github/patches/extensions/substrait/substrait.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
diff --git a/src/to_substrait.cpp b/src/to_substrait.cpp
index 03d9778..d2429c6 100644
--- a/src/to_substrait.cpp
+++ b/src/to_substrait.cpp
@@ -777,8 +777,31 @@ substrait::Rel *DuckDBToSubstrait::TransformLimit(LogicalOperator &dop) {
auto stopn = res->mutable_fetch();
stopn->set_allocated_input(TransformOp(*dop.children[0]));

- stopn->set_offset(dlimit.offset_val);
- stopn->set_count(dlimit.limit_val);
+ idx_t limit_val;
+ idx_t offset_val;
+
+ switch(dlimit.limit_val.Type()) {
+ case LimitNodeType::CONSTANT_VALUE:
+ limit_val = dlimit.limit_val.GetConstantValue();
+ break;
+ case LimitNodeType::UNSET:
+ limit_val = 2ULL << 62ULL;
+ break;
+ default:
+ throw InternalException("Unsupported limit value type");
+ }
+ switch(dlimit.offset_val.Type()) {
+ case LimitNodeType::CONSTANT_VALUE:
+ offset_val = dlimit.offset_val.GetConstantValue();
+ break;
+ case LimitNodeType::UNSET:
+ offset_val = 0;
+ break;
+ default:
+ throw InternalException("Unsupported offset value type");
+ }
+ stopn->set_offset(offset_val);
+ stopn->set_count(limit_val);
return res;
}

1 change: 1 addition & 0 deletions scripts/generate_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
'LogicalType',
'ColumnDefinition',
'BaseStatistics',
'BoundLimitNode',
]

reference_list = ['ClientContext', 'bound_parameter_map_t']
Expand Down
43 changes: 38 additions & 5 deletions src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2810,6 +2810,44 @@ KeywordCategory EnumUtil::FromString<KeywordCategory>(const char *value) {
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<LimitNodeType>(LimitNodeType value) {
switch(value) {
case LimitNodeType::UNSET:
return "UNSET";
case LimitNodeType::CONSTANT_VALUE:
return "CONSTANT_VALUE";
case LimitNodeType::CONSTANT_PERCENTAGE:
return "CONSTANT_PERCENTAGE";
case LimitNodeType::EXPRESSION_VALUE:
return "EXPRESSION_VALUE";
case LimitNodeType::EXPRESSION_PERCENTAGE:
return "EXPRESSION_PERCENTAGE";
default:
throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value));
}
}

template<>
LimitNodeType EnumUtil::FromString<LimitNodeType>(const char *value) {
if (StringUtil::Equals(value, "UNSET")) {
return LimitNodeType::UNSET;
}
if (StringUtil::Equals(value, "CONSTANT_VALUE")) {
return LimitNodeType::CONSTANT_VALUE;
}
if (StringUtil::Equals(value, "CONSTANT_PERCENTAGE")) {
return LimitNodeType::CONSTANT_PERCENTAGE;
}
if (StringUtil::Equals(value, "EXPRESSION_VALUE")) {
return LimitNodeType::EXPRESSION_VALUE;
}
if (StringUtil::Equals(value, "EXPRESSION_PERCENTAGE")) {
return LimitNodeType::EXPRESSION_PERCENTAGE;
}
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template<>
const char* EnumUtil::ToChars<LoadType>(LoadType value) {
switch(value) {
Expand Down Expand Up @@ -2865,8 +2903,6 @@ const char* EnumUtil::ToChars<LogicalOperatorType>(LogicalOperatorType value) {
return "LOGICAL_DISTINCT";
case LogicalOperatorType::LOGICAL_SAMPLE:
return "LOGICAL_SAMPLE";
case LogicalOperatorType::LOGICAL_LIMIT_PERCENT:
return "LOGICAL_LIMIT_PERCENT";
case LogicalOperatorType::LOGICAL_PIVOT:
return "LOGICAL_PIVOT";
case LogicalOperatorType::LOGICAL_COPY_DATABASE:
Expand Down Expand Up @@ -3006,9 +3042,6 @@ LogicalOperatorType EnumUtil::FromString<LogicalOperatorType>(const char *value)
if (StringUtil::Equals(value, "LOGICAL_SAMPLE")) {
return LogicalOperatorType::LOGICAL_SAMPLE;
}
if (StringUtil::Equals(value, "LOGICAL_LIMIT_PERCENT")) {
return LogicalOperatorType::LOGICAL_LIMIT_PERCENT;
}
if (StringUtil::Equals(value, "LOGICAL_PIVOT")) {
return LogicalOperatorType::LOGICAL_PIVOT;
}
Expand Down
2 changes: 0 additions & 2 deletions src/common/enums/logical_operator_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ string LogicalOperatorToString(LogicalOperatorType type) {
return "TOP_N";
case LogicalOperatorType::LOGICAL_SAMPLE:
return "SAMPLE";
case LogicalOperatorType::LOGICAL_LIMIT_PERCENT:
return "LIMIT_PERCENT";
case LogicalOperatorType::LOGICAL_COPY_TO_FILE:
return "COPY_TO_FILE";
case LogicalOperatorType::LOGICAL_COPY_DATABASE:
Expand Down
90 changes: 54 additions & 36 deletions src/execution/operator/helper/physical_limit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@

namespace duckdb {

PhysicalLimit::PhysicalLimit(vector<LogicalType> types, idx_t limit, idx_t offset,
unique_ptr<Expression> limit_expression, unique_ptr<Expression> offset_expression,
PhysicalLimit::PhysicalLimit(vector<LogicalType> types, BoundLimitNode limit_val_p, BoundLimitNode offset_val_p,
idx_t estimated_cardinality)
: PhysicalOperator(PhysicalOperatorType::LIMIT, std::move(types), estimated_cardinality), limit_value(limit),
offset_value(offset), limit_expression(std::move(limit_expression)),
offset_expression(std::move(offset_expression)) {
: PhysicalOperator(PhysicalOperatorType::LIMIT, std::move(types), estimated_cardinality),
limit_val(std::move(limit_val_p)), offset_val(std::move(offset_val_p)) {
}

//===--------------------------------------------------------------------===//
Expand All @@ -36,13 +34,12 @@ class LimitLocalState : public LocalSinkState {
public:
explicit LimitLocalState(ClientContext &context, const PhysicalLimit &op)
: current_offset(0), data(context, op.types, true) {
this->limit = op.limit_expression ? DConstants::INVALID_INDEX : op.limit_value;
this->offset = op.offset_expression ? DConstants::INVALID_INDEX : op.offset_value;
PhysicalLimit::SetInitialLimits(op.limit_val, op.offset_val, limit, offset);
}

idx_t current_offset;
idx_t limit;
idx_t offset;
optional_idx limit;
optional_idx offset;
BatchedDataCollection data;
};

Expand All @@ -54,38 +51,56 @@ unique_ptr<LocalSinkState> PhysicalLimit::GetLocalSinkState(ExecutionContext &co
return make_uniq<LimitLocalState>(context.client, *this);
}

bool PhysicalLimit::ComputeOffset(ExecutionContext &context, DataChunk &input, idx_t &limit, idx_t &offset,
idx_t current_offset, idx_t &max_element, Expression *limit_expression,
Expression *offset_expression) {
if (limit != DConstants::INVALID_INDEX && offset != DConstants::INVALID_INDEX) {
max_element = limit + offset;
if ((limit == 0 || current_offset >= max_element) && !(limit_expression || offset_expression)) {
return false;
}
void PhysicalLimit::SetInitialLimits(const BoundLimitNode &limit_val, const BoundLimitNode &offset_val,
optional_idx &limit, optional_idx &offset) {
switch (limit_val.Type()) {
case LimitNodeType::CONSTANT_VALUE:
limit = limit_val.GetConstantValue();
break;
case LimitNodeType::UNSET:
limit = MAX_LIMIT_VALUE;
break;
default:
break;
}
switch (offset_val.Type()) {
case LimitNodeType::CONSTANT_VALUE:
offset = offset_val.GetConstantValue();
break;
case LimitNodeType::UNSET:
offset = 0;
break;
default:
break;
}
}

// get the next chunk from the child
if (limit == DConstants::INVALID_INDEX) {
limit = 1ULL << 62ULL;
Value val = GetDelimiter(context, input, limit_expression);
bool PhysicalLimit::ComputeOffset(ExecutionContext &context, DataChunk &input, optional_idx &limit,
optional_idx &offset, idx_t current_offset, idx_t &max_element,
const BoundLimitNode &limit_val, const BoundLimitNode &offset_val) {
if (!limit.IsValid()) {
Value val = GetDelimiter(context, input, limit_val.GetValueExpression());
if (!val.IsNull()) {
limit = val.GetValue<idx_t>();
} else {
limit = MAX_LIMIT_VALUE;
}
if (limit > 1ULL << 62ULL) {
throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", limit, 1ULL << 62ULL);
if (limit.GetIndex() > MAX_LIMIT_VALUE) {
throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", limit.GetIndex(), MAX_LIMIT_VALUE);
}
}
if (offset == DConstants::INVALID_INDEX) {
offset = 0;
Value val = GetDelimiter(context, input, offset_expression);
if (!offset.IsValid()) {
Value val = GetDelimiter(context, input, offset_val.GetValueExpression());
if (!val.IsNull()) {
offset = val.GetValue<idx_t>();
} else {
offset = 0;
}
if (offset > 1ULL << 62ULL) {
throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", offset, 1ULL << 62ULL);
if (offset.GetIndex() > MAX_LIMIT_VALUE) {
throw BinderException("Max value %lld for LIMIT/OFFSET is %lld", offset.GetIndex(), MAX_LIMIT_VALUE);
}
}
max_element = limit + offset;
max_element = limit.GetIndex() + offset.GetIndex();
if (limit == 0 || current_offset >= max_element) {
return false;
}
Expand All @@ -100,8 +115,7 @@ SinkResultType PhysicalLimit::Sink(ExecutionContext &context, DataChunk &chunk,
auto &offset = state.offset;

idx_t max_element;
if (!ComputeOffset(context, chunk, limit, offset, state.current_offset, max_element, limit_expression.get(),
offset_expression.get())) {
if (!ComputeOffset(context, chunk, limit, offset, state.current_offset, max_element, limit_val, offset_val)) {
return SinkResultType::FINISHED;
}
auto max_cardinality = max_element - state.current_offset;
Expand All @@ -121,8 +135,12 @@ SinkCombineResultType PhysicalLimit::Combine(ExecutionContext &context, Operator
auto &state = input.local_state.Cast<LimitLocalState>();

lock_guard<mutex> lock(gstate.glock);
gstate.limit = state.limit;
gstate.offset = state.offset;
if (state.limit.IsValid()) {
gstate.limit = state.limit.GetIndex();
}
if (state.offset.IsValid()) {
gstate.offset = state.offset.GetIndex();
}
gstate.data.Merge(state.data);

return SinkCombineResultType::FINISHED;
Expand Down Expand Up @@ -209,12 +227,12 @@ bool PhysicalLimit::HandleOffset(DataChunk &input, idx_t &current_offset, idx_t
return true;
}

Value PhysicalLimit::GetDelimiter(ExecutionContext &context, DataChunk &input, Expression *expr) {
Value PhysicalLimit::GetDelimiter(ExecutionContext &context, DataChunk &input, const Expression &expr) {
DataChunk limit_chunk;
vector<LogicalType> types {expr->return_type};
vector<LogicalType> types {expr.return_type};
auto &allocator = Allocator::Get(context.client);
limit_chunk.Initialize(allocator, types);
ExpressionExecutor limit_executor(context.client, expr);
ExpressionExecutor limit_executor(context.client, &expr);
auto input_size = input.size();
input.SetCardinality(1);
limit_executor.Execute(input, limit_chunk);
Expand Down
Loading

0 comments on commit 403485d

Please sign in to comment.