Skip to content

Commit

Permalink
task: added translation for mohair rel types
Browse files Browse the repository at this point in the history
Specifically, added translations for SkyPartitionRel and SkySliceRel.
Also added translation for SkyRel, but at the moment it is almost
identical to ReadRel. The idea is that a SkyRel operator should still be
a lookup against the catalog like ReadRel for a named table is.
  • Loading branch information
drin committed Nov 15, 2024
1 parent 23ece7c commit 8b388ab
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 44 deletions.
30 changes: 16 additions & 14 deletions src/include/engine_duckdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,11 @@ namespace duckdb {
shared_ptr<DuckSystemPlan> sys_plan;
shared_ptr<DuckLogicalPlan> engine_plan;
shared_ptr<DuckPhysicalPlan> exec_plan;
shared_ptr<PreparedStatementData> plan_data;

bool enable_optimizer;
bool finished { false };
};


struct FnDataSubstraitExecution : public TableFunctionData {
FnDataSubstraitExecution() = default;

Expand Down Expand Up @@ -150,7 +149,7 @@ namespace duckdb {
//! Transforms DuckDB Relation to DuckDB Logical Operator
shared_ptr<DuckLogicalPlan> TranspilePlanMessage(shared_ptr<DuckSystemPlan> sys_plan);

//! Transforms DuckDB Relation to DuckDB Physical Operator
//! Transforms DuckDB Logical Operator to DuckDB Physical Operator
shared_ptr<DuckPhysicalPlan> TranslateLogicalPlan( shared_ptr<DuckLogicalPlan> engine_plan
,bool optimize);

Expand Down Expand Up @@ -182,17 +181,20 @@ namespace duckdb {

// >> Internal translation functions for operators
// NOTE: these member methods eventually use t_conn and functions_map
shared_ptr<Relation> TranslateJoinOp (const skysubstrait::JoinRel& sjoin);
shared_ptr<Relation> TranslateCrossProductOp (const skysubstrait::CrossRel& scross);
shared_ptr<Relation> TranslateFetchOp (const skysubstrait::FetchRel& slimit);
shared_ptr<Relation> TranslateFilterOp (const skysubstrait::FilterRel& sfilter);
shared_ptr<Relation> TranslateProjectOp (const skysubstrait::ProjectRel& sproj);
shared_ptr<Relation> TranslateAggregateOp (const skysubstrait::AggregateRel& saggr);
shared_ptr<Relation> TranslateReadOp (const skysubstrait::ReadRel& sget);
shared_ptr<Relation> TranslateSortOp (const skysubstrait::SortRel& ssort);
shared_ptr<Relation> TranslateSetOp (const skysubstrait::SetRel& sset);

shared_ptr<Relation> TranslateSkyRel (const skymohair::SkyRel& sky_rel);
shared_ptr<Relation> TranslateJoinOp (const skysubstrait::JoinRel& sjoin);
shared_ptr<Relation> TranslateCrossProductOp (const skysubstrait::CrossRel& scross);
shared_ptr<Relation> TranslateFetchOp (const skysubstrait::FetchRel& slimit);
shared_ptr<Relation> TranslateFilterOp (const skysubstrait::FilterRel& sfilter);
shared_ptr<Relation> TranslateProjectOp (const skysubstrait::ProjectRel& sproj);
shared_ptr<Relation> TranslateAggregateOp (const skysubstrait::AggregateRel& saggr);
shared_ptr<Relation> TranslateReadOp (const skysubstrait::ReadRel& sget);
shared_ptr<Relation> TranslateSortOp (const skysubstrait::SortRel& ssort);
shared_ptr<Relation> TranslateSetOp (const skysubstrait::SetRel& sset);
shared_ptr<Relation> TranslateExtensionLeafOp(const skysubstrait::ExtensionLeafRel& leaf_rel);

shared_ptr<Relation> TranslateSkyRel (const skymohair::SkyRel& sky_rel);
shared_ptr<Relation> TranslateSkyPartitionRel(const skymohair::SkyPartitionRel& sky_rel);
shared_ptr<Relation> TranslateSkySliceRel (const skymohair::SkySliceRel& sky_rel);

//! Translate Substrait Sort Order to DuckDB Order
OrderByNode TranslateOrder(const skysubstrait::SortField& sordf);
Expand Down
103 changes: 73 additions & 30 deletions src/translation/duckdb_operators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,19 @@ namespace duckdb {
}


//! A SkyRel is assumed to be a lookup against a skytether catalog
shared_ptr<Relation>
DuckDBTranslator::TranslateSkyRel(const skymohair::SkyRel& sky_rel) {
string sky_relname { sky_rel.domain() + "/" + sky_rel.partition() };

try { return t_conn->Table(sky_relname); }
catch (...) { return t_conn->View (sky_relname); }
}

//! A SkyPartitionRel is assumed to be a filesystem lookup against a partition (need to
// find names of each slice contained in the partition).
shared_ptr<Relation>
DuckDBTranslator::TranslateSkyPartitionRel(const skymohair::SkyPartitionRel& sky_rel) {
vector<Value> slice_names;
string table_name { sky_rel.domain() + "/" + sky_rel.partition() };

Expand All @@ -412,41 +423,73 @@ namespace duckdb {
);
}

//! A SkySliceRel is assumed to be a filesystem lookup against a single slice ("as-is").
shared_ptr<Relation>
DuckDBTranslator::TranslateSkySliceRel(const skymohair::SkySliceRel& sky_rel) {
vector<Value> slice_names;
slice_names.emplace_back(sky_rel.slice_key());

//! Translate Substrait Operations to DuckDB Relations
using SRelType = skysubstrait::Rel::RelTypeCase;
shared_ptr<Relation> DuckDBTranslator::TranslateOp(const skysubstrait::Rel& sop) {
switch (sop.rel_type_case()) {
case SRelType::kJoin: return TranslateJoinOp (sop.join());
case SRelType::kCross: return TranslateCrossProductOp(sop.cross());
case SRelType::kFetch: return TranslateFetchOp (sop.fetch());
case SRelType::kFilter: return TranslateFilterOp (sop.filter());
case SRelType::kProject: return TranslateProjectOp (sop.project());
case SRelType::kAggregate: return TranslateAggregateOp (sop.aggregate());
case SRelType::kRead: return TranslateReadOp (sop.read());
case SRelType::kSort: return TranslateSortOp (sop.sort());
case SRelType::kSet: return TranslateSetOp (sop.set());

case SRelType::kExtensionLeaf: {
auto& leaf_rel = sop.extension_leaf().detail();
if (not leaf_rel.Is<skymohair::SkyRel>()) {
std::cerr << "Unsupported extension type: " << leaf_rel.descriptor()->name() << std::endl;
throw InternalException("Unsupported extension type");
}
// We expect the arrow files to contain arrow stream formatted data
string table_name { sky_rel.domain() + "/" + sky_rel.partition() };
return (
t_conn->TableFunction("scan_arrows_file", {Value::LIST(slice_names)})
->Alias(table_name)
);
}

shared_ptr<Relation>
DuckDBTranslator::TranslateExtensionLeafOp(const skysubstrait::ExtensionLeafRel& leaf_rel) {
shared_ptr<Relation> translated_rel { nullptr };

skymohair::SkyRel sky_rel;
leaf_rel.UnpackTo(&sky_rel);
if (not leaf_rel.has_detail()) {
throw InternalException("ExtensionLeaf op is missing extension details");
}

auto duck_rel = TranslateSkyRel(sky_rel);
if (duck_rel == nullptr) {
throw InternalException("Support for SkyRel in progress");
}
else {
return duck_rel;
}
// Figure out which type of extension operator it is
auto& extension_msg = leaf_rel.detail();
if (extension_msg.Is<skymohair::SkyRel>()) {
skymohair::SkyRel sky_rel;
extension_msg.UnpackTo(&sky_rel);

}
translated_rel = TranslateSkyRel(sky_rel);
}

else if (extension_msg.Is<skymohair::SkyPartitionRel>()) {
skymohair::SkyPartitionRel sky_rel;
extension_msg.UnpackTo(&sky_rel);

translated_rel = TranslateSkyPartitionRel(sky_rel);
}

else if (extension_msg.Is<skymohair::SkySliceRel>()) {
skymohair::SkySliceRel sky_rel;
extension_msg.UnpackTo(&sky_rel);

translated_rel = TranslateSkySliceRel(sky_rel);
}

// If a translator was matched and it succeeded, return the translated sub-plan
if (translated_rel != nullptr) { return translated_rel; }

// Otherwise, throw an exception
std::cerr << "Unsupported extension type: " << extension_msg.descriptor()->name() << std::endl;
throw InternalException("Unsupported extension type");
}

//! Translate Substrait Operations to DuckDB Relations
using SRelType = skysubstrait::Rel::RelTypeCase;
shared_ptr<Relation> DuckDBTranslator::TranslateOp(const skysubstrait::Rel& sop) {
switch (sop.rel_type_case()) {
case SRelType::kJoin: return TranslateJoinOp (sop.join());
case SRelType::kCross: return TranslateCrossProductOp (sop.cross());
case SRelType::kFetch: return TranslateFetchOp (sop.fetch());
case SRelType::kFilter: return TranslateFilterOp (sop.filter());
case SRelType::kProject: return TranslateProjectOp (sop.project());
case SRelType::kAggregate: return TranslateAggregateOp (sop.aggregate());
case SRelType::kRead: return TranslateReadOp (sop.read());
case SRelType::kSort: return TranslateSortOp (sop.sort());
case SRelType::kSet: return TranslateSetOp (sop.set());
case SRelType::kExtensionLeaf: return TranslateExtensionLeafOp(sop.extension_leaf());
default:
throw InternalException(
"Unsupported relation type " + to_string(sop.rel_type_case())
Expand Down

0 comments on commit 8b388ab

Please sign in to comment.