diff --git a/CMakeLists.txt b/CMakeLists.txt index 3a21a9eb934d..dd139973a614 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,7 @@ option(VELOX_ENABLE_PARSE "Build parser used for unit tests." ON) option(VELOX_ENABLE_EXAMPLES "Build examples. This will enable VELOX_ENABLE_EXPRESSION automatically." ON) +option(VELOX_ENABLE_SUBSTRAIT "Buid Substrait-to-Velox converter." OFF) option(VELOX_ENABLE_BENCHMARKS "Build velox top level benchmarks." OFF) option(VELOX_ENABLE_S3 "Build S3 Connector" OFF) option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) @@ -60,6 +61,7 @@ if(${VELOX_BUILD_MINIMAL}) set(VELOX_ENABLE_SPARK_FUNCTIONS OFF) set(VELOX_ENABLE_EXAMPLES OFF) set(VELOX_ENABLE_S3 OFF) + set(VELOX_ENABLE_SUBSTRAIT OFF) endif() if(${VELOX_BUILD_TESTING}) diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 93f25acb654c..93077d983070 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -64,3 +64,8 @@ endif() if(${VELOX_CODEGEN_SUPPORT}) add_subdirectory(experimental/codegen) endif() + +# substrait converter +if(${VELOX_ENABLE_SUBSTRAIT}) + add_subdirectory(substrait) +endif() diff --git a/velox/substrait/CMakeLists.txt b/velox/substrait/CMakeLists.txt new file mode 100644 index 000000000000..923b337b8545 --- /dev/null +++ b/velox/substrait/CMakeLists.txt @@ -0,0 +1,54 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set up Proto +set(proto_directory ${CMAKE_CURRENT_SOURCE_DIR}/proto) +set(substrait_proto_directory ${CMAKE_CURRENT_SOURCE_DIR}/proto/substrait) +set(PROTO_OUTPUT_DIR "${CMAKE_CURRENT_SOURCE_DIR}/proto/") +file(MAKE_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/proto/substrait) +file(GLOB PROTO_FILES ${substrait_proto_directory}/*.proto + ${substrait_proto_directory}/extensions/*.proto) +foreach(PROTO ${PROTO_FILES}) + file(RELATIVE_PATH REL_PROTO ${substrait_proto_directory} ${PROTO}) + string(REGEX REPLACE "\\.proto" "" PROTO_NAME ${REL_PROTO}) + list(APPEND PROTO_SRCS "${PROTO_OUTPUT_DIR}/substrait/${PROTO_NAME}.pb.cc") + list(APPEND PROTO_HDRS "${PROTO_OUTPUT_DIR}/substrait/${PROTO_NAME}.pb.h") +endforeach() +set(PROTO_OUTPUT_FILES ${PROTO_HDRS} ${PROTO_SRCS}) +set_source_files_properties(${PROTO_OUTPUT_FILES} PROPERTIES GENERATED TRUE) + +get_filename_component(PROTO_DIR ${substrait_proto_directory}/, DIRECTORY) + +# Generate Substrait hearders +add_custom_command( + OUTPUT ${PROTO_OUTPUT_FILES} + COMMAND protoc --proto_path ${proto_directory}/ --cpp_out ${PROTO_OUTPUT_DIR} + ${PROTO_FILES} + DEPENDS ${PROTO_DIR} + COMMENT "Running PROTO compiler" + VERBATIM) +add_custom_target(substrait_proto ALL DEPENDS ${PROTO_OUTPUT_FILES}) +add_dependencies(substrait_proto protobuf::libprotobuf) + +set(SRCS ${PROTO_SRCS} SubstraitUtils.cpp SubstraitToVeloxExpr.cpp + SubstraitToVeloxPlan.cpp TypeUtils.cpp) +add_library(velox_substrait_plan_converter ${SRCS}) +target_include_directories(velox_substrait_plan_converter + PUBLIC ${PROTO_OUTPUT_DIR}) +target_link_libraries(velox_substrait_plan_converter velox_connector + velox_dwio_dwrf_common) + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() diff --git a/velox/substrait/SubstraitToVeloxExpr.cpp b/velox/substrait/SubstraitToVeloxExpr.cpp new file mode 100644 index 000000000000..0785cfc168cb --- /dev/null +++ b/velox/substrait/SubstraitToVeloxExpr.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/substrait/SubstraitToVeloxExpr.h" +#include "velox/substrait/TypeUtils.h" + +namespace facebook::velox::substrait { + +std::shared_ptr +SubstraitVeloxExprConverter::toVeloxExpr( + const ::substrait::Expression::FieldReference& sField, + int32_t inputPlanNodeId) { + auto typeCase = sField.reference_type_case(); + switch (typeCase) { + case ::substrait::Expression::FieldReference::ReferenceTypeCase:: + kDirectReference: { + auto dRef = sField.direct_reference(); + int32_t colIdx = subParser_->parseReferenceSegment(dRef); + auto fieldName = subParser_->makeNodeName(inputPlanNodeId, colIdx); + // TODO: Get the input type and support different types here. + return std::make_shared( + DOUBLE(), fieldName); + } + default: + VELOX_NYI( + "Substrait conversion not supported for Reference '{}'", typeCase); + } +} + +std::shared_ptr +SubstraitVeloxExprConverter::toVeloxExpr( + const ::substrait::Expression::ScalarFunction& sFunc, + int32_t inputPlanNodeId) { + std::vector> params; + params.reserve(sFunc.args().size()); + for (const auto& sArg : sFunc.args()) { + params.emplace_back(toVeloxExpr(sArg, inputPlanNodeId)); + } + auto functionId = sFunc.function_reference(); + auto veloxFunction = subParser_->findVeloxFunction(functionMap_, functionId); + auto subType = subParser_->parseType(sFunc.output_type()); + auto veloxType = toVeloxType(subType->type); + return std::make_shared( + veloxType, std::move(params), veloxFunction); +} + +std::shared_ptr +SubstraitVeloxExprConverter::toVeloxExpr( + const ::substrait::Expression::Literal& sLit) { + auto typeCase = sLit.literal_type_case(); + switch (typeCase) { + case ::substrait::Expression_Literal::LiteralTypeCase::kFp64: + return std::make_shared(sLit.fp64()); + case ::substrait::Expression_Literal::LiteralTypeCase::kBoolean: + return std::make_shared(sLit.boolean()); + default: + VELOX_NYI( + "Substrait conversion not supported for type case '{}'", typeCase); + } +} + +std::shared_ptr +SubstraitVeloxExprConverter::toVeloxExpr( + const ::substrait::Expression& sExpr, + int32_t inputPlanNodeId) { + std::shared_ptr veloxExpr; + auto typeCase = sExpr.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kLiteral: + return toVeloxExpr(sExpr.literal()); + case ::substrait::Expression::RexTypeCase::kScalarFunction: + return toVeloxExpr(sExpr.scalar_function(), inputPlanNodeId); + case ::substrait::Expression::RexTypeCase::kSelection: + return toVeloxExpr(sExpr.selection(), inputPlanNodeId); + default: + VELOX_NYI( + "Substrait conversion not supported for Expression '{}'", typeCase); + } +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxExpr.h b/velox/substrait/SubstraitToVeloxExpr.h new file mode 100644 index 000000000000..ef6c393ee674 --- /dev/null +++ b/velox/substrait/SubstraitToVeloxExpr.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/core/Expressions.h" +#include "velox/substrait/SubstraitUtils.h" + +namespace facebook::velox::substrait { + +/// This class is used to convert Substrait representations to Velox +/// expressions. +class SubstraitVeloxExprConverter { + public: + /// subParser: A Substrait parser used to convert Substrait representations + /// into recognizable representations. functionMap: A pre-constructed map + /// storing the relations between the function id and the function name. + SubstraitVeloxExprConverter( + const std::shared_ptr& subParser, + const std::unordered_map& functionMap) + : subParser_(subParser), functionMap_(functionMap) {} + + /// Used to convert Substrait Field into Velox Field Expression. + std::shared_ptr toVeloxExpr( + const ::substrait::Expression::FieldReference& sField, + int32_t inputPlanNodeId); + + /// Used to convert Substrait ScalarFunction into Velox Expression. + std::shared_ptr toVeloxExpr( + const ::substrait::Expression::ScalarFunction& sFunc, + int32_t inputPlanNodeId); + + /// Used to convert Substrait Literal into Velox Expression. + std::shared_ptr toVeloxExpr( + const ::substrait::Expression::Literal& sLit); + + /// Used to convert Substrait Expression into Velox Expression. + std::shared_ptr toVeloxExpr( + const ::substrait::Expression& sExpr, + int32_t inputPlanNodeId); + + private: + /// The Substrait parser used to convert Substrait representations into + /// recognizable representations. + std::shared_ptr subParser_; + + /// The map storing the relations between the function id and the function + /// name. + std::unordered_map functionMap_; +}; + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp new file mode 100644 index 000000000000..03c9e04fa810 --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlan.cpp @@ -0,0 +1,518 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/substrait/SubstraitToVeloxPlan.h" +#include "velox/substrait/TypeUtils.h" + +namespace facebook::velox::substrait { + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::AggregateRel& sAgg) { + std::shared_ptr childNode; + if (sAgg.has_input()) { + childNode = toVeloxPlan(sAgg.input()); + } else { + VELOX_FAIL("Child Rel is expected in AggregateRel."); + } + + // Construct Velox grouping expressions. + auto inputTypes = childNode->outputType(); + std::vector> + veloxGroupingExprs; + const auto& groupings = sAgg.groupings(); + int inputPlanNodeId = planNodeId_ - 1; + // The index of output column. + int outIdx = 0; + for (const auto& grouping : groupings) { + auto groupingExprs = grouping.grouping_expressions(); + for (const auto& groupingExpr : groupingExprs) { + // Velox's groupings are limited to be Field, so groupingExpr is + // expected to be FieldReference. + auto fieldExpr = exprConverter_->toVeloxExpr( + groupingExpr.selection(), inputPlanNodeId); + veloxGroupingExprs.emplace_back(fieldExpr); + outIdx += 1; + } + } + + // Parse measures to get Aggregation phase and expressions. + bool phaseInited = false; + core::AggregationNode::Step aggStep; + // Project expressions are used to conduct a pre-projection before + // Aggregation if needed. + std::vector> projectExprs; + std::vector projectOutNames; + std::vector> aggExprs; + aggExprs.reserve(sAgg.measures().size()); + + // Construct Velox Aggregate expressions. + for (const auto& sMea : sAgg.measures()) { + auto aggFunction = sMea.measure(); + // Get the params of this Aggregate function. + std::vector> aggParams; + auto args = aggFunction.args(); + aggParams.reserve(args.size()); + for (auto arg : args) { + auto typeCase = arg.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: { + aggParams.emplace_back( + exprConverter_->toVeloxExpr(arg.selection(), inputPlanNodeId)); + break; + } + case ::substrait::Expression::RexTypeCase::kScalarFunction: { + // Pre-projection is needed before Aggregate. + // The input of Aggregatation will be the output of the + // pre-projection. + auto sFunc = arg.scalar_function(); + projectExprs.emplace_back( + exprConverter_->toVeloxExpr(sFunc, inputPlanNodeId)); + auto colOutName = subParser_->makeNodeName(planNodeId_, outIdx); + projectOutNames.emplace_back(colOutName); + auto outType = subParser_->parseType(sFunc.output_type()); + auto aggInputParam = + std::make_shared( + toVeloxType(outType->type), colOutName); + aggParams.emplace_back(aggInputParam); + break; + } + default: + VELOX_NYI( + "Substrait conversion not supported for arg type '{}'", typeCase); + } + } + auto funcId = aggFunction.function_reference(); + auto funcName = subParser_->findVeloxFunction(functionMap_, funcId); + auto aggOutType = subParser_->parseType(aggFunction.output_type()); + auto aggExpr = std::make_shared( + toVeloxType(aggOutType->type), std::move(aggParams), funcName); + aggExprs.emplace_back(aggExpr); + + // Initialize the Aggregate Step. + if (!phaseInited) { + auto phase = aggFunction.phase(); + switch (phase) { + case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kPartial; + break; + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kIntermediate; + break; + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: + aggStep = core::AggregationNode::Step::kFinal; + break; + default: + VELOX_NYI("Substrait conversion not supported for phase '{}'", phase); + } + phaseInited = true; + } + outIdx += 1; + } + + // Construct the Aggregate Node. + bool ignoreNullKeys = false; + std::vector> aggregateMasks( + outIdx); + std::vector> + preGroupingExprs; + if (projectOutNames.size() == 0) { + // Conduct Aggregation directly. + std::vector aggOutNames; + aggOutNames.reserve(outIdx); + for (int idx = 0; idx < outIdx; idx++) { + aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); + } + return std::make_shared( + nextPlanNodeId(), + aggStep, + veloxGroupingExprs, + preGroupingExprs, + aggOutNames, + aggExprs, + aggregateMasks, + ignoreNullKeys, + childNode); + } else { + // A Project Node is needed before Aggregation. + auto projectNode = std::make_shared( + nextPlanNodeId(), + std::move(projectOutNames), + std::move(projectExprs), + childNode); + std::vector aggOutNames; + aggOutNames.reserve(outIdx); + for (int idx = 0; idx < outIdx; idx++) { + aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); + } + return std::make_shared( + nextPlanNodeId(), + aggStep, + veloxGroupingExprs, + preGroupingExprs, + aggOutNames, + aggExprs, + aggregateMasks, + ignoreNullKeys, + projectNode); + } +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ProjectRel& sProject) { + std::shared_ptr childNode; + if (sProject.has_input()) { + childNode = toVeloxPlan(sProject.input()); + } else { + VELOX_FAIL("Child Rel is expected in ProjectRel."); + } + + // Construct Velox Expressions. + auto projectExprs = sProject.expressions(); + std::vector projectNames; + std::vector> expressions; + projectNames.reserve(projectExprs.size()); + expressions.reserve(projectExprs.size()); + auto prePlanNodeId = planNodeId_ - 1; + int colIdx = 0; + for (const auto& expr : projectExprs) { + expressions.emplace_back(exprConverter_->toVeloxExpr(expr, prePlanNodeId)); + projectNames.emplace_back(subParser_->makeNodeName(planNodeId_, colIdx)); + colIdx += 1; + } + + auto projectNode = std::make_shared( + nextPlanNodeId(), + std::move(projectNames), + std::move(expressions), + childNode); + return projectNode; +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::FilterRel& sFilter) { + // TODO: Currently Filter is skipped because Filter is Pushdowned to + // TableScan. + std::shared_ptr childNode; + if (sFilter.has_input()) { + childNode = toVeloxPlan(sFilter.input()); + } else { + VELOX_FAIL("Child Rel is expected in FilterRel."); + } + return childNode; +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::ReadRel& sRead, + u_int32_t& index, + std::vector& paths, + std::vector& starts, + std::vector& lengths) { + // Get output names and types. + std::vector colNameList; + std::vector veloxTypeList; + if (sRead.has_base_schema()) { + const auto& baseSchema = sRead.base_schema(); + colNameList.reserve(baseSchema.names().size()); + for (const auto& name : baseSchema.names()) { + colNameList.emplace_back(name); + } + auto substraitTypeList = subParser_->parseNamedStruct(baseSchema); + veloxTypeList.reserve(substraitTypeList.size()); + for (const auto& subType : substraitTypeList) { + veloxTypeList.emplace_back(toVeloxType(subType->type)); + } + } + + // Parse local files + if (sRead.has_local_files()) { + const auto& fileList = sRead.local_files().items(); + paths.reserve(fileList.size()); + starts.reserve(fileList.size()); + lengths.reserve(fileList.size()); + for (const auto& file : fileList) { + // Expect all Partitions share the same index. + index = file.partition_index(); + paths.emplace_back(file.uri_file()); + starts.emplace_back(file.start()); + lengths.emplace_back(file.length()); + } + } + + // Velox requires Filter Pushdown must being enabled. + bool filterPushdownEnabled = true; + std::shared_ptr tableHandle; + if (!sRead.has_filter()) { + tableHandle = std::make_shared( + filterPushdownEnabled, connector::hive::SubfieldFilters{}, nullptr); + } else { + connector::hive::SubfieldFilters filters = + toVeloxFilter(colNameList, veloxTypeList, sRead.filter()); + tableHandle = std::make_shared( + filterPushdownEnabled, std::move(filters), nullptr); + } + + // Get assignments and out names. + std::vector outNames; + outNames.reserve(colNameList.size()); + std::unordered_map> + assignments; + for (int idx = 0; idx < colNameList.size(); idx++) { + auto outName = subParser_->makeNodeName(planNodeId_, idx); + assignments[outName] = std::make_shared( + colNameList[idx], + connector::hive::HiveColumnHandle::ColumnType::kRegular, + veloxTypeList[idx]); + outNames.emplace_back(outName); + } + auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); + + auto tableScanNode = std::make_shared( + nextPlanNodeId(), outputType, tableHandle, assignments); + return tableScanNode; +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::Rel& sRel) { + if (sRel.has_aggregate()) { + return toVeloxPlan(sRel.aggregate()); + } + if (sRel.has_project()) { + return toVeloxPlan(sRel.project()); + } + if (sRel.has_filter()) { + return toVeloxPlan(sRel.filter()); + } + if (sRel.has_read()) { + return toVeloxPlan(sRel.read(), partitionIndex_, paths_, starts_, lengths_); + } + VELOX_NYI("Substrait conversion not supported for Rel."); +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::RelRoot& sRoot) { + // TODO: Use the names as the output names for the whole computing. + const auto& sNames = sRoot.names(); + if (sRoot.has_input()) { + const auto& sRel = sRoot.input(); + return toVeloxPlan(sRel); + } + VELOX_FAIL("Input is expected in RelRoot."); +} + +std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( + const ::substrait::Plan& sPlan) { + // Construct the function map based on the Substrait representation. + for (const auto& sExtension : sPlan.extensions()) { + if (!sExtension.has_extension_function()) { + continue; + } + const auto& sFmap = sExtension.extension_function(); + auto id = sFmap.function_anchor(); + auto name = sFmap.name(); + functionMap_[id] = name; + } + + // Construct the expression converter. + exprConverter_ = + std::make_shared(subParser_, functionMap_); + + // In fact, only one RelRoot or Rel is expected here. + for (const auto& sRel : sPlan.relations()) { + if (sRel.has_root()) { + return toVeloxPlan(sRel.root()); + } + if (sRel.has_rel()) { + return toVeloxPlan(sRel.rel()); + } + } + VELOX_FAIL("RelRoot or Rel is expected in Plan."); +} + +std::string SubstraitVeloxPlanConverter::nextPlanNodeId() { + auto id = fmt::format("{}", planNodeId_); + planNodeId_++; + return id; +} + +// This class contains the needed infos for Filter Pushdown. +// TODO: Support different types here. +class FilterInfo { + public: + // Used to set the left bound. + void setLeft(double left, bool isExclusive) { + left_ = left; + leftExclusive_ = isExclusive; + if (!isInitialized_) { + isInitialized_ = true; + } + } + + // Used to set the right bound. + void setRight(double right, bool isExclusive) { + right_ = right; + rightExclusive_ = isExclusive; + if (!isInitialized_) { + isInitialized_ = true; + } + } + + // Will fordis Null value if called once. + void forbidsNull() { + nullAllowed_ = false; + if (!isInitialized_) { + isInitialized_ = true; + } + } + + // Return the initialization status. + bool isInitialized() { + return isInitialized_ ? true : false; + } + + // The left bound. + std::optional left_ = std::nullopt; + // The right bound. + std::optional right_ = std::nullopt; + // The Null allowing. + bool nullAllowed_ = true; + // If true, left bound will be exclusive. + bool leftExclusive_ = false; + // If true, right bound will be exclusive. + bool rightExclusive_ = false; + + private: + bool isInitialized_ = false; +}; + +connector::hive::SubfieldFilters SubstraitVeloxPlanConverter::toVeloxFilter( + const std::vector& inputNameList, + const std::vector& inputTypeList, + const ::substrait::Expression& sFilter) { + connector::hive::SubfieldFilters filters; + // A map between the column index and the FilterInfo for that column. + std::unordered_map> colInfoMap; + for (int idx = 0; idx < inputNameList.size(); idx++) { + colInfoMap[idx] = std::make_shared(); + } + + std::vector<::substrait::Expression_ScalarFunction> scalarFunctions; + flattenConditions(sFilter, scalarFunctions); + // Construct the FilterInfo for the related column. + for (const auto& scalarFunction : scalarFunctions) { + auto filterNameSpec = subParser_->findSubstraitFuncSpec( + functionMap_, scalarFunction.function_reference()); + auto filterName = subParser_->getSubFunctionName(filterNameSpec); + int32_t colIdx; + // TODO: Add different types' support here. + double val; + for (auto& param : scalarFunction.args()) { + auto typeCase = param.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kSelection: { + auto sel = param.selection(); + // TODO: Only direct reference is considered here. + auto dRef = sel.direct_reference(); + colIdx = subParser_->parseReferenceSegment(dRef); + break; + } + case ::substrait::Expression::RexTypeCase::kLiteral: { + auto sLit = param.literal(); + // TODO: Only double is considered here. + val = sLit.fp64(); + break; + } + default: + VELOX_NYI( + "Substrait conversion not supported for arg type '{}'", typeCase); + } + } + if (filterName == "is_not_null") { + colInfoMap[colIdx]->forbidsNull(); + } else if (filterName == "gte") { + colInfoMap[colIdx]->setLeft(val, false); + } else if (filterName == "gt") { + colInfoMap[colIdx]->setLeft(val, true); + } else if (filterName == "lte") { + colInfoMap[colIdx]->setRight(val, false); + } else if (filterName == "lt") { + colInfoMap[colIdx]->setRight(val, true); + } else { + VELOX_NYI( + "Substrait conversion not supported for filter name '{}'", + filterName); + } + } + + // Construct the Filters. + for (int idx = 0; idx < inputNameList.size(); idx++) { + auto filterInfo = colInfoMap[idx]; + double leftBound; + double rightBound; + bool leftUnbounded = true; + bool rightUnbounded = true; + bool leftExclusive = false; + bool rightExclusive = false; + if (filterInfo->isInitialized()) { + if (filterInfo->left_) { + leftUnbounded = false; + leftBound = filterInfo->left_.value(); + leftExclusive = filterInfo->leftExclusive_; + } + if (filterInfo->right_) { + rightUnbounded = false; + rightBound = filterInfo->right_.value(); + rightExclusive = filterInfo->rightExclusive_; + } + bool nullAllowed = filterInfo->nullAllowed_; + filters[common::Subfield(inputNameList[idx])] = + std::make_unique( + leftBound, + leftUnbounded, + leftExclusive, + rightBound, + rightUnbounded, + rightExclusive, + nullAllowed); + } + } + return filters; +} + +void SubstraitVeloxPlanConverter::flattenConditions( + const ::substrait::Expression& sFilter, + std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions) { + auto typeCase = sFilter.rex_type_case(); + switch (typeCase) { + case ::substrait::Expression::RexTypeCase::kScalarFunction: { + auto sFunc = sFilter.scalar_function(); + auto filterNameSpec = subParser_->findSubstraitFuncSpec( + functionMap_, sFunc.function_reference()); + // TODO: Only and relation is supported here. + if (subParser_->getSubFunctionName(filterNameSpec) == "and") { + for (const auto& sCondition : sFunc.args()) { + flattenConditions(sCondition, scalarFunctions); + } + } else { + scalarFunctions.emplace_back(sFunc); + } + break; + } + default: + VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase); + } +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitToVeloxPlan.h b/velox/substrait/SubstraitToVeloxPlan.h new file mode 100644 index 000000000000..982240992a32 --- /dev/null +++ b/velox/substrait/SubstraitToVeloxPlan.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/substrait/SubstraitToVeloxExpr.h" + +namespace facebook::velox::substrait { + +/// This class is used to convert the Substrait plan into Velox plan. +class SubstraitVeloxPlanConverter { + public: + /// Used to convert Substrait AggregateRel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::AggregateRel& sAgg); + + /// Used to convert Substrait ProjectRel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::ProjectRel& sProject); + + /// Used to convert Substrait FilterRel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::FilterRel& sFilter); + + /// Used to convert Substrait ReadRel into Velox PlanNode. + /// Index: the index of the partition this item belongs to. + /// Starts: the start positions in byte to read from the items. + /// Lengths: the lengths in byte to read from the items. + std::shared_ptr toVeloxPlan( + const ::substrait::ReadRel& sRead, + u_int32_t& index, + std::vector& paths, + std::vector& starts, + std::vector& lengths); + + /// Used to convert Substrait Rel into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Rel& sRel); + + /// Used to convert Substrait RelRoot into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::RelRoot& sRoot); + + /// Used to convert Substrait Plan into Velox PlanNode. + std::shared_ptr toVeloxPlan( + const ::substrait::Plan& sPlan); + + /// Will return the index of Partition to be scanned. + u_int32_t getPartitionIndex() { + return partitionIndex_; + } + + /// Will return the paths of the files to be scanned. + const std::vector& getPaths() { + return paths_; + } + + /// Will return the starts of the files to be scanned. + const std::vector& getStarts() { + return starts_; + } + + /// Will return the lengths to be scanned for each file. + const std::vector& getLengths() { + return lengths_; + } + + private: + /// The Partition index. + u_int32_t partitionIndex_; + + /// The file paths to be scanned. + std::vector paths_; + + /// The file starts in the scan. + std::vector starts_; + + /// The lengths to be scanned. + std::vector lengths_; + + /// The unique identification for each PlanNode. + int planNodeId_ = 0; + + /// The map storing the relations between the function id and the function + /// name. Will be constructed based on the Substrait representation. + std::unordered_map functionMap_; + + /// The Substrait parser used to convert Substrait representations into + /// recognizable representations. + std::shared_ptr subParser_{ + std::make_shared()}; + + /// The Expression converter used to convert Substrait representations into + /// Velox expressions. + std::shared_ptr exprConverter_; + + /// A function returning current function id and adding the plan node id by + /// one once called. + std::string nextPlanNodeId(); + + /// Used to convert Substrait Filter into Velox SubfieldFilters which will + /// be used in TableScan. + connector::hive::SubfieldFilters toVeloxFilter( + const std::vector& inputNameList, + const std::vector& inputTypeList, + const ::substrait::Expression& sFilter); + + /// Multiple conditions are connected to a binary tree structure with + /// the relation key words, including AND, OR, and etc. Currently, only + /// AND is supported. This function is used to extract all the Substrait + /// conditions in the binary tree structure into a vector. + void flattenConditions( + const ::substrait::Expression& sFilter, + std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions); +}; + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitUtils.cpp b/velox/substrait/SubstraitUtils.cpp new file mode 100644 index 000000000000..d3840d382ab2 --- /dev/null +++ b/velox/substrait/SubstraitUtils.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/substrait/SubstraitUtils.h" +#include "velox/common/base/Exceptions.h" + +namespace facebook::velox::substrait { + +std::shared_ptr SubstraitParser::parseType( + const ::substrait::Type& sType) { + // The used type names should be aligned with those in Velox. + std::string typeName; + ::substrait::Type_Nullability nullability; + switch (sType.kind_case()) { + case ::substrait::Type::KindCase::kBool: { + typeName = "BOOLEAN"; + nullability = sType.bool_().nullability(); + break; + } + case ::substrait::Type::KindCase::kFp64: { + typeName = "DOUBLE"; + nullability = sType.fp64().nullability(); + break; + } + case ::substrait::Type::KindCase::kStruct: { + // TODO: Support for Struct is not fully added. + typeName = "STRUCT"; + auto sStruct = sType.struct_(); + auto sTypes = sStruct.types(); + for (const auto& type : sTypes) { + parseType(type); + } + break; + } + case ::substrait::Type::KindCase::kString: { + typeName = "VARCHAR"; + nullability = sType.string().nullability(); + break; + } + default: + VELOX_NYI("Substrait parsing for type {} not supported.", typeName); + } + + bool nullable; + switch (nullability) { + case ::substrait::Type_Nullability:: + Type_Nullability_NULLABILITY_UNSPECIFIED: + nullable = true; + break; + case ::substrait::Type_Nullability::Type_Nullability_NULLABILITY_NULLABLE: + nullable = true; + break; + case ::substrait::Type_Nullability::Type_Nullability_NULLABILITY_REQUIRED: + nullable = false; + break; + default: + VELOX_NYI( + "Substrait parsing for nullability {} not supported.", nullability); + } + SubstraitType subType = {typeName, nullable}; + return std::make_shared(subType); +} + +std::vector> +SubstraitParser::parseNamedStruct(const ::substrait::NamedStruct& namedStruct) { + // Names is not used currently. + const auto& sNames = namedStruct.names(); + // Parse Struct. + const auto& sStruct = namedStruct.struct_(); + const auto& sTypes = sStruct.types(); + std::vector> + substraitTypeList; + substraitTypeList.reserve(sTypes.size()); + for (const auto& type : sTypes) { + substraitTypeList.emplace_back(parseType(type)); + } + return substraitTypeList; +} + +int32_t SubstraitParser::parseReferenceSegment( + const ::substrait::Expression::ReferenceSegment& sRef) { + auto typeCase = sRef.reference_type_case(); + switch (typeCase) { + case ::substrait::Expression::ReferenceSegment::ReferenceTypeCase:: + kStructField: { + return sRef.struct_field().field(); + } + default: + VELOX_NYI( + "Substrait conversion not supported for ReferenceSegment '{}'", + typeCase); + } +} + +std::vector SubstraitParser::makeNames( + const std::string& prefix, + int size) { + std::vector names; + names.reserve(size); + for (int i = 0; i < size; i++) { + names.emplace_back(fmt::format("{}_{}", prefix, i)); + } + return names; +} + +std::string SubstraitParser::makeNodeName(int node_id, int col_idx) { + return fmt::format("n{}_{}", node_id, col_idx); +} + +std::string SubstraitParser::findSubstraitFuncSpec( + const std::unordered_map& functionMap, + uint64_t id) const { + if (functionMap.find(id) == functionMap.end()) { + VELOX_FAIL("Could not find function id {} in function map.", id); + } + std::unordered_map& map = + const_cast&>(functionMap); + return map[id]; +} + +std::string SubstraitParser::getSubFunctionName( + const std::string& subFuncSpec) const { + // Get the position of ":" in the function name. + std::size_t pos = subFuncSpec.find(":"); + if (pos == std::string::npos) { + return subFuncSpec; + } + return subFuncSpec.substr(0, pos); +} + +std::string SubstraitParser::findVeloxFunction( + const std::unordered_map& functionMap, + uint64_t id) const { + std::string subFuncSpec = findSubstraitFuncSpec(functionMap, id); + std::string subFuncName = getSubFunctionName(subFuncSpec); + return mapToVeloxFunction(subFuncName); +} + +std::string SubstraitParser::mapToVeloxFunction( + const std::string& subFunc) const { + auto it = substraitVeloxFunctionMap.find(subFunc); + if (it != substraitVeloxFunctionMap.end()) { + return it->second; + } + + // If not finding the mapping from Substrait function name to Velox function + // name, the original Substrait function name will be used. + return subFunc; +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/SubstraitUtils.h b/velox/substrait/SubstraitUtils.h new file mode 100644 index 000000000000..e8e02abd3fe8 --- /dev/null +++ b/velox/substrait/SubstraitUtils.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/substrait/proto/substrait/algebra.pb.h" +#include "velox/substrait/proto/substrait/capabilities.pb.h" +#include "velox/substrait/proto/substrait/extensions/extensions.pb.h" +#include "velox/substrait/proto/substrait/function.pb.h" +#include "velox/substrait/proto/substrait/parameterized_types.pb.h" +#include "velox/substrait/proto/substrait/plan.pb.h" +#include "velox/substrait/proto/substrait/type.pb.h" +#include "velox/substrait/proto/substrait/type_expressions.pb.h" + +namespace facebook::velox::substrait { + +/// This class contains some common functions used to parse Substrait +/// components, and convert them into recognizable representations. +class SubstraitParser { + public: + /// Used to store the type name and nullability. + struct SubstraitType { + std::string type; + bool nullable; + }; + + /// Used to parse Substrait NamedStruct. + std::vector> parseNamedStruct( + const ::substrait::NamedStruct& namedStruct); + + /// Used to parse Substrait Type. + std::shared_ptr parseType(const ::substrait::Type& sType); + + /// Used to parse Substrait ReferenceSegment. + int32_t parseReferenceSegment( + const ::substrait::Expression::ReferenceSegment& sRef); + + /// Used to make names in the format of {prefix}_{index}. + std::vector makeNames(const std::string& prefix, int size); + + /// Used to make node name in the format of n{nodeId}_{colIdx}. + std::string makeNodeName(int nodeId, int colIdx); + + /// Used to find the Substrait function name according to the function id + /// from a pre-constructed function map. The function specification can be + /// a simple name or a compound name. The compound name format is: + /// :__..._. + /// Currently, the input types in the function specification are not used. But + /// in the future, they should be used for the validation according the + /// specifications in Substrait yaml files. + std::string findSubstraitFuncSpec( + const std::unordered_map& functionMap, + uint64_t id) const; + + /// This function is used to get the function name from the compound name. + /// When the input is a simple name, it will be returned. + std::string getSubFunctionName(const std::string& subFuncSpec) const; + + /// Used to find the Velox function name according to the function id + /// from a pre-constructed function map. + std::string findVeloxFunction( + const std::unordered_map& functionMap, + uint64_t id) const; + + /// Used to map the Substrait function key word into Velox function key word. + std::string mapToVeloxFunction(const std::string& subFunc) const; + + private: + /// Used for mapping Substrait function key words into Velox functions' key + /// words. Key: the Substrait function key word, Value: the Velox function key + /// word. For those functions with different names in Substrait and Velox, + /// a mapping relation should be added here. + std::unordered_map substraitVeloxFunctionMap; +}; + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/TypeUtils.cpp b/velox/substrait/TypeUtils.cpp new file mode 100644 index 000000000000..e17797cf8fb2 --- /dev/null +++ b/velox/substrait/TypeUtils.cpp @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/type/Type.h" + +namespace facebook::velox::substrait { + +bool isString(const TypePtr& type) { + switch (type->kind()) { + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return true; + default: + break; + } + return false; +} + +int64_t bytesOfType(const TypePtr& type) { + auto typeKind = type->kind(); + switch (typeKind) { + case TypeKind::INTEGER: + return 4; + case TypeKind::BIGINT: + case TypeKind::REAL: + case TypeKind::DOUBLE: + return 8; + default: + VELOX_NYI("Returning bytes of Type not supported for type {}.", typeKind); + } +} + +TypePtr toVeloxType(const std::string& typeName) { + auto typeKind = mapNameToTypeKind(typeName); + if (typeKind == TypeKind::BOOLEAN) { + return BOOLEAN(); + } + if (typeKind == TypeKind::DOUBLE) { + return DOUBLE(); + } + if (typeKind == TypeKind::VARCHAR) { + return VARCHAR(); + } + VELOX_NYI("Velox type conversion not supported for type {}.", typeName); +} + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/TypeUtils.h b/velox/substrait/TypeUtils.h new file mode 100644 index 000000000000..42777bb0ba5d --- /dev/null +++ b/velox/substrait/TypeUtils.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/type/Type.h" + +namespace facebook::velox::substrait { + +/// Return whether the type is String type. +bool isString(const TypePtr& type); + +/// Return the occupied bytes for a variable of this type. +int64_t bytesOfType(const TypePtr& type); + +/// Return the Velox type according to the typename. +TypePtr toVeloxType(const std::string& typeName); + +} // namespace facebook::velox::substrait diff --git a/velox/substrait/proto/substrait/.clang-format b/velox/substrait/proto/substrait/.clang-format new file mode 100644 index 000000000000..88fd00791f8f --- /dev/null +++ b/velox/substrait/proto/substrait/.clang-format @@ -0,0 +1,4 @@ +--- +Language: Proto +DisableFormat: true +... diff --git a/velox/substrait/proto/substrait/algebra.proto b/velox/substrait/proto/substrait/algebra.proto new file mode 100644 index 000000000000..b46b3db6370e --- /dev/null +++ b/velox/substrait/proto/substrait/algebra.proto @@ -0,0 +1,702 @@ +syntax = "proto3"; + +package substrait; + +import "velox/substrait/proto/substrait/type.proto"; +import "velox/substrait/proto/substrait/extensions/extensions.proto"; +import "google/protobuf/any.proto"; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +message RelCommon { + + oneof emit_kind { + Direct direct = 1; + Emit emit = 2; + } + + Hint hint = 3; + substrait.extensions.AdvancedExtension advanced_extension = 4; + + message Direct {} + message Emit { repeated int32 output_mapping = 1; } + + // Changes to the operation that can influence efficiency/performance but + // should not impact correctness. + message Hint { + Stats stats = 1; + RuntimeConstraint constraint = 2; + substrait.extensions.AdvancedExtension advanced_extension = 10; + + message Stats { + double row_count = 1; + double record_size = 2; + substrait.extensions.AdvancedExtension advanced_extension = 10; + } + + message RuntimeConstraint { + // TODO: nodes, cpu threads/%, memory, iops, etc. + + substrait.extensions.AdvancedExtension advanced_extension = 10; + } + } +} + +message ReadRel { + RelCommon common = 1; + NamedStruct base_schema = 2; + Expression filter = 3; + Expression.MaskExpression projection = 4; + substrait.extensions.AdvancedExtension advanced_extension = 10; + + oneof read_type { + VirtualTable virtual_table = 5; + LocalFiles local_files = 6; + NamedTable named_table = 7; + ExtensionTable extension_table = 8; + } + + message NamedTable { + repeated string names = 1; + substrait.extensions.AdvancedExtension advanced_extension = 10; + } + + // a table composed of literals. + message VirtualTable { repeated Expression.Literal.Struct values = 1; } + + // a stub type that can be used to extend/introduce new table types outside + // the specification. + message ExtensionTable { google.protobuf.Any detail = 1; } + + message LocalFiles { + + repeated FileOrFiles items = 1; + substrait.extensions.AdvancedExtension advanced_extension = 10; + + // Many files consist of indivisible chunks (e.g. parquet row groups + // or CSV rows). If a slice partially selects an indivisible chunk + // then the consumer should employ some rule to decide which slice to + // include the chunk in (e.g. include it in the slice that contains + // the midpoint of the chunk) + message FileOrFiles { + oneof path_type { + string uri_path = 1; + string uri_path_glob = 2; + string uri_file = 3; + string uri_folder = 4; + } + + FileFormat format = 5; + + // the index of the partition this item belongs to + uint64 partition_index = 6; + + // the start position in byte to read from this item + uint64 start = 7; + + // the length in byte to read from this item + uint64 length = 8; + + enum FileFormat { + FILE_FORMAT_UNSPECIFIED = 0; + FILE_FORMAT_PARQUET = 1; + } + } + } +} + +message ProjectRel { + RelCommon common = 1; + Rel input = 2; + repeated Expression expressions = 3; + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message JoinRel { + RelCommon common = 1; + Rel left = 2; + Rel right = 3; + Expression expression = 4; + Expression post_join_filter = 5; + + JoinType type = 6; + + enum JoinType { + JOIN_TYPE_UNSPECIFIED = 0; + JOIN_TYPE_INNER = 1; + JOIN_TYPE_OUTER = 2; + JOIN_TYPE_LEFT = 3; + JOIN_TYPE_RIGHT = 4; + JOIN_TYPE_SEMI = 5; + JOIN_TYPE_ANTI = 6; + JOIN_TYPE_SINGLE = 7; + } + + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message CrossRel { + RelCommon common = 1; + Rel left = 2; + Rel right = 3; + + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message FetchRel { + RelCommon common = 1; + Rel input = 2; + int64 offset = 3; + int64 count = 4; + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message AggregateRel { + RelCommon common = 1; + Rel input = 2; + repeated Grouping groupings = 3; + repeated Measure measures = 4; + + substrait.extensions.AdvancedExtension advanced_extension = 10; + + message Grouping { repeated Expression grouping_expressions = 1; } + + message Measure { + AggregateFunction measure = 1; + + // An optional boolean expression that acts to filter which records are + // included in the measure. True means include this record for calculation + // within the measure. + Expression filter = 2; + } +} + +message SortRel { + RelCommon common = 1; + Rel input = 2; + repeated SortField sorts = 3; + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message FilterRel { + RelCommon common = 1; + Rel input = 2; + Expression condition = 3; + substrait.extensions.AdvancedExtension advanced_extension = 10; +} + +message SetRel { + RelCommon common = 1; + repeated Rel inputs = 2; + SetOp op = 3; + substrait.extensions.AdvancedExtension advanced_extension = 10; + + enum SetOp { + SET_OP_UNSPECIFIED = 0; + SET_OP_MINUS_PRIMARY = 1; + SET_OP_MINUS_MULTISET = 2; + SET_OP_INTERSECTION_PRIMARY = 3; + SET_OP_INTERSECTION_MULTISET = 4; + SET_OP_UNION_DISTINCT = 5; + SET_OP_UNION_ALL = 6; + } +} + +// Stub to support extension with a single input +message ExtensionSingleRel { + RelCommon common = 1; + Rel input = 2; + google.protobuf.Any detail = 3; +} + +// Stub to support extension with a zero inputs +message ExtensionLeafRel { + RelCommon common = 1; + google.protobuf.Any detail = 2; +} + +// Stub to support extension with multiple inputs +message ExtensionMultiRel { + RelCommon common = 1; + repeated Rel inputs = 2; + google.protobuf.Any detail = 3; +} + +// A relation with output field names. +// +// This is for use at the root of a `Rel` tree. +message RelRoot { + // A relation + Rel input = 1; + // Field names in depth-first order + repeated string names = 2; +} + +message Rel { + oneof rel_type { + ReadRel read = 1; + FilterRel filter = 2; + FetchRel fetch = 3; + AggregateRel aggregate = 4; + SortRel sort = 5; + JoinRel join = 6; + ProjectRel project = 7; + SetRel set = 8; + ExtensionSingleRel extension_single = 9; + ExtensionMultiRel extension_multi = 10; + ExtensionLeafRel extension_leaf = 11; + CrossRel cross = 12; + } +} + +message Expression { + oneof rex_type { + Literal literal = 1; + FieldReference selection = 2; + ScalarFunction scalar_function = 3; + WindowFunction window_function = 5; + IfThen if_then = 6; + SwitchExpression switch_expression = 7; + SingularOrList singular_or_list = 8; + MultiOrList multi_or_list = 9; + Enum enum = 10; + Cast cast = 11; + Subquery subquery = 12; + } + + message Enum { + oneof enum_kind { + string specified = 1; + Empty unspecified = 2; + } + + message Empty {} + } + + message Literal { + oneof literal_type { + bool boolean = 1; + int32 i8 = 2; + int32 i16 = 3; + int32 i32 = 5; + int64 i64 = 7; + float fp32 = 10; + double fp64 = 11; + string string = 12; + bytes binary = 13; + // Timestamp in units of microseconds since the UNIX epoch. + int64 timestamp = 14; + // Date in units of days since the UNIX epoch. + int32 date = 16; + // Time in units of microseconds past midnight + int64 time = 17; + IntervalYearToMonth interval_year_to_month = 19; + IntervalDayToSecond interval_day_to_second = 20; + string fixed_char = 21; + VarChar var_char = 22; + bytes fixed_binary = 23; + Decimal decimal = 24; + Struct struct = 25; + Map map = 26; + // Timestamp in units of microseconds since the UNIX epoch. + int64 timestamp_tz = 27; + bytes uuid = 28; + Type null = 29; // a typed null literal + List list = 30; + Type.List empty_list = 31; + Type.Map empty_map = 32; + } + + // whether the literal type should be treated as a nullable type. Applies to + // all members of union other than the Typed null (which should directly + // declare nullability). + bool nullable = 50; + + message VarChar { + string value = 1; + uint32 length = 2; + } + + message Decimal { + // little-endian twos-complement integer representation of complete value + // (ignoring precision) Always 16 bytes in length + bytes value = 1; + // The maximum number of digits allowed in the value. + // the maximum precision is 38. + int32 precision = 2; + // declared scale of decimal literal + int32 scale = 3; + } + + message Map { + message KeyValue { + Literal key = 1; + Literal value = 2; + } + + repeated KeyValue key_values = 1; + } + + message IntervalYearToMonth { + int32 years = 1; + int32 months = 2; + } + + message IntervalDayToSecond { + int32 days = 1; + int32 seconds = 2; + } + + message Struct { + // A possibly heterogeneously typed list of literals + repeated Literal fields = 1; + } + + message List { + // A homogeneously typed list of literals + repeated Literal values = 1; + } + } + + message ScalarFunction { + // points to a function_anchor defined in this plan + uint32 function_reference = 1; + repeated Expression args = 2; + Type output_type = 3; + } + + message WindowFunction { + // points to a function_anchor defined in this plan + uint32 function_reference = 1; + repeated Expression partitions = 2; + repeated SortField sorts = 3; + Bound upper_bound = 4; + Bound lower_bound = 5; + AggregationPhase phase = 6; + Type output_type = 7; + repeated Expression args = 8; + + message Bound { + + message Preceding { int64 offset = 1; } + + message Following { int64 offset = 1; } + + message CurrentRow {} + + message Unbounded {} + + oneof kind { + Preceding preceding = 1; + Following following = 2; + CurrentRow current_row = 3; + Unbounded unbounded = 4; + } + } + } + + message IfThen { + + repeated IfClause ifs = 1; + Expression else = 2; + + message IfClause { + Expression if = 1; + Expression then = 2; + } + } + + message Cast { + Type type = 1; + Expression input = 2; + } + + message SwitchExpression { + repeated IfValue ifs = 1; + Expression else = 2; + + message IfValue { + Literal if = 1; + Expression then = 2; + } + } + + message SingularOrList { + Expression value = 1; + repeated Expression options = 2; + } + + message MultiOrList { + repeated Expression value = 1; + repeated Record options = 2; + + message Record { repeated Expression fields = 1; } + } + + message EmbeddedFunction { + repeated Expression arguments = 1; + Type output_type = 2; + oneof kind { + PythonPickleFunction python_pickle_function = 3; + WebAssemblyFunction web_assembly_function = 4; + } + + message PythonPickleFunction { + bytes function = 1; + repeated string prerequisite = 2; + } + + message WebAssemblyFunction { + bytes script = 1; + repeated string prerequisite = 2; + } + } + + // A way to reference the inner property of a complex record. Can reference + // either a map key by literal, a struct field by the ordinal position of + // the desired field or a particular element in an array. Supports + // expressions that would roughly translate to something similar to: + // a.b[2].c['my_map_key'].x where a,b,c and x are struct field references + // (ordinalized in the internal representation here), [2] is a list offset + // and ['my_map_key'] is a reference into a map field. + message ReferenceSegment { + + oneof reference_type { + MapKey map_key = 1; + StructField struct_field = 2; + ListElement list_element = 3; + } + + message MapKey { + // literal based reference to specific possible value in map. + Literal map_key = 1; + + // Optional child segment + ReferenceSegment child = 2; + } + + message StructField { + // zero-indexed ordinal position of field in struct + int32 field = 1; + + // Optional child segment + ReferenceSegment child = 2; + } + + message ListElement { + // zero-indexed ordinal position of element in list + int32 offset = 1; + + // Optional child segment + ReferenceSegment child = 2; + } + } + + // A reference that takes an existing subtype and selectively removes fields + // from it. For example, one might initially have an inner struct with 100 + // fields but a a particular operation only needs to interact with only 2 of + // those 100 fields. In this situation, one would use a mask expression to + // eliminate the 98 fields that are not relevant to the rest of the operation + // pipeline. + // + // Note that this does not fundamentally alter the structure of data beyond + // the elimination of unnecessary elements. + message MaskExpression { + + StructSelect select = 1; + bool maintain_singular_struct = 2; + + message Select { + oneof type { + StructSelect struct = 1; + ListSelect list = 2; + MapSelect map = 3; + } + } + + message StructSelect { repeated StructItem struct_items = 1; } + + message StructItem { + int32 field = 1; + Select child = 2; + } + + message ListSelect { + + repeated ListSelectItem selection = 1; + Select child = 2; + + message ListSelectItem { + oneof type { + ListElement item = 1; + ListSlice slice = 2; + } + + message ListElement { int32 field = 1; } + + message ListSlice { + int32 start = 1; + int32 end = 2; + } + } + } + + message MapSelect { + oneof select { + MapKey key = 1; + MapKeyExpression expression = 2; + } + + Select child = 3; + + message MapKey { string map_key = 1; } + + message MapKeyExpression { string map_key_expression = 1; } + } + } + + // A reference to an inner part of a complex object. Can reference reference a + // single element or a masked version of elements + message FieldReference { + + // Whether this is composed of a single element reference or a masked + // element subtree + oneof reference_type { + ReferenceSegment direct_reference = 1; + MaskExpression masked_reference = 2; + } + + // Whether this reference has an origin of a root struct or is based on the + // output of an expression. When this is a RootReference and direct_reference + // above is used, the direct_reference must be of a type StructField. + oneof root_type { + Expression expression = 3; + RootReference root_reference = 4; + OuterReference outer_reference = 5; + } + + // Singleton that expresses this FieldReference is rooted off the root + // incoming record type + message RootReference {} + + // A root reference for the outer relation's subquery + message OuterReference { + // number of subquery boundaries to traverse up for this field's reference + // + // This value must be >= 1 + uint32 steps_out = 1; + } + } + + // Subquery relation expression + message Subquery { + oneof subquery_type { + // Scalar subquery + Scalar scalar = 1; + // x IN y predicate + InPredicate in_predicate = 2; + // EXISTS/UNIQUE predicate + SetPredicate set_predicate = 3; + // ANY/ALL predicate + SetComparison set_comparison = 4; + } + + // A subquery with one row and one column. This is often an aggregate + // though not required to be. + message Scalar { Rel input = 1; } + + // Predicate checking that the left expression is contained in the right + // subquery + // + // Examples: + // + // x IN (SELECT * FROM t) + // (x, y) IN (SELECT a, b FROM t) + message InPredicate { + repeated Expression needles = 1; + Rel haystack = 2; + } + + // A predicate over a set of rows in the form of a subquery + // EXISTS and UNIQUE are common SQL forms of this operation. + message SetPredicate { + enum PredicateOp { + PREDICATE_OP_UNSPECIFIED = 0; + PREDICATE_OP_EXISTS = 1; + PREDICATE_OP_UNIQUE = 2; + } + // TODO: should allow expressions + PredicateOp predicate_op = 1; + Rel tuples = 2; + } + + // A subquery comparison using ANY or ALL. + // Examples: + // + // SELECT * + // FROM t1 + // WHERE x < ANY(SELECT y from t2) + message SetComparison { + enum ComparisonOp { + COMPARISON_OP_UNSPECIFIED = 0; + COMPARISON_OP_EQ = 1; + COMPARISON_OP_NE = 2; + COMPARISON_OP_LT = 3; + COMPARISON_OP_GT = 4; + COMPARISON_OP_LE = 5; + COMPARISON_OP_GE = 6; + } + + enum ReductionOp { + REDUCTION_OP_UNSPECIFIED = 0; + REDUCTION_OP_ANY = 1; + REDUCTION_OP_ALL = 2; + } + + // ANY or ALL + ReductionOp reduction_op = 1; + // A comparison operator + ComparisonOp comparison_op = 2; + // left side of the expression + Expression left = 3; + // right side of the expression + Rel right = 4; + } + } +} + +message SortField { + Expression expr = 1; + + oneof sort_kind { + SortDirection direction = 2; + uint32 comparison_function_reference = 3; + } + enum SortDirection { + SORT_DIRECTION_UNSPECIFIED = 0; + SORT_DIRECTION_ASC_NULLS_FIRST = 1; + SORT_DIRECTION_ASC_NULLS_LAST = 2; + SORT_DIRECTION_DESC_NULLS_FIRST = 3; + SORT_DIRECTION_DESC_NULLS_LAST = 4; + SORT_DIRECTION_CLUSTERED = 5; + } +} + +enum AggregationPhase { + AGGREGATION_PHASE_UNSPECIFIED = 0; + AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE = 1; + AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE = 2; + AGGREGATION_PHASE_INITIAL_TO_RESULT = 3; + AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT = 4; +} + +message AggregateFunction { + // points to a function_anchor defined in this plan + uint32 function_reference = 1; + repeated Expression args = 2; + repeated SortField sorts = 3; + AggregationPhase phase = 4; + Type output_type = 5; +} diff --git a/velox/substrait/proto/substrait/capabilities.proto b/velox/substrait/proto/substrait/capabilities.proto new file mode 100644 index 000000000000..711f0ccbba7d --- /dev/null +++ b/velox/substrait/proto/substrait/capabilities.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +package substrait; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +// Defines a set of Capabilities that a system (producer or consumer) supports. +message Capabilities { + + // List of Substrait versions this system supports + repeated string substrait_versions = 1; + + // list of com.google.Any message types this system supports for advanced + // extensions. + repeated string advanced_extension_type_urls = 2; + + // list of simple extensions this system supports. + repeated SimpleExtension simple_extensions = 3; + + message SimpleExtension { + string uri = 1; + repeated string function_keys = 2; + repeated string type_keys = 3; + repeated string type_variation_keys = 4; + } +} diff --git a/velox/substrait/proto/substrait/extensions/extensions.proto b/velox/substrait/proto/substrait/extensions/extensions.proto new file mode 100644 index 000000000000..3e8450b5c127 --- /dev/null +++ b/velox/substrait/proto/substrait/extensions/extensions.proto @@ -0,0 +1,81 @@ +syntax = "proto3"; + +package substrait.extensions; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +import "google/protobuf/any.proto"; + +message SimpleExtensionURI { + // A surrogate key used in the context of a single plan used to reference the + // URI associated with an extension. + uint32 extension_uri_anchor = 1; + + // The URI where this extension YAML can be retrieved. This is the "namespace" + // of this extension. + string uri = 2; +} + +// Describes a mapping between a specific extension entity and the uri where +// that extension can be found. +message SimpleExtensionDeclaration { + + oneof mapping_type { + ExtensionType extension_type = 1; + ExtensionTypeVariation extension_type_variation = 2; + ExtensionFunction extension_function = 3; + } + + // Describes a Type + message ExtensionType { + // references the extension_uri_anchor defined for a specific extension URI. + uint32 extension_uri_reference = 1; + + // A surrogate key used in the context of a single plan to reference a + // specific extension type + uint32 type_anchor = 2; + + // the name of the type in the defined extension YAML. + string name = 3; + } + + message ExtensionTypeVariation { + // references the extension_uri_anchor defined for a specific extension URI. + uint32 extension_uri_reference = 1; + + // A surrogate key used in the context of a single plan to reference a + // specific type variation + uint32 type_variation_anchor = 2; + + // the name of the type in the defined extension YAML. + string name = 3; + } + + message ExtensionFunction { + // references the extension_uri_anchor defined for a specific extension URI. + uint32 extension_uri_reference = 1; + + // A surrogate key used in the context of a single plan to reference a + // specific function + uint32 function_anchor = 2; + + // A simple name if there is only one impl for the function within the YAML. + // A compound name, referencing that includes type short names if there is + // more than one impl per name in the YAML. + string name = 3; + } +} + +// A generic object that can be used to embed additional extension information +// into the serialized substrait plan. +message AdvancedExtension { + + // An optimization is helpful information that don't influence semantics. May + // be ignored by a consumer. + google.protobuf.Any optimization = 1; + + // An enhancement alter semantics. Cannot be ignored by a consumer. + google.protobuf.Any enhancement = 2; +} diff --git a/velox/substrait/proto/substrait/function.proto b/velox/substrait/proto/substrait/function.proto new file mode 100644 index 000000000000..0cf5763de2b5 --- /dev/null +++ b/velox/substrait/proto/substrait/function.proto @@ -0,0 +1,146 @@ +syntax = "proto3"; + +package substrait; + +import "velox/substrait/proto/substrait/type.proto"; +import "velox/substrait/proto/substrait/parameterized_types.proto"; +import "velox/substrait/proto/substrait/type_expressions.proto"; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +// List of function signatures available. +message FunctionSignature { + + message FinalArgVariadic { + // the minimum number of arguments allowed for the list of final arguments + // (inclusive). + int64 min_args = 1; + + // the maximum number of arguments allowed for the list of final arguments + // (exclusive) + int64 max_args = 2; + + // the type of parameterized type consistency + ParameterConsistency consistency = 3; + + enum ParameterConsistency { + PARAMETER_CONSISTENCY_UNSPECIFIED = 0; + + // All argument must be the same concrete type. + PARAMETER_CONSISTENCY_CONSISTENT = 1; + + // Each argument can be any possible concrete type afforded by the bounds + // of any parameter defined in the arguments specification. + PARAMETER_CONSISTENCY_INCONSISTENT = 2; + } + } + + message FinalArgNormal {} + + message Scalar { + repeated Argument arguments = 2; + repeated string name = 3; + Description description = 4; + + bool deterministic = 7; + bool session_dependent = 8; + + DerivationExpression output_type = 9; + + oneof final_variable_behavior { + FinalArgVariadic variadic = 10; + FinalArgNormal normal = 11; + } + + repeated Implementation implementations = 12; + } + + message Aggregate { + repeated Argument arguments = 2; + string name = 3; + Description description = 4; + + bool deterministic = 7; + bool session_dependent = 8; + + DerivationExpression output_type = 9; + + oneof final_variable_behavior { + FinalArgVariadic variadic = 10; + FinalArgNormal normal = 11; + } + + bool ordered = 14; + uint64 max_set = 12; + Type intermediate_type = 13; + + repeated Implementation implementations = 15; + } + + message Window { + repeated Argument arguments = 2; + repeated string name = 3; + Description description = 4; + + bool deterministic = 7; + bool session_dependent = 8; + + DerivationExpression intermediate_type = 9; + DerivationExpression output_type = 10; + oneof final_variable_behavior { + FinalArgVariadic variadic = 16; + FinalArgNormal normal = 17; + } + bool ordered = 11; + uint64 max_set = 12; + WindowType window_type = 14; + repeated Implementation implementations = 15; + + enum WindowType { + WINDOW_TYPE_UNSPECIFIED = 0; + WINDOW_TYPE_STREAMING = 1; + WINDOW_TYPE_PARTITION = 2; + } + } + + message Description { + string language = 1; + string body = 2; + } + + message Implementation { + + Type type = 1; + string uri = 2; + + enum Type { + TYPE_UNSPECIFIED = 0; + TYPE_WEB_ASSEMBLY = 1; + TYPE_TRINO_JAR = 2; + } + } + + message Argument { + string name = 1; + + oneof argument_kind { + ValueArgument value = 2; + TypeArgument type = 3; + EnumArgument enum = 4; + } + + message ValueArgument { + ParameterizedType type = 1; + bool constant = 2; + } + + message TypeArgument { ParameterizedType type = 1; } + + message EnumArgument { + repeated string options = 1; + bool optional = 2; + } + } +} diff --git a/velox/substrait/proto/substrait/parameterized_types.proto b/velox/substrait/proto/substrait/parameterized_types.proto new file mode 100644 index 000000000000..d886ae27a056 --- /dev/null +++ b/velox/substrait/proto/substrait/parameterized_types.proto @@ -0,0 +1,113 @@ +syntax = "proto3"; +package substrait; + +import "velox/substrait/proto/substrait/type.proto"; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +message ParameterizedType { + + oneof kind { + Type.Boolean bool = 1; + Type.I8 i8 = 2; + Type.I16 i16 = 3; + Type.I32 i32 = 5; + Type.I64 i64 = 7; + Type.FP32 fp32 = 10; + Type.FP64 fp64 = 11; + Type.String string = 12; + Type.Binary binary = 13; + Type.Timestamp timestamp = 14; + Type.Date date = 16; + Type.Time time = 17; + Type.IntervalYear interval_year = 19; + Type.IntervalDay interval_day = 20; + Type.TimestampTZ timestamp_tz = 29; + Type.UUID uuid = 32; + + ParameterizedFixedChar fixed_char = 21; + ParameterizedVarChar varchar = 22; + ParameterizedFixedBinary fixed_binary = 23; + ParameterizedDecimal decimal = 24; + + ParameterizedStruct struct = 25; + ParameterizedList list = 27; + ParameterizedMap map = 28; + + uint32 user_defined_pointer = 31; + + TypeParameter type_parameter = 33; + } + + message TypeParameter { + string name = 1; + repeated ParameterizedType bounds = 2; + } + + message IntegerParameter { + string name = 1; + NullableInteger range_start_inclusive = 2; + NullableInteger range_end_exclusive = 3; + } + + message NullableInteger { int64 value = 1; } + + message ParameterizedFixedChar { + IntegerOption length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ParameterizedVarChar { + IntegerOption length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ParameterizedFixedBinary { + IntegerOption length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ParameterizedDecimal { + IntegerOption scale = 1; + IntegerOption precision = 2; + uint32 variation_pointer = 3; + Type.Nullability nullability = 4; + } + + message ParameterizedStruct { + repeated ParameterizedType types = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ParameterizedNamedStruct { + // list of names in dfs order + repeated string names = 1; + ParameterizedStruct struct = 2; + } + + message ParameterizedList { + ParameterizedType type = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ParameterizedMap { + ParameterizedType key = 1; + ParameterizedType value = 2; + uint32 variation_pointer = 3; + Type.Nullability nullability = 4; + } + + message IntegerOption { + oneof integer_type { + int32 literal = 1; + IntegerParameter parameter = 2; + } + } +} diff --git a/velox/substrait/proto/substrait/plan.proto b/velox/substrait/proto/substrait/plan.proto new file mode 100644 index 000000000000..f5c5f97a8144 --- /dev/null +++ b/velox/substrait/proto/substrait/plan.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; + +package substrait; + +import "velox/substrait/proto/substrait/algebra.proto"; +import "velox/substrait/proto/substrait/extensions/extensions.proto"; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +// Either a relation or root relation +message PlanRel { + oneof rel_type { + // Any relation + Rel rel = 1; + // The root of a relation tree + RelRoot root = 2; + } +} + +// Describe a set of operations to complete. +// For compactness sake, identifiers are normalized at the plan level. +message Plan { + + // a list of yaml specifications this plan may depend on + repeated substrait.extensions.SimpleExtensionURI extension_uris = 1; + + // a list of extensions this plan may depend on + repeated substrait.extensions.SimpleExtensionDeclaration extensions = 2; + + // one or more relation trees that are associated with this plan. + repeated PlanRel relations = 3; + + // additional extensions associated with this plan. + substrait.extensions.AdvancedExtension advanced_extensions = 4; + + // A list of com.google.Any entities that this plan may use. Can be used to + // warn if some embedded message types are unknown. Note that this list may + // include message types that are ignorable (optimizations) or that are + // unused. In many cases, a consumer may be able to work with a plan even if + // one or more message types defined here are unknown. + repeated string expected_type_urls = 5; +} diff --git a/velox/substrait/proto/substrait/type.proto b/velox/substrait/proto/substrait/type.proto new file mode 100644 index 000000000000..a1c56caf084b --- /dev/null +++ b/velox/substrait/proto/substrait/type.proto @@ -0,0 +1,194 @@ +syntax = "proto3"; +package substrait; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +message Type { + + oneof kind { + Boolean bool = 1; + I8 i8 = 2; + I16 i16 = 3; + I32 i32 = 5; + I64 i64 = 7; + FP32 fp32 = 10; + FP64 fp64 = 11; + String string = 12; + Binary binary = 13; + Timestamp timestamp = 14; + Date date = 16; + Time time = 17; + IntervalYear interval_year = 19; + IntervalDay interval_day = 20; + TimestampTZ timestamp_tz = 29; + UUID uuid = 32; + + FixedChar fixed_char = 21; + VarChar varchar = 22; + FixedBinary fixed_binary = 23; + Decimal decimal = 24; + + Struct struct = 25; + List list = 27; + Map map = 28; + + uint32 user_defined_type_reference = 31; + } + + enum Nullability { + NULLABILITY_UNSPECIFIED = 0; + NULLABILITY_NULLABLE = 1; + NULLABILITY_REQUIRED = 2; + } + + message Boolean { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + message I8 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message I16 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message I32 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message I64 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message FP32 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message FP64 { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message String { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message Binary { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message Timestamp { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message Date { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message Time { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message TimestampTZ { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message IntervalYear { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message IntervalDay { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + message UUID { + uint32 type_variation_reference = 1; + Nullability nullability = 2; + } + + // Start compound types. + message FixedChar { + int32 length = 1; + uint32 type_variation_reference = 2; + Nullability nullability = 3; + } + + message VarChar { + int32 length = 1; + uint32 type_variation_reference = 2; + Nullability nullability = 3; + } + + message FixedBinary { + int32 length = 1; + uint32 type_variation_reference = 2; + Nullability nullability = 3; + } + + message Decimal { + int32 scale = 1; + int32 precision = 2; + uint32 type_variation_reference = 3; + Nullability nullability = 4; + } + + message Struct { + repeated Type types = 1; + uint32 type_variation_reference = 2; + Nullability nullability = 3; + } + + message List { + Type type = 1; + uint32 type_variation_reference = 2; + Nullability nullability = 3; + } + + message Map { + Type key = 1; + Type value = 2; + uint32 type_variation_reference = 3; + Nullability nullability = 4; + } +} + +// A message for modeling name/type pairs. +// +// Useful for representing relation schemas. +// +// Notes: +// +// * The names field is in depth-first order. +// +// For example a schema such as: +// +// a: int64 +// b: struct +// +// would have a `names` field that looks like: +// +// ["a", "b", "c", "d"] +// +// * Only struct fields are contained in this field's elements, +// * Map keys should be traversed first, then values when producing/consuming +message NamedStruct { + // list of names in dfs order + repeated string names = 1; + Type.Struct struct = 2; +} diff --git a/velox/substrait/proto/substrait/type_expressions.proto b/velox/substrait/proto/substrait/type_expressions.proto new file mode 100644 index 000000000000..798cc557acbc --- /dev/null +++ b/velox/substrait/proto/substrait/type_expressions.proto @@ -0,0 +1,148 @@ +syntax = "proto3"; +package substrait; + +import "velox/substrait/proto/substrait/type.proto"; + +option java_multiple_files = true; +option java_package = "io.substrait.proto"; +option csharp_namespace = "Substrait.Protobuf"; + +message DerivationExpression { + + oneof kind { + Type.Boolean bool = 1; + Type.I8 i8 = 2; + Type.I16 i16 = 3; + Type.I32 i32 = 5; + Type.I64 i64 = 7; + Type.FP32 fp32 = 10; + Type.FP64 fp64 = 11; + Type.String string = 12; + Type.Binary binary = 13; + Type.Timestamp timestamp = 14; + Type.Date date = 16; + Type.Time time = 17; + Type.IntervalYear interval_year = 19; + Type.IntervalDay interval_day = 20; + Type.TimestampTZ timestamp_tz = 29; + Type.UUID uuid = 32; + + ExpressionFixedChar fixed_char = 21; + ExpressionVarChar varchar = 22; + ExpressionFixedBinary fixed_binary = 23; + ExpressionDecimal decimal = 24; + + ExpressionStruct struct = 25; + ExpressionList list = 27; + ExpressionMap map = 28; + + uint32 user_defined_pointer = 31; + + string type_parameter_name = 33; + string integer_parameter_name = 34; + + int32 integer_literal = 35; + UnaryOp unary_op = 36; + BinaryOp binary_op = 37; + IfElse if_else = 38; + ReturnProgram return_program = 39; + } + + message ExpressionFixedChar { + DerivationExpression length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ExpressionVarChar { + DerivationExpression length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ExpressionFixedBinary { + DerivationExpression length = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ExpressionDecimal { + DerivationExpression scale = 1; + DerivationExpression precision = 2; + uint32 variation_pointer = 3; + Type.Nullability nullability = 4; + } + + message ExpressionStruct { + repeated DerivationExpression types = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ExpressionNamedStruct { + repeated string names = 1; + ExpressionStruct struct = 2; + } + + message ExpressionList { + DerivationExpression type = 1; + uint32 variation_pointer = 2; + Type.Nullability nullability = 3; + } + + message ExpressionMap { + DerivationExpression key = 1; + DerivationExpression value = 2; + uint32 variation_pointer = 3; + Type.Nullability nullability = 4; + } + + message IfElse { + DerivationExpression if_condition = 1; + DerivationExpression if_return = 2; + DerivationExpression else_return = 3; + } + + message UnaryOp { + UnaryOpType op_type = 1; + DerivationExpression arg = 2; + + enum UnaryOpType { + UNARY_OP_TYPE_UNSPECIFIED = 0; + UNARY_OP_TYPE_BOOLEAN_NOT = 1; + } + } + + message BinaryOp { + + BinaryOpType op_type = 1; + DerivationExpression arg1 = 2; + DerivationExpression arg2 = 3; + + enum BinaryOpType { + BINARY_OP_TYPE_UNSPECIFIED = 0; + BINARY_OP_TYPE_PLUS = 1; + BINARY_OP_TYPE_MINUS = 2; + BINARY_OP_TYPE_MULTIPLY = 3; + BINARY_OP_TYPE_DIVIDE = 4; + BINARY_OP_TYPE_MIN = 5; + BINARY_OP_TYPE_MAX = 6; + BINARY_OP_TYPE_GREATER_THAN = 7; + BINARY_OP_TYPE_LESS_THAN = 8; + BINARY_OP_TYPE_AND = 9; + BINARY_OP_TYPE_OR = 10; + BINARY_OP_TYPE_EQUALS = 11; + BINARY_OP_TYPE_COVERS = 12; + } + } + + message ReturnProgram { + message Assignment { + string name = 1; + DerivationExpression expression = 2; + } + + repeated Assignment assignments = 1; + DerivationExpression final_expression = 2; + } +} diff --git a/velox/substrait/tests/CMakeLists.txt b/velox/substrait/tests/CMakeLists.txt new file mode 100644 index 000000000000..77569a115a93 --- /dev/null +++ b/velox/substrait/tests/CMakeLists.txt @@ -0,0 +1,55 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_plan_conversion_test PlanConversionTest.cpp) +add_dependencies(velox_plan_conversion_test velox_substrait_plan_converter) + +add_test( + NAME velox_plan_conversion_test + COMMAND velox_plan_conversion_test + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries( + velox_plan_conversion_test + velox_substrait_plan_converter + velox_functions_test_lib + velox_exec + velox_dwio_common + velox_aggregates + velox_aggregates_test_lib + velox_functions_lib + velox_functions_prestosql + velox_hive_connector + velox_type + velox_serialization + velox_exec_test_util + velox_functions_json + velox_vector + velox_memory + velox_dwio_common_exception + ${Boost_ATOMIC_LIBRARIES} + ${Boost_CONTEXT_LIBRARIES} + ${Boost_DATE_TIME_LIBRARIES} + ${Boost_FILESYSTEM_LIBRARIES} + ${Boost_PROGRAM_OPTIONS_LIBRARIES} + ${Boost_REGEX_LIBRARIES} + ${Boost_THREAD_LIBRARIES} + ${Boost_SYSTEM_LIBRARIES} + ${GTEST_BOTH_LIBRARIES} + ${FOLLY_WITH_DEPENDENCIES} + ${DOUBLE_CONVERSION} + ${gflags_LIBRARIES} + glog::glog + ${FMT} + ${FILESYSTEM}) diff --git a/velox/substrait/tests/PlanConversionTest.cpp b/velox/substrait/tests/PlanConversionTest.cpp new file mode 100644 index 000000000000..0215fc7732d5 --- /dev/null +++ b/velox/substrait/tests/PlanConversionTest.cpp @@ -0,0 +1,518 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/test_utils/GTestUtils.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/test/utils/DataFiles.h" +#include "velox/exec/PartitionedOutputBufferManager.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/substrait/SubstraitToVeloxPlan.h" +#include "velox/type/Type.h" +#include "velox/type/tests/FilterBuilder.h" +#include "velox/type/tests/SubfieldFiltersBuilder.h" + +#include + +#include +#include +#include + +#if __has_include("filesystem") +#include +namespace fs = std::filesystem; +#else +#include +namespace fs = std::experimental::filesystem; +#endif +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::exec; +using namespace facebook::velox::common::test; +using namespace facebook::velox::exec::test; + +class PlanConversionTest : public virtual HiveConnectorTestBase, + public testing::WithParamInterface { + protected: + void SetUp() override { + useAsyncCache_ = GetParam(); + HiveConnectorTestBase::SetUp(); + } + + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + } + + std::vector makeVectors( + int32_t count, + int32_t rowsPerVector, + const std::shared_ptr& rowType) { + return HiveConnectorTestBase::makeVectors(rowType, count, rowsPerVector); + } + + class VeloxConverter { + public: + // This class is an iterator for Velox computing. + class WholeComputeResultIterator { + public: + WholeComputeResultIterator( + const std::shared_ptr& planNode, + u_int32_t index, + const std::vector& paths, + const std::vector& starts, + const std::vector& lengths) + : planNode_(planNode), + index_(index), + paths_(paths), + starts_(starts), + lengths_(lengths) { + // Construct the splits. + std::vector> + connectorSplits; + connectorSplits.reserve(paths.size()); + for (int idx = 0; idx < paths.size(); idx++) { + auto path = paths[idx]; + auto start = starts[idx]; + auto length = lengths[idx]; + auto split = std::make_shared< + facebook::velox::connector::hive::HiveConnectorSplit>( + facebook::velox::exec::test::kHiveConnectorId, + path, + facebook::velox::dwio::common::FileFormat::ORC, + start, + length); + connectorSplits.emplace_back(split); + } + splits_.reserve(connectorSplits.size()); + for (const auto& connectorSplit : connectorSplits) { + splits_.emplace_back(exec::Split(folly::copy(connectorSplit), -1)); + } + + params_.planNode = planNode; + cursor_ = std::make_unique(params_); + addSplits_ = [&](Task* task) { + if (noMoreSplits_) { + return; + } + for (auto& split : splits_) { + task->addSplit("0", std::move(split)); + } + task->noMoreSplits("0"); + noMoreSplits_ = true; + }; + } + + bool HasNext() { + if (!mayHasNext_) { + return false; + } + if (numRows_ > 0) { + return true; + } else { + addSplits_(cursor_->task().get()); + if (cursor_->moveNext()) { + result_ = cursor_->current(); + numRows_ += result_->size(); + return true; + } else { + mayHasNext_ = false; + return false; + } + } + } + + RowVectorPtr Next() { + numRows_ = 0; + return result_; + } + + private: + const std::shared_ptr planNode_; + std::unique_ptr cursor_; + exec::test::CursorParameters params_; + std::vector splits_; + bool noMoreSplits_ = false; + std::function addSplits_; + u_int32_t index_; + std::vector paths_; + std::vector starts_; + std::vector lengths_; + uint64_t numRows_ = 0; + bool mayHasNext_ = true; + RowVectorPtr result_; + }; + + // This method will resume the Substrait plan from Json file, + // and convert it into Velox PlanNode. A result iterator for + // Velox computing will be returned. + std::shared_ptr getResIter( + const std::string& subPlanPath) { + // Read sub.json and resume the Substrait plan. + std::ifstream subJson(subPlanPath); + std::stringstream buffer; + buffer << subJson.rdbuf(); + std::string subData = buffer.str(); + ::substrait::Plan subPlan; + google::protobuf::util::JsonStringToMessage(subData, &subPlan); + + auto planConverter = std::make_shared< + facebook::velox::substrait::SubstraitVeloxPlanConverter>(); + // Convert to Velox PlanNode. + auto planNode = planConverter->toVeloxPlan(subPlan); + + // Get the information for TableScan. + u_int32_t partitionIndex = planConverter->getPartitionIndex(); + std::vector paths = planConverter->getPaths(); + + // In test, need to get the absolute path of the generated ORC file. + std::vector absolutePaths; + absolutePaths.reserve(paths.size()); + for (auto path : paths) { + std::string currentPath = fs::current_path().c_str(); + std::string absolutePath = "file://" + currentPath + path; + absolutePaths.emplace_back(absolutePath); + } + + std::vector starts = planConverter->getStarts(); + std::vector lengths = planConverter->getLengths(); + // Construct the result iterator. + auto resIter = std::make_shared( + planNode, partitionIndex, absolutePaths, starts, lengths); + return resIter; + } + }; + + // This method can be used to create a Fixed-width type of Vector without Null + // values. + template + VectorPtr createSpecificScalar( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + facebook::velox::BufferPtr values = AlignedBuffer::allocate(size, &pool); + auto valuesPtr = values->asMutableRange(); + facebook::velox::BufferPtr nulls = nullptr; + for (size_t i = 0; i < size; ++i) { + valuesPtr[i] = vals[i]; + } + return std::make_shared>( + &pool, nulls, size, values, std::vector{}); + } + + // This method can be used to create a String type of Vector without Null + // values. + VectorPtr createSpecificStringVector( + size_t size, + std::vector vals, + facebook::velox::memory::MemoryPool& pool) { + auto vector = BaseVector::create(VARCHAR(), size, &pool); + auto flatVector = vector->asFlatVector(); + + size_t childSize = 0; + std::vector lengths(size); + size_t nullCount = 0; + for (size_t i = 0; i < size; ++i) { + auto notNull = true; + vector->setNull(i, !notNull); + auto len = vals[i].size(); + lengths[i] = len; + childSize += len; + } + vector->setNullCount(0); + + BufferPtr buf = AlignedBuffer::allocate(childSize, &pool); + char* bufPtr = buf->asMutable(); + char* dest = bufPtr; + for (size_t i = 0; i < size; ++i) { + std::string str = vals[i]; + const char* chr = str.c_str(); + auto length = str.size(); + memcpy(dest, chr, length); + dest = dest + length; + } + size_t offset = 0; + for (size_t i = 0; i < size; ++i) { + if (!vector->isNullAt(i)) { + flatVector->set( + i, facebook::velox::StringView(bufPtr + offset, lengths[i])); + offset += lengths[i]; + } + } + return vector; + } +}; + +// This test will firstly generate mock TPC-H lineitem ORC file. Then, Velox's +// computing will be tested based on the generated ORC file. +// Input: Json file of the Substrait plan for the below modified TPC-H Q6 query: +// +// select sum(l_extendedprice*l_discount) as revenue from lineitem where +// l_shipdate_new >= 8766 and l_shipdate_new < 9131 and l_discount between .06 +// - 0.01 and .06 + 0.01 and l_quantity < 24 +// +// Tested Velox computings include: TableScan (Filter Pushdown) + Project + +// Aggregate +// Output: the Velox computed Aggregation result + +TEST_P(PlanConversionTest, queryTest) { + // Generate the used ORC file. + auto type = + ROW({"l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate_new", + "l_commitdate_new", + "l_receiptdate_new", + "l_shipinstruct", + "l_shipmode", + "l_comment"}, + {BIGINT(), + BIGINT(), + BIGINT(), + INTEGER(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + DOUBLE(), + DOUBLE(), + DOUBLE(), + VARCHAR(), + VARCHAR(), + VARCHAR()}); + std::unique_ptr pool{ + facebook::velox::memory::getDefaultScopedMemoryPool()}; + std::vector vectors; + // TPC-H lineitem table has 16 columns. + int colNum = 16; + vectors.reserve(colNum); + std::vector lOrderkeyData = { + 4636438147, + 2012485446, + 1635327427, + 8374290148, + 2972204230, + 8001568994, + 989963396, + 2142695974, + 6354246853, + 4141748419}; + vectors.emplace_back(createSpecificScalar(10, lOrderkeyData, *pool)); + std::vector lPartkeyData = { + 263222018, + 255918298, + 143549509, + 96877642, + 201976875, + 196938305, + 100260625, + 273511608, + 112999357, + 299103530}; + vectors.emplace_back(createSpecificScalar(10, lPartkeyData, *pool)); + std::vector lSuppkeyData = { + 2102019, + 13998315, + 12989528, + 4717643, + 9976902, + 12618306, + 11940632, + 871626, + 1639379, + 3423588}; + vectors.emplace_back(createSpecificScalar(10, lSuppkeyData, *pool)); + std::vector lLinenumberData = {4, 6, 1, 5, 1, 2, 1, 5, 2, 6}; + vectors.emplace_back( + createSpecificScalar(10, lLinenumberData, *pool)); + std::vector lQuantityData = { + 6.0, 1.0, 19.0, 4.0, 6.0, 12.0, 23.0, 11.0, 16.0, 19.0}; + vectors.emplace_back(createSpecificScalar(10, lQuantityData, *pool)); + std::vector lExtendedpriceData = { + 30586.05, + 7821.0, + 1551.33, + 30681.2, + 1941.78, + 66673.0, + 6322.44, + 41754.18, + 8704.26, + 63780.36}; + vectors.emplace_back( + createSpecificScalar(10, lExtendedpriceData, *pool)); + std::vector lDiscountData = { + 0.05, 0.06, 0.01, 0.07, 0.05, 0.06, 0.07, 0.05, 0.06, 0.07}; + vectors.emplace_back(createSpecificScalar(10, lDiscountData, *pool)); + std::vector lTaxData = { + 0.02, 0.03, 0.01, 0.0, 0.01, 0.01, 0.03, 0.07, 0.01, 0.04}; + vectors.emplace_back(createSpecificScalar(10, lTaxData, *pool)); + std::vector lReturnflagData = { + "N", "A", "A", "R", "A", "N", "A", "A", "N", "R"}; + vectors.emplace_back(createSpecificStringVector(10, lReturnflagData, *pool)); + std::vector lLinestatusData = { + "O", "F", "F", "F", "F", "O", "F", "F", "O", "F"}; + vectors.emplace_back(createSpecificStringVector(10, lLinestatusData, *pool)); + std::vector lShipdateNewData = { + 8953.666666666666, + 8773.666666666666, + 9034.666666666666, + 8558.666666666666, + 9072.666666666666, + 8864.666666666666, + 9004.666666666666, + 8778.666666666666, + 9013.666666666666, + 8832.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lShipdateNewData, *pool)); + std::vector lCommitdateNewData = { + 10447.666666666666, + 8953.666666666666, + 8325.666666666666, + 8527.666666666666, + 8438.666666666666, + 10049.666666666666, + 9036.666666666666, + 8666.666666666666, + 9519.666666666666, + 9138.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lCommitdateNewData, *pool)); + std::vector lReceiptdateNewData = { + 10456.666666666666, + 8979.666666666666, + 8299.666666666666, + 8474.666666666666, + 8525.666666666666, + 9996.666666666666, + 9103.666666666666, + 8726.666666666666, + 9593.666666666666, + 9178.666666666666}; + vectors.emplace_back( + createSpecificScalar(10, lReceiptdateNewData, *pool)); + std::vector lShipinstructData = { + "COLLECT COD", + "NONE", + "TAKE BACK RETURN", + "NONE", + "TAKE BACK RETURN", + "NONE", + "DELIVER IN PERSON", + "DELIVER IN PERSON", + "TAKE BACK RETURN", + "NONE"}; + vectors.emplace_back( + createSpecificStringVector(10, lShipinstructData, *pool)); + std::vector lShipmodeData = { + "FOB", + "REG AIR", + "MAIL", + "FOB", + "RAIL", + "SHIP", + "REG AIR", + "REG AIR", + "TRUCK", + "AIR"}; + vectors.emplace_back(createSpecificStringVector(10, lShipmodeData, *pool)); + std::vector lCommentData = { + " the furiously final foxes. quickly final p", + "thely ironic", + "ate furiously. even, pending pinto bean", + "ackages af", + "odolites. slyl", + "ng the regular requests sleep above", + "lets above the slyly ironic theodolites sl", + "lyly regular excuses affi", + "lly unusual theodolites grow slyly above", + " the quickly ironic pains lose car"}; + vectors.emplace_back(createSpecificStringVector(10, lCommentData, *pool)); + + BufferPtr nulls = nullptr; + uint64_t nullCount = 0; + RowVectorPtr rv = std::make_shared( + &(*pool), type, nulls, 10, vectors, nullCount); + std::vector batches; + // Batches has only one RowVector here. + batches.reserve(1); + batches.emplace_back(rv); + + // Will write the data into an ORC file. + std::string currentPath = fs::current_path().c_str(); + auto sink = std::make_unique( + currentPath + "/mock_lineitem.orc"); + auto config = std::make_shared(); + const int64_t writerMemoryCap = std::numeric_limits::max(); + facebook::velox::dwrf::WriterOptions options; + options.config = config; + options.schema = type; + options.memoryBudget = writerMemoryCap; + options.flushPolicy = nullptr; + options.layoutPlannerFactory = nullptr; + auto writer = std::make_unique( + options, + std::move(sink), + facebook::velox::memory::getProcessDefaultMemoryManager().getRoot()); + for (size_t i = 0; i < batches.size(); ++i) { + writer->write(batches[i]); + } + writer->close(); + + // Find the Velox path according current path. + std::string veloxPath; + size_t pos = 0; + if ((pos = currentPath.find("project")) != std::string::npos) { + // In Github test, the Velox home is /root/project. + veloxPath = currentPath.substr(0, pos) + "project"; + } else if ((pos = currentPath.find("velox")) != std::string::npos) { + veloxPath = currentPath.substr(0, pos) + "velox"; + } else if ((pos = currentPath.find("fbcode")) != std::string::npos) { + veloxPath = currentPath; + } else { + throw std::runtime_error("Current path is not a valid Velox path."); + } + + // Begin to trigger Velox's computing with the Substrait plan. + std::string subPlanPath = veloxPath + "/velox/substrait/tests/sub.json"; + auto veloxConverter = std::make_shared(); + auto resIter = veloxConverter->getResIter(subPlanPath); + while (resIter->HasNext()) { + auto rv = resIter->Next(); + auto size = rv->size(); + ASSERT_EQ(size, 1); + std::string res = rv->toString(0); + ASSERT_EQ(res, "{ [child at 0]: 13613.1921}"); + } +} + +VELOX_INSTANTIATE_TEST_SUITE_P( + PlanConversionTests, + PlanConversionTest, + testing::Values(true, false)); diff --git a/velox/substrait/tests/sub.json b/velox/substrait/tests/sub.json new file mode 100644 index 000000000000..b0ff1faf4f56 --- /dev/null +++ b/velox/substrait/tests/sub.json @@ -0,0 +1,806 @@ +{ + "extension_uris": [], + "extensions": [ + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 4, + "name": "lte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 5, + "name": "sum:opt_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 3, + "name": "lt:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 0, + "name": "is_not_null:fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 1, + "name": "and:bool_bool" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 2, + "name": "gte:fp64_fp64" + } + }, + { + "extension_function": { + "extension_uri_reference": 0, + "function_anchor": 6, + "name": "multiply:opt_fp64_fp64" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "project": { + "common": { + "direct": {} + }, + "input": { + "filter": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "base_schema": { + "names": [ + "l_quantity", + "l_extendedprice", + "l_discount", + "l_shipdate_new" + ], + "struct": { + "types": [ + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "type_variation_reference": 0, + "nullability": "NULLABILITY_UNSPECIFIED" + } + }, + "filter": { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 8766 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 9131 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.05 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.07 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 24 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + "local_files": { + "items": [ + { + "format": "FILE_FORMAT_UNSPECIFIED", + "partition_index": "0", + "start": "0", + "length": "3719", + "uri_file": "/mock_lineitem.orc" + } + ] + } + } + }, + "condition": { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 1, + "args": [ + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 0, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 8766 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 3 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 9131 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 2, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.05 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 4, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 0.07 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + }, + { + "scalar_function": { + "function_reference": 3, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "literal": { + "nullable": false, + "fp64": 24 + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "output_type": { + "bool": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + } + }, + "expressions": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 2 + } + } + } + } + ] + } + }, + "groupings": [ + { + "grouping_expressions": [] + } + ], + "measures": [ + { + "measure": { + "function_reference": 5, + "args": [ + { + "scalar_function": { + "function_reference": 6, + "args": [ + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 0 + } + } + } + }, + { + "selection": { + "direct_reference": { + "struct_field": { + "field": 1 + } + } + } + } + ], + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ], + "sorts": [], + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "output_type": { + "fp64": { + "type_variation_reference": 0, + "nullability": "NULLABILITY_NULLABLE" + } + } + } + } + ] + } + }, + "names": [] + } + } + ], + "expected_type_urls": [] +}