Skip to content

Commit

Permalink
Merge pull request #113 from pdet/view
Browse files Browse the repository at this point in the history
Reuse connection to Extract,Consume and execute substrait query plans
  • Loading branch information
pdet authored Nov 6, 2024
2 parents be71387 + 3ac8aae commit bc9f4b3
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 159 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ concurrency:
jobs:
duckdb-stable-build:
name: Build extension binaries
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

duckdb-stable-deploy:
name: Deploy extension binaries
needs: duckdb-stable-build
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.1.0
uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main
secrets: inherit
with:
duckdb_version: v1.1.0
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
duckdb_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait
deploy_latest: true
2 changes: 1 addition & 1 deletion .github/workflows/main_distribution.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
with:
duckdb_version: main
ci_tools_version: main
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_rtools"
exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64;windows_amd64_mingw"
extension_name: substrait

2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1863 files
87 changes: 72 additions & 15 deletions src/from_substrait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@

#include "duckdb/common/types/value.hpp"
#include "duckdb/parser/expression/list.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"

#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/common/exception.hpp"
Expand All @@ -25,7 +16,24 @@
#include "google/protobuf/util/json_util.h"
#include "substrait/plan.pb.h"

#include "duckdb/main/table_description.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/helper.hpp"

#include "duckdb/main/relation.hpp"
#include "duckdb/main/relation/table_relation.hpp"
#include "duckdb/main/relation/table_function_relation.hpp"
#include "duckdb/main/relation/value_relation.hpp"
#include "duckdb/main/relation/view_relation.hpp"
#include "duckdb/main/relation/aggregate_relation.hpp"
#include "duckdb/main/relation/cross_product_relation.hpp"
#include "duckdb/main/relation/filter_relation.hpp"
#include "duckdb/main/relation/join_relation.hpp"
#include "duckdb/main/relation/limit_relation.hpp"
#include "duckdb/main/relation/order_relation.hpp"
#include "duckdb/main/relation/projection_relation.hpp"
#include "duckdb/main/relation/setop_relation.hpp"

namespace duckdb {
const std::unordered_map<std::string, std::string> SubstraitToDuckDB::function_names_remap = {
Expand All @@ -40,7 +48,7 @@ const case_insensitive_set_t SubstraitToDuckDB::valid_extract_subfields = {
"quarter", "microsecond", "milliseconds", "second", "minute", "hour"};

string SubstraitToDuckDB::RemapFunctionName(const string &function_name) {
// Lets first drop any extension id
// Let's first drop any extension id
string name;
for (auto &c : function_name) {
if (c == ':') {
Expand All @@ -67,7 +75,9 @@ string SubstraitToDuckDB::RemoveExtension(const string &function_name) {
return name;
}

SubstraitToDuckDB::SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json) : con(con_p) {
SubstraitToDuckDB::SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json,
bool acquire_lock_p)
: context(context_p), acquire_lock(acquire_lock_p) {
if (!json) {
if (!plan.ParseFromString(serialized)) {
throw std::runtime_error("Was not possible to convert binary into Substrait plan");
Expand Down Expand Up @@ -510,16 +520,46 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformAggregateOp(const substrait::Re
return make_shared_ptr<AggregateRelation>(TransformOp(sop.aggregate().input()), std::move(expressions),
std::move(groups));
}
unique_ptr<TableDescription> TableInfo(ClientContext &context, const string &schema_name, const string &table_name) {
// obtain the table info
auto table = Catalog::GetEntry<TableCatalogEntry>(context, INVALID_CATALOG, schema_name, table_name,
OnEntryNotFound::RETURN_NULL);
if (!table) {
return {};
}
// write the table info to the result
auto result = make_uniq<TableDescription>(INVALID_CATALOG, schema_name, table_name);
for (auto &column : table->GetColumns().Logical()) {
result->columns.emplace_back(column.Copy());
}
return result;
}

shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &sop) {
auto &sget = sop.read();
shared_ptr<Relation> scan;
auto context_wrapper = make_shared_ptr<RelationContextWrapper>(context);
if (sget.has_named_table()) {
auto table_name = sget.named_table().names(0);
// If we can't find a table with that name, let's try a view.
try {
scan = con.Table(sget.named_table().names(0));
auto table_info = TableInfo(*context, DEFAULT_SCHEMA, table_name);
if (!table_info) {
throw CatalogException("Table '%s' does not exist!", table_name);
}
if (acquire_lock) {
scan = make_shared_ptr<TableRelation>(context, std::move(table_info));

} else {
scan = make_shared_ptr<TableRelation>(context_wrapper, std::move(table_info));
}
} catch (...) {
scan = con.View(sget.named_table().names(0));
if (acquire_lock) {
scan = make_shared_ptr<ViewRelation>(context, DEFAULT_SCHEMA, table_name);

} else {
scan = make_shared_ptr<ViewRelation>(context_wrapper, DEFAULT_SCHEMA, table_name);
}
}
} else if (sget.has_local_files()) {
vector<Value> parquet_files;
Expand All @@ -540,7 +580,18 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
string name = "parquet_" + StringUtil::GenerateRandomName();
named_parameter_map_t named_parameters({{"binary_as_string", Value::BOOLEAN(false)}});
scan = con.TableFunction("parquet_scan", {Value::LIST(parquet_files)}, named_parameters)->Alias(name);
vector<Value> parameters {Value::LIST(parquet_files)};
shared_ptr<TableFunctionRelation> scan_rel;
if (acquire_lock) {
scan_rel = make_shared_ptr<TableFunctionRelation>(context, "parquet_scan", parameters,
std::move(named_parameters));
} else {
scan_rel = make_shared_ptr<TableFunctionRelation>(context_wrapper, "parquet_scan", parameters,
std::move(named_parameters));
}

auto rel = static_cast<Relation *>(scan_rel.get());
scan = rel->Alias(name);
} else if (sget.has_virtual_table()) {
// We need to handle a virtual table as a LogicalExpressionGet
auto literal_values = sget.virtual_table().values();
Expand All @@ -553,7 +604,13 @@ shared_ptr<Relation> SubstraitToDuckDB::TransformReadOp(const substrait::Rel &so
}
expression_rows.emplace_back(expression_row);
}
scan = con.Values(expression_rows);
vector<string> column_names;
if (acquire_lock) {
scan = make_shared_ptr<ValueRelation>(context, expression_rows, column_names);

} else {
scan = make_shared_ptr<ValueRelation>(context_wrapper, expression_rows, column_names);
}
} else {
throw NotImplementedException("Unsupported type of read operator for substrait");
}
Expand Down
17 changes: 14 additions & 3 deletions src/include/from_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// from_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include <string>
Expand All @@ -10,7 +18,8 @@ namespace duckdb {

class SubstraitToDuckDB {
public:
SubstraitToDuckDB(Connection &con_p, const string &serialized, bool json = false);
SubstraitToDuckDB(shared_ptr<ClientContext> &context_p, const string &serialized, bool json = false,
bool acquire_lock = false);
//! Transforms Substrait Plan to DuckDB Relation
shared_ptr<Relation> TransformPlan();

Expand Down Expand Up @@ -48,8 +57,8 @@ class SubstraitToDuckDB {

//! Transform Substrait Sort Order to DuckDB Order
OrderByNode TransformOrder(const substrait::SortField &sordf);
//! DuckDB Connection
Connection &con;
//! DuckDB Client Context
shared_ptr<ClientContext> context;
//! Substrait Plan
substrait::Plan plan;
//! Variable used to register functions
Expand All @@ -59,5 +68,7 @@ class SubstraitToDuckDB {
static const unordered_map<std::string, std::string> function_names_remap;
static const case_insensitive_set_t valid_extract_subfields;
vector<ParsedExpression *> struct_expressions;
//! If we should acquire a client context lock when creating the relatiosn
const bool acquire_lock;
};
} // namespace duckdb
8 changes: 8 additions & 0 deletions src/include/to_substrait.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// to_substrait.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "custom_extensions/custom_extensions.hpp"
Expand Down
Loading

0 comments on commit bc9f4b3

Please sign in to comment.