Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Delim Join/Get Relations #91

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
10ad412
wip
pdet May 28, 2024
d1f3a24
merge
pdet Jul 8, 2024
1dbeee2
Getting delimiter join/get DuckDB -> Substrait
pdet Jul 8, 2024
078a315
More code to handle delim joins
pdet Jul 8, 2024
db7f93e
[wip] join for relations
pdet Jul 9, 2024
a8529d1
Woo, queries 4 and 17 with delimiter joins now seem to work
pdet Jul 10, 2024
f8970a2
Q 20 is working
pdet Jul 11, 2024
a362259
Support for other join types and adding more tpch queries
pdet Jul 11, 2024
f4f09e2
All Queries passing
pdet Jul 11, 2024
f59c458
substrait include
pdet Jul 11, 2024
c0eb7b8
Move ProjectionJoin code
pdet Jul 11, 2024
a30002c
more cleanup
pdet Jul 11, 2024
69c755c
Merge branch 'duckdb:main' into delim_join
pdet Jul 11, 2024
9c87044
Merge remote-tracking branch 'origin/experimental' into delim_join
pdet Jul 11, 2024
7515780
Merge remote-tracking branch 'origin/delim_join' into delim_join
pdet Jul 11, 2024
4438b02
apply patches
nickgerrets Jul 22, 2024
8806437
Fixing Q15
pdet Jul 24, 2024
a8a7dd9
woopsie
pdet Jul 24, 2024
eb72924
Support field_refs directly instead of expressions in delim join
pdet Jul 24, 2024
13b387d
have delimside as an enum
pdet Jul 24, 2024
bde22f4
wip: modifying the delim join/get to use refrel
pdet Aug 2, 2024
d6af7b6
Alright delim join get working with refrels
pdet Aug 2, 2024
d54395c
update substrait
pdet Aug 13, 2024
359be4e
Update to new specs
pdet Aug 13, 2024
73b072a
Adjustment to see if this PoC works properly after some PR Requests
pdet Sep 3, 2024
2dc1721
Remove delim flipped var
pdet Sep 3, 2024
68d060c
Revert "Remove delim flipped var"
pdet Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 2729 files
2 changes: 1 addition & 1 deletion duckdb-r
Submodule duckdb-r updated 1336 files
37 changes: 37 additions & 0 deletions scripts/build_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Requires protoc 3.19.04
# https://github.com/protocolbuffers/protobuf/releases/tag/v3.19.4

import os
import shutil
from os import walk

# Change to substrait folder
sub_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)),'..','third_party','substrait')
# Delete Current CPP files
shutil.rmtree(os.path.join(sub_folder,'substrait'))

git_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)),'..','substrait')

proto_folder = os.path.join(git_folder,'proto')
substrait_proto_folder = os.path.join(proto_folder,'substrait')
substrait_extensions_proto_folder = os.path.join(substrait_proto_folder,'extensions')

# Generate Proto Files on a specific git tag
os.chdir(sub_folder)

os.mkdir("substrait")
os.mkdir("substrait/extensions")

# Generate all files
proto_sub_list = next(walk(substrait_proto_folder), (None, None, []))[2]

proto_sub_extensions = next(walk(substrait_extensions_proto_folder), (None, None, []))[2]

# /usr/local/bin/protoc
print("Protoc version" + os.popen('protoc --version').read())

for proto in proto_sub_list:
os.system("/usr/local/bin/protoc -I="+ proto_folder+ " --cpp_out="+sub_folder +" "+ os.path.join(substrait_proto_folder,proto))

for proto in proto_sub_extensions:
os.system("/usr/local/bin/protoc -I="+ proto_folder+ " --cpp_out="+sub_folder +" "+ os.path.join(substrait_extensions_proto_folder,proto))
100 changes: 89 additions & 11 deletions src/from_substrait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
#include "duckdb/common/enums/set_operation_type.hpp"

#include "duckdb/parser/expression/comparison_expression.hpp"

#include "substrait/plan.pb.h"
#include "duckdb/main/client_data.hpp"
#include "google/protobuf/util/json_util.h"
#include "substrait/plan.pb.h"
#include "duckdb/main/relation/delim_get_relation.hpp"
#include "duckdb/main/client_data.hpp"
#include "duckdb/common/http_state.hpp"

namespace duckdb {
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
Expand Down Expand Up @@ -66,9 +66,6 @@ std::string SubstraitToDuckDB::RemoveExtension(std::string &function_name) {
}

SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
auto http_state = HTTPState::TryGetState(*con_p.context);
http_state->Reset();

if (!json) {
if (!plan.ParseFromString(serialized)) {
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
Expand Down Expand Up @@ -413,15 +410,18 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformJoinOp(const substrait::Rel &so
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_RIGHT:
djointype = JoinType::RIGHT;
break;
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_SINGLE:
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_SINGLE:
djointype = JoinType::SINGLE;
break;
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_SEMI:
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
djointype = JoinType::SEMI;
break;
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_MARK:
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_MARK:
djointype = JoinType::MARK;
break;
case substrait::JoinRel::JoinType::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
djointype = JoinType::RIGHT_SEMI;
break;
default:
throw InternalException("Unsupported join type");
}
Expand All @@ -431,6 +431,73 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformJoinOp(const substrait::Rel &so
return make_shared_ptr<JoinRelation>(std::move(left_op), std::move(right_op), std::move(join_condition), djointype);
}

shared_ptr<Relation> SubstraitToDuckDB::TransformDelimJoinOp(const substrait::Rel &sop) {
auto &sjoin = sop.duplicate_eliminated_join();

vector<unique_ptr<ParsedExpression>> duplicate_eliminated_columns;
for (auto &col : sjoin.duplicate_eliminated_columns()) {
duplicate_eliminated_columns.emplace_back(
make_uniq<PositionalReferenceExpression>(col.direct_reference().struct_field().field() + 1));
}
duplicate_eliminated_columns_ptr = &duplicate_eliminated_columns;
JoinType djointype;
switch (sjoin.type()) {
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_INNER:
djointype = JoinType::INNER;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_LEFT:
djointype = JoinType::LEFT;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_RIGHT:
djointype = JoinType::RIGHT;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_LEFT_SINGLE:
djointype = JoinType::SINGLE;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
djointype = JoinType::RIGHT_SEMI;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_LEFT_MARK:
djointype = JoinType::MARK;
break;
case substrait::DuplicateEliminatedJoinRel_JoinType::DuplicateEliminatedJoinRel_JoinType_JOIN_TYPE_RIGHT_ANTI:
djointype = JoinType::RIGHT_ANTI;
break;
default:
throw InternalException("Unsupported join type");
}
unique_ptr<ParsedExpression> join_condition = TransformExpr(sjoin.expression());
auto left_op = TransformOp(sjoin.left())->Alias("left");
auto right_op = TransformOp(sjoin.right())->Alias("right");
auto join =
make_shared_ptr<JoinRelation>(std::move(left_op), std::move(right_op), std::move(join_condition), djointype);
if (sjoin.duplicate_eliminated_side() == substrait::DuplicateEliminatedJoinRel::DUPLICATE_ELIMINATED_SIDE_RIGHT) {
join->delim_flipped = true;
} else if (sjoin.duplicate_eliminated_side() ==
substrait::DuplicateEliminatedJoinRel::DUPLICATE_ELIMINATED_SIDE_LEFT) {
join->delim_flipped = false;
} else {
throw InvalidInputException("The plan has a delimiter join with an invalid type for it's delimiter side.");
}
join->duplicate_eliminated_columns = std::move(duplicate_eliminated_columns);
return join;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformDelimGetOp(const substrait::Rel &sop) {
auto &delim_get = sop.duplicate_eliminated_get();
auto subtree = TransformReferenceOp(delim_get.input());

auto &client_context = con.context;
vector<LogicalType> chunk_types;
auto &input_columns = subtree->Columns();
for (auto &col : *duplicate_eliminated_columns_ptr) {
auto& col_ref = col->Cast<PositionalReferenceExpression>();
chunk_types.emplace_back(input_columns[col_ref.index - 1].Type());
}
duplicate_eliminated_columns_ptr = nullptr;
return make_shared_ptr<DelimGetRelation>(client_context, chunk_types);
}

shared_ptr<Relation> SubstraitToDuckDB::TransformCrossProductOp(const substrait::Rel &sop) {
auto &sub_cross = sop.cross();

Expand Down Expand Up @@ -594,6 +661,12 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformSetOp(const substrait::Rel &sop
return make_shared_ptr<SetOpRelation>(std::move(lhs), std::move(rhs), type);
}

shared_ptr<Relation> SubstraitToDuckDB::TransformReferenceOp(const substrait::ReferenceRel &reference) {
// Lets get the subtree
auto subtree = plan.relations(reference.subtree_ordinal());
return TransformOp(subtree.rel());
}

shared_ptr<Relation> SubstraitToDuckDB::TransformOp(const substrait::Rel &sop) {
switch (sop.rel_type_case()) {
case substrait::Rel::RelTypeCase::kJoin:
Expand All @@ -614,6 +687,12 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformOp(const substrait::Rel &sop) {
return TransformSortOp(sop);
case substrait::Rel::RelTypeCase::kSet:
return TransformSetOp(sop);
case substrait::Rel::RelTypeCase::kDuplicateEliminatedJoin:
return TransformDelimJoinOp(sop);
case substrait::Rel::RelTypeCase::kDuplicateEliminatedGet:
return TransformDelimGetOp(sop);
case substrait::Rel::RelTypeCase::kReference:
return TransformReferenceOp(sop.reference());
default:
throw InternalException("Unsupported relation type " + to_string(sop.rel_type_case()));
}
Expand All @@ -640,13 +719,12 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformPlan() {
d_plan = TransformRootOp(plan.relations(0).root());
} catch (std::exception &ex) {
// Ideally we don't have to do that, we should change to capture the error and throw it here at some point
::duckdb::ErrorData parsed_error(ex);
ErrorData parsed_error(ex);
throw NotImplementedException("Something in this plan is not yet implemented in the Substrait->DuckDB plan "
"conversion.\n Substrait Plan:\n" +
plan.DebugString() +
"Not Implemented error message: " + parsed_error.RawMessage());
}

return d_plan;
}

Expand Down
5 changes: 5 additions & 0 deletions src/include/from_substrait.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class SubstraitToDuckDB {
//! Transform Substrait Operations to DuckDB Relations
shared_ptr<Relation> TransformOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformJoinOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformDelimJoinOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformDelimGetOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformCrossProductOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformFetchOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformFilterOp(const substrait::Rel &sop);
Expand All @@ -28,6 +30,7 @@ class SubstraitToDuckDB {
shared_ptr<Relation> TransformReadOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformSortOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformSetOp(const substrait::Rel &sop);
shared_ptr<Relation> TransformReferenceOp(const substrait::ReferenceRel &reference);

//! Transform Substrait Expressions to DuckDB Expressions
unique_ptr<ParsedExpression> TransformExpr(const substrait::Expression &sexpr);
Expand Down Expand Up @@ -57,5 +60,7 @@ class SubstraitToDuckDB {
//! names
static const unordered_map<std::string, std::string> function_names_remap;
static const case_insensitive_set_t valid_extract_subfields;
//! Pointer to last seen duplicate_eliminated_columns
vector<unique_ptr<ParsedExpression>> *duplicate_eliminated_columns_ptr = nullptr;
};
} // namespace duckdb
14 changes: 13 additions & 1 deletion src/include/to_substrait.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DuckDBToSubstrait {
void CreateFieldRef(substrait::Expression *expr, uint64_t col_idx);

//! Transforms Relation Root
substrait::RelRoot *TransformRootOp(LogicalOperator &dop);
void TransformRootOp(substrait::RelRoot &root_rel, LogicalOperator &dop);

//! Methods to Transform Logical Operators to Substrait Relations
substrait::Rel *TransformOp(duckdb::LogicalOperator &dop);
Expand All @@ -47,13 +47,19 @@ class DuckDBToSubstrait {
substrait::Rel *TransformLimit(duckdb::LogicalOperator &dop);
substrait::Rel *TransformOrderBy(duckdb::LogicalOperator &dop);
substrait::Rel *TransformComparisonJoin(duckdb::LogicalOperator &dop);
substrait::Rel *TransformDelimiterJoin(duckdb::LogicalOperator &dop);
substrait::Rel *TransformAggregateGroup(duckdb::LogicalOperator &dop);
substrait::Rel *TransformGet(duckdb::LogicalOperator &dop);
substrait::Rel *TransformCrossProduct(duckdb::LogicalOperator &dop);
substrait::Rel *TransformUnion(duckdb::LogicalOperator &dop);
substrait::Rel *TransformDistinct(duckdb::LogicalOperator &dop);
substrait::Rel *TransformExcept(LogicalOperator &dop);
substrait::Rel *TransformIntersect(LogicalOperator &dop);
substrait::Rel *TransformDelimGet();

//! Auxiliary function to create Projection on top of a Join
substrait::Rel *ProjectJoinRelation(LogicalComparisonJoin &djoin, substrait::Rel *join_relation,
idx_t left_col_count);

//! Methods to transform different LogicalGet Types (e.g., Table, Parquet)
//! To Substrait;
Expand Down Expand Up @@ -160,5 +166,11 @@ class DuckDBToSubstrait {
//! things don't go perfectly shiny
bool strict;
string errors;
//! Index of the current subtree relation we are looking at
//! This really only matters for delim joins/gets, since these are
//! the only splits we currently support.
int32_t cur_subtree_relation = 1;
//! The pointer to a delim join
LogicalComparisonJoin *duplicate_eliminated_parent_ptr = nullptr;
};
} // namespace duckdb
15 changes: 9 additions & 6 deletions src/substrait_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,28 @@ shared_ptr<Relation> SubstraitPlanToDuckDBRel(Connection &conn, const string &se

static void VerifySubstraitRoundtrip(unique_ptr<LogicalOperator> &query_plan, Connection &con,
ToSubstraitFunctionData &data, const string &serialized, bool is_json) {
// We round-trip the generated json and verify if the result is the same
auto actual_result = con.Query(data.query);

bool is_optimizer_enabled = con.context->config.enable_optimizer;
con.context->config.enable_optimizer = false;
auto sub_relation = SubstraitPlanToDuckDBRel(con, serialized, is_json);
unique_ptr<QueryResult> substrait_result;

try {
substrait_result = sub_relation->Execute();

} catch (std::exception &ex) {
// Ideally we don't have to do that, we should change to capture the error and throw it here at some point
query_plan->Print();
sub_relation->Print();

throw InternalException("Substrait Plan Execution Failed");
}
con.context->config.enable_optimizer = is_optimizer_enabled;
// We round-trip the generated json and verify if the result is the same
auto actual_result = con.Query(data.query);

substrait_result->names = actual_result->names;
unique_ptr<MaterializedQueryResult> substrait_materialized;

if (substrait_result->type == QueryResultType::STREAM_RESULT) {
auto &stream_query = substrait_result->Cast<duckdb::StreamQueryResult>();
auto &stream_query = substrait_result->Cast<StreamQueryResult>();

substrait_materialized = stream_query.Materialize();
} else if (substrait_result->type == QueryResultType::MATERIALIZED_RESULT) {
Expand Down Expand Up @@ -152,6 +154,7 @@ static DuckDBToSubstrait InitPlanExtractor(ClientContext &context, ToSubstraitFu
set<OptimizerType> disabled_optimizers = DBConfig::GetConfig(context).options.disabled_optimizers;
disabled_optimizers.insert(OptimizerType::IN_CLAUSE);
disabled_optimizers.insert(OptimizerType::COMPRESSED_MATERIALIZATION);
disabled_optimizers.insert(OptimizerType::MATERIALIZED_CTE);
DBConfig::GetConfig(*new_conn.context).options.disabled_optimizers = disabled_optimizers;

query_plan = new_conn.context->ExtractPlan(data.query);
Expand Down
Loading
Loading