From 56ec4dff02ed1b4e801842a66bfe151eaf461fce Mon Sep 17 00:00:00 2001 From: yahoNanJing <90197956+yahoNanJing@users.noreply.github.com> Date: Thu, 14 Jul 2022 03:30:52 +0800 Subject: [PATCH] Use latest DataFusion (#86) * Update datafusion dependency to commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0 * Fix inconsistency * Use latest DataFusion * Fix tomlfmt * Fix PR review Co-authored-by: yangzhong --- Cargo.toml | 2 +- ballista-cli/Cargo.toml | 4 +- ballista/rust/client/Cargo.toml | 4 +- ballista/rust/client/src/context.rs | 14 +- ballista/rust/core/Cargo.toml | 7 +- ballista/rust/core/proto/ballista.proto | 223 +------- ballista/rust/core/proto/datafusion.proto | 516 ++++++++++++++---- .../core/src/serde/logical_plan/from_proto.rs | 52 -- .../rust/core/src/serde/logical_plan/mod.rs | 424 -------------- ballista/rust/core/src/serde/mod.rs | 52 +- .../src/serde/physical_plan/from_proto.rs | 84 +-- .../rust/core/src/serde/physical_plan/mod.rs | 53 +- .../core/src/serde/physical_plan/to_proto.rs | 10 +- ballista/rust/executor/Cargo.toml | 8 +- ballista/rust/executor/src/execution_loop.rs | 1 + ballista/rust/executor/src/executor_server.rs | 1 + ballista/rust/scheduler/Cargo.toml | 5 +- .../scheduler/src/scheduler_server/grpc.rs | 69 ++- .../scheduler/src/scheduler_server/mod.rs | 57 +- benchmarks/Cargo.toml | 4 +- examples/Cargo.toml | 2 +- 21 files changed, 594 insertions(+), 998 deletions(-) delete mode 100644 ballista/rust/core/src/serde/logical_plan/from_proto.rs delete mode 100644 ballista/rust/core/src/serde/logical_plan/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 9b9e79324..fc4c25ca5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ exclude = ["ballista-cli", "python"] # cargo build --profile release-lto [profile.release-lto] -inherits = "release" codegen-units = 1 +inherits = "release" lto = true diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index db0ad6b76..11c2e7723 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -31,8 +31,8 @@ readme = "README.md" [dependencies] ballista = { path = "../ballista/rust/client", version = "0.7.0" } clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } -datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } +datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index f20ed7129..9cf4eb33a 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -31,8 +31,8 @@ rust-version = "1.59" ballista-core = { path = "../core", version = "0.7.0" } ballista-executor = { path = "../executor", version = "0.7.0", optional = true } ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = 
"https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } futures = "0.3" log = "0.4" parking_lot = "0.12" diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 09ff9cee4..03cd0799b 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -440,6 +440,8 @@ impl BallistaContext { #[cfg(test)] mod tests { + #[cfg(feature = "standalone")] + use datafusion::datasource::listing::ListingTableUrl; #[tokio::test] #[cfg(feature = "standalone")] @@ -591,10 +593,14 @@ mod tests { target_partitions: x.target_partitions, }; - let config = - ListingTableConfig::new(listing_table.table_path().clone()) - .with_schema(Arc::new(Schema::new(vec![]))) - .with_listing_options(error_options); + let table_paths = listing_table + .table_paths() + .iter() + .map(|t| ListingTableUrl::parse(t).unwrap()) + .collect(); + let config = ListingTableConfig::new_with_multi_paths(table_paths) + .with_schema(Arc::new(Schema::new(vec![]))) + .with_listing_options(error_options); let error_table = ListingTable::try_new(config).unwrap(); diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index a3803ab5b..32f1c47ea 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -35,17 +35,18 @@ simd = ["datafusion/simd"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow-flight = { version = "16.0.0" } +arrow-flight = { version = "18.0.0" } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } futures = "0.3" hashbrown = "0.12" libloading = "0.7.3" log = "0.4" +object_store = "0.3.0" once_cell = "1.9.0" parking_lot = "0.12" diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 0af951cf8..f899a6035 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -21,7 +21,7 @@ syntax = "proto3"; package ballista.protobuf; option java_multiple_files = true; -option java_package = "org.ballistacompute.protobuf"; +option java_package = "org.apache.arrow.ballista.protobuf"; option java_outer_classname = "BallistaProto"; import "datafusion.proto"; @@ -30,43 +30,6 @@ import "datafusion.proto"; // Ballista Logical Plan /////////////////////////////////////////////////////////////////////////////////////////////////// -// LogicalPlan is a nested type -message LogicalPlanNode { - oneof LogicalPlanType { - ListingTableScanNode listing_scan = 1; - ProjectionNode projection = 3; - SelectionNode selection = 4; - LimitNode limit = 5; - AggregateNode aggregate = 6; - JoinNode join = 7; - SortNode sort = 8; - RepartitionNode repartition = 9; - EmptyRelationNode empty_relation = 10; - CreateExternalTableNode create_external_table = 11; - ExplainNode explain = 
12; - WindowNode window = 13; - AnalyzeNode analyze = 14; - CrossJoinNode cross_join = 15; - ValuesNode values = 16; - LogicalExtensionNode extension = 17; - CreateCatalogSchemaNode create_catalog_schema = 18; - UnionNode union = 19; - CreateCatalogNode create_catalog = 20; - SubqueryAliasNode subquery_alias = 21; - CreateViewNode create_view = 22; - OffsetNode offset = 23; - } -} - -message LogicalExtensionNode { - bytes node = 1; - repeated LogicalPlanNode inputs = 2; -} - -message ProjectionColumns { - repeated string columns = 1; -} - message Statistics { int64 num_rows = 1; int64 total_byte_size = 2; @@ -87,186 +50,6 @@ message PartitionedFile { FileRange range = 5; } -message CsvFormat { - bool has_header = 1; - string delimiter = 2; -} - -message ParquetFormat { - bool enable_pruning = 1; -} - -message AvroFormat {} - -message ListingTableScanNode { - string table_name = 1; - string path = 2; - string file_extension = 3; - ProjectionColumns projection = 4; - datafusion.Schema schema = 5; - repeated datafusion.LogicalExprNode filters = 6; - repeated string table_partition_cols = 7; - bool collect_stat = 8; - uint32 target_partitions = 9; - oneof FileFormatType { - CsvFormat csv = 10; - ParquetFormat parquet = 11; - AvroFormat avro = 12; - } -} - -message ProjectionNode { - LogicalPlanNode input = 1; - repeated datafusion.LogicalExprNode expr = 2; - oneof optional_alias { - string alias = 3; - } -} - -message SelectionNode { - LogicalPlanNode input = 1; - datafusion.LogicalExprNode expr = 2; -} - -message SortNode { - LogicalPlanNode input = 1; - repeated datafusion.LogicalExprNode expr = 2; -} - -message RepartitionNode { - LogicalPlanNode input = 1; - oneof partition_method { - uint64 round_robin = 2; - HashRepartition hash = 3; - } -} - -message HashRepartition { - repeated datafusion.LogicalExprNode hash_expr = 1; - uint64 partition_count = 2; -} - -message EmptyRelationNode { - bool produce_one_row = 1; -} - -message CreateExternalTableNode { - string name = 1; - string location = 2; - FileType file_type = 3; - bool has_header = 4; - datafusion.DfSchema schema = 5; - repeated string table_partition_cols = 6; - bool if_not_exists = 7; - string delimiter = 8; -} - -message CreateCatalogSchemaNode { - string schema_name = 1; - bool if_not_exists = 2; - datafusion.DfSchema schema = 3; -} - -message CreateCatalogNode { - string catalog_name = 1; - bool if_not_exists = 2; - datafusion.DfSchema schema = 3; -} - -message CreateViewNode { - string name = 1; - LogicalPlanNode input = 2; - bool or_replace = 3; -} - -// a node containing data for defining values list. 
unlike in SQL where it's two dimensional, here -// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows -message ValuesNode { - uint64 n_cols = 1; - repeated datafusion.LogicalExprNode values_list = 2; -} - -enum FileType { - NdJson = 0; - Parquet = 1; - CSV = 2; - Avro = 3; -} - -message AnalyzeNode { - LogicalPlanNode input = 1; - bool verbose = 2; -} - -message ExplainNode { - LogicalPlanNode input = 1; - bool verbose = 2; -} - -message AggregateNode { - LogicalPlanNode input = 1; - repeated datafusion.LogicalExprNode group_expr = 2; - repeated datafusion.LogicalExprNode aggr_expr = 3; -} - -message WindowNode { - LogicalPlanNode input = 1; - repeated datafusion.LogicalExprNode window_expr = 2; -} - -enum JoinType { - INNER = 0; - LEFT = 1; - RIGHT = 2; - FULL = 3; - SEMI = 4; - ANTI = 5; -} - -enum JoinConstraint { - ON = 0; - USING = 1; -} - -message JoinNode { - LogicalPlanNode left = 1; - LogicalPlanNode right = 2; - JoinType join_type = 3; - JoinConstraint join_constraint = 4; - repeated datafusion.Column left_join_column = 5; - repeated datafusion.Column right_join_column = 6; - bool null_equals_null = 7; - datafusion.LogicalExprNode filter = 8; -} - -message UnionNode { - repeated LogicalPlanNode inputs = 1; -} - -message CrossJoinNode { - LogicalPlanNode left = 1; - LogicalPlanNode right = 2; -} - -message LimitNode { - LogicalPlanNode input = 1; - uint32 limit = 2; -} - -message OffsetNode { - LogicalPlanNode input = 1; - uint32 offset = 2; -} - -message SelectionExecNode { - datafusion.LogicalExprNode expr = 1; -} - -message SubqueryAliasNode { - LogicalPlanNode input = 1; - string alias = 2; -} - /////////////////////////////////////////////////////////////////////////////////////////////////// // Ballista Physical Plan /////////////////////////////////////////////////////////////////////////////////////////////////// @@ -480,7 +263,7 @@ message HashJoinExecNode { PhysicalPlanNode left = 1; PhysicalPlanNode right = 2; repeated JoinOn on = 3; - JoinType join_type = 4; + datafusion.JoinType join_type = 4; PartitionMode partition_mode = 6; bool null_equals_null = 7; JoinFilter filter = 8; @@ -893,7 +676,7 @@ message GetJobStatusResult { message GetFileMetadataParams { string path = 1; - FileType file_type = 2; + datafusion.FileType file_type = 2; } message GetFileMetadataResult { diff --git a/ballista/rust/core/proto/datafusion.proto b/ballista/rust/core/proto/datafusion.proto index 9999abbf2..09c970292 100644 --- a/ballista/rust/core/proto/datafusion.proto +++ b/ballista/rust/core/proto/datafusion.proto @@ -21,7 +21,7 @@ syntax = "proto3"; package datafusion; option java_multiple_files = true; -option java_package = "org.datafusioncompute.protobuf"; +option java_package = "org.apache.arrow.datafusion.protobuf"; option java_outer_classname = "DatafusionProto"; message ColumnRelation { @@ -40,6 +40,228 @@ message DfField{ message DfSchema { repeated DfField columns = 1; + map metadata = 2; +} + +// logical plan +// LogicalPlan is a nested type +message LogicalPlanNode { + oneof LogicalPlanType { + ListingTableScanNode listing_scan = 1; + ProjectionNode projection = 3; + SelectionNode selection = 4; + LimitNode limit = 5; + AggregateNode aggregate = 6; + JoinNode join = 7; + SortNode sort = 8; + RepartitionNode repartition = 9; + EmptyRelationNode empty_relation = 10; + CreateExternalTableNode create_external_table = 11; + ExplainNode explain = 12; + WindowNode window = 13; + AnalyzeNode analyze = 14; + CrossJoinNode cross_join = 15; + 
ValuesNode values = 16; + LogicalExtensionNode extension = 17; + CreateCatalogSchemaNode create_catalog_schema = 18; + UnionNode union = 19; + CreateCatalogNode create_catalog = 20; + SubqueryAliasNode subquery_alias = 21; + CreateViewNode create_view = 22; + DistinctNode distinct = 23; + } +} + +message LogicalExtensionNode { + bytes node = 1; + repeated LogicalPlanNode inputs = 2; +} + +message ProjectionColumns { + repeated string columns = 1; +} + +message CsvFormat { + bool has_header = 1; + string delimiter = 2; +} + +message ParquetFormat { + bool enable_pruning = 1; +} + +message AvroFormat {} + +message ListingTableScanNode { + string table_name = 1; + repeated string paths = 2; + string file_extension = 3; + ProjectionColumns projection = 4; + datafusion.Schema schema = 5; + repeated datafusion.LogicalExprNode filters = 6; + repeated string table_partition_cols = 7; + bool collect_stat = 8; + uint32 target_partitions = 9; + oneof FileFormatType { + CsvFormat csv = 10; + ParquetFormat parquet = 11; + AvroFormat avro = 12; + } +} + +message ProjectionNode { + LogicalPlanNode input = 1; + repeated datafusion.LogicalExprNode expr = 2; + oneof optional_alias { + string alias = 3; + } +} + +message SelectionNode { + LogicalPlanNode input = 1; + datafusion.LogicalExprNode expr = 2; +} + +message SortNode { + LogicalPlanNode input = 1; + repeated datafusion.LogicalExprNode expr = 2; +} + +message RepartitionNode { + LogicalPlanNode input = 1; + oneof partition_method { + uint64 round_robin = 2; + HashRepartition hash = 3; + } +} + +message HashRepartition { + repeated datafusion.LogicalExprNode hash_expr = 1; + uint64 partition_count = 2; +} + +message EmptyRelationNode { + bool produce_one_row = 1; +} + +message CreateExternalTableNode { + string name = 1; + string location = 2; + FileType file_type = 3; + bool has_header = 4; + datafusion.DfSchema schema = 5; + repeated string table_partition_cols = 6; + bool if_not_exists = 7; + string delimiter = 8; +} + +message CreateCatalogSchemaNode { + string schema_name = 1; + bool if_not_exists = 2; + datafusion.DfSchema schema = 3; +} + +message CreateCatalogNode { + string catalog_name = 1; + bool if_not_exists = 2; + datafusion.DfSchema schema = 3; +} + +message CreateViewNode { + string name = 1; + LogicalPlanNode input = 2; + bool or_replace = 3; + string definition = 4; +} + +// a node containing data for defining values list. 
unlike in SQL where it's two dimensional, here +// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows +message ValuesNode { + uint64 n_cols = 1; + repeated datafusion.LogicalExprNode values_list = 2; +} + +enum FileType { + NdJson = 0; + Parquet = 1; + CSV = 2; + Avro = 3; +} + +message AnalyzeNode { + LogicalPlanNode input = 1; + bool verbose = 2; +} + +message ExplainNode { + LogicalPlanNode input = 1; + bool verbose = 2; +} + +message AggregateNode { + LogicalPlanNode input = 1; + repeated datafusion.LogicalExprNode group_expr = 2; + repeated datafusion.LogicalExprNode aggr_expr = 3; +} + +message WindowNode { + LogicalPlanNode input = 1; + repeated datafusion.LogicalExprNode window_expr = 2; +} + +enum JoinType { + INNER = 0; + LEFT = 1; + RIGHT = 2; + FULL = 3; + SEMI = 4; + ANTI = 5; +} + +enum JoinConstraint { + ON = 0; + USING = 1; +} + +message JoinNode { + LogicalPlanNode left = 1; + LogicalPlanNode right = 2; + JoinType join_type = 3; + JoinConstraint join_constraint = 4; + repeated datafusion.Column left_join_column = 5; + repeated datafusion.Column right_join_column = 6; + bool null_equals_null = 7; + LogicalExprNode filter = 8; +} + +message DistinctNode { + LogicalPlanNode input = 1; +} + +message UnionNode { + repeated LogicalPlanNode inputs = 1; +} + +message CrossJoinNode { + LogicalPlanNode left = 1; + LogicalPlanNode right = 2; +} + +message LimitNode { + LogicalPlanNode input = 1; + // The number of rows to skip before fetch; non-positive means don't skip any + int64 skip = 2; + // Maximum number of rows to fetch; negative means no limit + int64 fetch = 3; +} + +message SelectionExecNode { + datafusion.LogicalExprNode expr = 1; +} + +message SubqueryAliasNode { + LogicalPlanNode input = 1; + string alias = 2; } // logical expressions @@ -76,9 +298,46 @@ message LogicalExprNode { // window expressions WindowExprNode window_expr = 18; + + // AggregateUDF expressions + AggregateUDFExprNode aggregate_udf_expr = 19; + + // Scalar UDF expressions + ScalarUDFExprNode scalar_udf_expr = 20; + + GetIndexedField get_indexed_field = 21; + + GroupingSetNode grouping_set = 22; + + CubeNode cube = 23; + + RollupNode rollup = 24; } } +message LogicalExprList { + repeated LogicalExprNode expr = 1; +} + +message GroupingSetNode { + repeated LogicalExprList expr = 1; +} + +message CubeNode { + repeated LogicalExprNode expr = 1; +} + +message RollupNode { + repeated LogicalExprNode expr = 1; +} + + + +message GetIndexedField { + LogicalExprNode expr = 1; + ScalarValue key = 2; +} + message IsNull { LogicalExprNode expr = 1; } @@ -177,6 +436,8 @@ enum ScalarFunction { Trim=61; Upper=62; Coalesce=63; + Power=64; + StructFun=65; } message ScalarFunctionNode { @@ -202,6 +463,7 @@ enum AggregateFunction { APPROX_PERCENTILE_CONT = 14; APPROX_MEDIAN=15; APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; + GROUPING = 17; } message AggregateExprNode { @@ -209,6 +471,16 @@ message AggregateExprNode { repeated LogicalExprNode expr = 2; } +message AggregateUDFExprNode { + string fun_name = 1; + repeated LogicalExprNode args = 2; +} + +message ScalarUDFExprNode { + string fun_name = 1; + repeated LogicalExprNode args = 2; +} + enum BuiltInWindowFunction { ROW_NUMBER = 0; RANK = 1; @@ -321,53 +593,53 @@ message Field { } message FixedSizeBinary{ - int32 length = 1; + int32 length = 1; } message Timestamp{ - TimeUnit time_unit = 1; - string timezone = 2; + TimeUnit time_unit = 1; + string timezone = 2; } enum DateUnit{ - Day = 0; - DateMillisecond = 1; + Day = 0; + 
DateMillisecond = 1; } enum TimeUnit{ - Second = 0; - TimeMillisecond = 1; - Microsecond = 2; - Nanosecond = 3; + Second = 0; + TimeMillisecond = 1; + Microsecond = 2; + Nanosecond = 3; } enum IntervalUnit{ - YearMonth = 0; - DayTime = 1; - MonthDayNano = 2; + YearMonth = 0; + DayTime = 1; + MonthDayNano = 2; } message Decimal{ - uint64 whole = 1; - uint64 fractional = 2; + uint64 whole = 1; + uint64 fractional = 2; } message List{ - Field field_type = 1; + Field field_type = 1; } message FixedSizeList{ - Field field_type = 1; - int32 list_size = 2; + Field field_type = 1; + int32 list_size = 2; } message Dictionary{ - ArrowType key = 1; - ArrowType value = 2; + ArrowType key = 1; + ArrowType value = 2; } message Struct{ - repeated Field sub_field_types = 1; + repeated Field sub_field_types = 1; } enum UnionMode{ @@ -376,45 +648,53 @@ enum UnionMode{ } message Union{ - repeated Field union_types = 1; - UnionMode union_mode = 2; + repeated Field union_types = 1; + UnionMode union_mode = 2; + repeated int32 type_ids = 3; } message ScalarListValue{ - ScalarType datatype = 1; - repeated ScalarValue values = 2; + ScalarType datatype = 1; + repeated ScalarValue values = 2; +} + +message ScalarTimestampValue { + oneof value { + int64 time_microsecond_value = 1; + int64 time_nanosecond_value = 2; + int64 time_second_value = 3; + int64 time_millisecond_value = 4; + }; + string timezone = 5; } message ScalarValue{ - oneof value { - bool bool_value = 1; - string utf8_value = 2; - string large_utf8_value = 3; - int32 int8_value = 4; - int32 int16_value = 5; - int32 int32_value = 6; - int64 int64_value = 7; - uint32 uint8_value = 8; - uint32 uint16_value = 9; - uint32 uint32_value = 10; - uint64 uint64_value = 11; - float float32_value = 12; - double float64_value = 13; - //Literal Date32 value always has a unit of day - int32 date_32_value = 14; - int64 time_microsecond_value = 15; - int64 time_nanosecond_value = 16; - ScalarListValue list_value = 17; - ScalarType null_list_value = 18; - - PrimitiveScalarType null_value = 19; - Decimal128 decimal128_value = 20; - int64 date_64_value = 21; - int64 time_second_value = 22; - int64 time_millisecond_value = 23; - int32 interval_yearmonth_value = 24; - int64 interval_daytime_value = 25; - } + oneof value { + bool bool_value = 1; + string utf8_value = 2; + string large_utf8_value = 3; + int32 int8_value = 4; + int32 int16_value = 5; + int32 int32_value = 6; + int64 int64_value = 7; + uint32 uint8_value = 8; + uint32 uint16_value = 9; + uint32 uint32_value = 10; + uint64 uint64_value = 11; + float float32_value = 12; + double float64_value = 13; + //Literal Date32 value always has a unit of day + int32 date_32_value = 14; + ScalarListValue list_value = 17; + ScalarType null_list_value = 18; + + PrimitiveScalarType null_value = 19; + Decimal128 decimal128_value = 20; + int64 date_64_value = 21; + int32 interval_yearmonth_value = 24; + int64 interval_daytime_value = 25; + ScalarTimestampValue timestamp_value = 26; + } } message Decimal128{ @@ -427,42 +707,42 @@ message Decimal128{ // List enum PrimitiveScalarType{ - BOOL = 0; // arrow::Type::BOOL - UINT8 = 1; // arrow::Type::UINT8 - INT8 = 2; // arrow::Type::INT8 - UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h - INT16 = 4; - UINT32 = 5; - INT32 = 6; - UINT64 = 7; - INT64 = 8; - FLOAT32 = 9; - FLOAT64 = 10; - UTF8 = 11; - LARGE_UTF8 = 12; - DATE32 = 13; - TIME_MICROSECOND = 14; - TIME_NANOSECOND = 15; - NULL = 16; - - DECIMAL128 = 17; - DATE64 = 20; - TIME_SECOND = 21; - TIME_MILLISECOND = 22; 
- INTERVAL_YEARMONTH = 23; - INTERVAL_DAYTIME = 24; + BOOL = 0; // arrow::Type::BOOL + UINT8 = 1; // arrow::Type::UINT8 + INT8 = 2; // arrow::Type::INT8 + UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h + INT16 = 4; + UINT32 = 5; + INT32 = 6; + UINT64 = 7; + INT64 = 8; + FLOAT32 = 9; + FLOAT64 = 10; + UTF8 = 11; + LARGE_UTF8 = 12; + DATE32 = 13; + TIME_MICROSECOND = 14; + TIME_NANOSECOND = 15; + NULL = 16; + + DECIMAL128 = 17; + DATE64 = 20; + TIME_SECOND = 21; + TIME_MILLISECOND = 22; + INTERVAL_YEARMONTH = 23; + INTERVAL_DAYTIME = 24; } message ScalarType{ - oneof datatype{ - PrimitiveScalarType scalar = 1; - ScalarListType list = 2; - } + oneof datatype{ + PrimitiveScalarType scalar = 1; + ScalarListType list = 2; + } } message ScalarListType{ - repeated string field_names = 3; - PrimitiveScalarType deepest_type = 2; + repeated string field_names = 3; + PrimitiveScalarType deepest_type = 2; } // Broke out into multiple message types so that type @@ -470,40 +750,40 @@ message ScalarListType{ //All types that are of the empty message types contain no additional metadata // about the type message ArrowType{ - oneof arrow_type_enum{ - EmptyMessage NONE = 1; // arrow::Type::NA - EmptyMessage BOOL = 2; // arrow::Type::BOOL - EmptyMessage UINT8 = 3; // arrow::Type::UINT8 - EmptyMessage INT8 = 4; // arrow::Type::INT8 - EmptyMessage UINT16 =5; // represents arrow::Type fields in src/arrow/type.h - EmptyMessage INT16 = 6; - EmptyMessage UINT32 =7; - EmptyMessage INT32 = 8; - EmptyMessage UINT64 =9; - EmptyMessage INT64 =10 ; - EmptyMessage FLOAT16 =11 ; - EmptyMessage FLOAT32 =12 ; - EmptyMessage FLOAT64 =13 ; - EmptyMessage UTF8 =14 ; - EmptyMessage LARGE_UTF8 = 32; - EmptyMessage BINARY =15 ; - int32 FIXED_SIZE_BINARY =16 ; - EmptyMessage LARGE_BINARY = 31; - EmptyMessage DATE32 =17 ; - EmptyMessage DATE64 =18 ; - TimeUnit DURATION = 19; - Timestamp TIMESTAMP =20 ; - TimeUnit TIME32 =21 ; - TimeUnit TIME64 =22 ; - IntervalUnit INTERVAL =23 ; - Decimal DECIMAL =24 ; - List LIST =25; - List LARGE_LIST = 26; - FixedSizeList FIXED_SIZE_LIST = 27; - Struct STRUCT =28; - Union UNION =29; - Dictionary DICTIONARY =30; - } + oneof arrow_type_enum{ + EmptyMessage NONE = 1; // arrow::Type::NA + EmptyMessage BOOL = 2; // arrow::Type::BOOL + EmptyMessage UINT8 = 3; // arrow::Type::UINT8 + EmptyMessage INT8 = 4; // arrow::Type::INT8 + EmptyMessage UINT16 =5; // represents arrow::Type fields in src/arrow/type.h + EmptyMessage INT16 = 6; + EmptyMessage UINT32 =7; + EmptyMessage INT32 = 8; + EmptyMessage UINT64 =9; + EmptyMessage INT64 =10 ; + EmptyMessage FLOAT16 =11 ; + EmptyMessage FLOAT32 =12 ; + EmptyMessage FLOAT64 =13 ; + EmptyMessage UTF8 =14 ; + EmptyMessage LARGE_UTF8 = 32; + EmptyMessage BINARY =15 ; + int32 FIXED_SIZE_BINARY =16 ; + EmptyMessage LARGE_BINARY = 31; + EmptyMessage DATE32 =17 ; + EmptyMessage DATE64 =18 ; + TimeUnit DURATION = 19; + Timestamp TIMESTAMP =20 ; + TimeUnit TIME32 =21 ; + TimeUnit TIME64 =22 ; + IntervalUnit INTERVAL =23 ; + Decimal DECIMAL =24 ; + List LIST =25; + List LARGE_LIST = 26; + FixedSizeList FIXED_SIZE_LIST = 27; + Struct STRUCT =28; + Union UNION =29; + Dictionary DICTIONARY =30; + } } //Useful for representing an empty enum variant in rust diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs deleted file mode 100644 index a17c646ca..000000000 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software 
Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Serde code to convert from protocol buffers to Rust data structures.
-
-use crate::error::BallistaError;
-use crate::serde::protobuf;
-use std::convert::TryFrom;
-
-impl TryFrom<i32> for protobuf::FileType {
-    type Error = BallistaError;
-    fn try_from(value: i32) -> Result<Self, Self::Error> {
-        use protobuf::FileType;
-        match value {
-            _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
-            _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
-            _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
-            _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
-            invalid => Err(BallistaError::General(format!(
-                "Attempted to convert invalid i32 to protobuf::Filetype: {}",
-                invalid
-            ))),
-        }
-    }
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<datafusion::logical_plan::FileType> for protobuf::FileType {
-    fn into(self) -> datafusion::logical_plan::FileType {
-        use datafusion::logical_plan::FileType;
-        match self {
-            protobuf::FileType::NdJson => FileType::NdJson,
-            protobuf::FileType::Parquet => FileType::Parquet,
-            protobuf::FileType::Csv => FileType::CSV,
-            protobuf::FileType::Avro => FileType::Avro,
-        }
-    }
-}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
deleted file mode 100644
index 5952a0952..000000000
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod from_proto;
-
-#[macro_export]
-macro_rules! into_logical_plan {
-    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
-        if let Some(field) = $PB.as_ref() {
-            field.as_ref().try_into_logical_plan($CTX, $CODEC)
-        } else {
-            Err(proto_error("Missing required field in protobuf"))
-        }
-    }};
-}
-
-#[cfg(test)]
-mod roundtrip_tests {
-
-    use super::super::{super::error::Result, protobuf};
-    use crate::serde::{AsLogicalPlan, BallistaCodec};
-    use async_trait::async_trait;
-    use core::panic;
-    use datafusion::common::DFSchemaRef;
-    use datafusion::datasource::listing::ListingTableUrl;
-    use datafusion::logical_plan::source_as_provider;
-    use datafusion::{
-        arrow::datatypes::{DataType, Field, Schema},
-        common::ToDFSchema,
-        datafusion_data_access::{
-            self,
-            object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
-            SizedFile,
-        },
-        datasource::listing::ListingTable,
-        logical_expr::{
-            binary_expr, col,
-            logical_plan::{
-                CreateExternalTable, FileType, LogicalPlan, LogicalPlanBuilder,
-                Repartition,
-            },
-            Expr, Operator,
-        },
-        prelude::*,
-    };
-    use std::io;
-    use std::sync::Arc;
-
-    #[derive(Debug)]
-    struct TestObjectStore {}
-
-    #[async_trait]
-    impl ObjectStore for TestObjectStore {
-        async fn list_file(
-            &self,
-            _prefix: &str,
-        ) -> datafusion_data_access::Result<FileMetaStream> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-
-        async fn list_dir(
-            &self,
-            _prefix: &str,
-            _delimiter: Option<String>,
-        ) -> datafusion_data_access::Result<ListEntryStream> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-
-        fn file_reader(
-            &self,
-            _file: SizedFile,
-        ) -> datafusion_data_access::Result<Arc<dyn ObjectReader>> {
-            Err(io::Error::new(
-                io::ErrorKind::Unsupported,
-                "this is only a test object store".to_string(),
-            ))
-        }
-    }
-
-    // Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality.
-    macro_rules! 
roundtrip_test { - ($initial_struct:ident, $proto_type:ty, $struct_type:ty) => { - let proto: $proto_type = (&$initial_struct).try_into()?; - - let round_trip: $struct_type = (&proto).try_into()?; - - assert_eq!( - format!("{:?}", $initial_struct), - format!("{:?}", round_trip) - ); - }; - ($initial_struct:ident, $struct_type:ty) => { - roundtrip_test!($initial_struct, protobuf::LogicalPlanNode, $struct_type); - }; - ($initial_struct:ident) => { - let ctx = SessionContext::new(); - let codec: BallistaCodec< - datafusion_proto::protobuf::LogicalPlanNode, - protobuf::PhysicalPlanNode, - > = BallistaCodec::default(); - let proto: datafusion_proto::protobuf::LogicalPlanNode = - datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan( - &$initial_struct, - codec.logical_extension_codec(), - ) - .expect("from logical plan"); - let round_trip: LogicalPlan = proto - .try_into_logical_plan(&ctx, codec.logical_extension_codec()) - .expect("to logical plan"); - - assert_eq!( - format!("{:?}", $initial_struct), - format!("{:?}", round_trip) - ); - }; - ($initial_struct:ident, $ctx:ident) => { - let codec: BallistaCodec< - protobuf::LogicalPlanNode, - protobuf::PhysicalPlanNode, - > = BallistaCodec::default(); - let proto: datafusion_proto::protobuf::LogicalPlanNode = - protobuf::LogicalPlanNode::try_from_logical_plan(&$initial_struct) - .expect("from logical plan"); - let round_trip: LogicalPlan = proto - .try_into_logical_plan(&$ctx, codec.logical_extension_codec()) - .expect("to logical plan"); - - assert_eq!( - format!("{:?}", $initial_struct), - format!("{:?}", round_trip) - ); - }; - } - - #[tokio::test] - async fn roundtrip_repartition() -> Result<()> { - use datafusion::logical_plan::Partitioning; - - let test_partition_counts = [usize::MIN, usize::MAX, 43256]; - - let test_expr: Vec = - vec![col("c1") + col("c2"), Expr::Literal((4.0).into())]; - - let plan = std::sync::Arc::new( - test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? 
- .build()?, - ); - - for partition_count in test_partition_counts.iter() { - let rr_repartition = Partitioning::RoundRobinBatch(*partition_count); - - let roundtrip_plan = LogicalPlan::Repartition(Repartition { - input: plan.clone(), - partitioning_scheme: rr_repartition, - }); - - roundtrip_test!(roundtrip_plan); - - let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count); - - let roundtrip_plan = LogicalPlan::Repartition(Repartition { - input: plan.clone(), - partitioning_scheme: h_repartition, - }); - - roundtrip_test!(roundtrip_plan); - - let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count); - - let roundtrip_plan = LogicalPlan::Repartition(Repartition { - input: plan.clone(), - partitioning_scheme: no_expr_hrepartition, - }); - - roundtrip_test!(roundtrip_plan); - } - - Ok(()) - } - - #[test] - fn roundtrip_create_external_table() -> Result<()> { - let schema = test_schema(); - - let df_schema_ref = schema.to_dfschema_ref()?; - - let filetypes: [FileType; 4] = [ - FileType::NdJson, - FileType::Parquet, - FileType::CSV, - FileType::Avro, - ]; - - for file in filetypes.iter() { - let create_table_node = - LogicalPlan::CreateExternalTable(CreateExternalTable { - schema: df_schema_ref.clone(), - name: String::from("TestName"), - location: String::from("employee.csv"), - file_type: *file, - has_header: true, - delimiter: ',', - table_partition_cols: vec![], - if_not_exists: false, - }); - - roundtrip_test!(create_table_node); - } - - Ok(()) - } - - #[tokio::test] - async fn roundtrip_analyze() -> Result<()> { - let verbose_plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? - .explain(true, true)? - .build()?; - - let plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? - .explain(false, true)? - .build()?; - - roundtrip_test!(plan); - - roundtrip_test!(verbose_plan); - - Ok(()) - } - - #[tokio::test] - async fn roundtrip_explain() -> Result<()> { - let verbose_plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? - .explain(true, false)? - .build()?; - - let plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? - .explain(false, false)? - .build()?; - - roundtrip_test!(plan); - - roundtrip_test!(verbose_plan); - - Ok(()) - } - - #[tokio::test] - async fn roundtrip_join() -> Result<()> { - let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4])) - .await? - .build()?; - let filter = binary_expr(col("employee1.x"), Operator::Gt, col("employee2.y")); - - let plan = test_scan_csv("employee2", Some(vec![0, 3, 4])) - .await? - .join( - &scan_plan, - JoinType::Inner, - (vec!["id"], vec!["id"]), - Some(filter), - )? - .build()?; - - roundtrip_test!(plan); - Ok(()) - } - - #[tokio::test] - async fn roundtrip_sort() -> Result<()> { - let plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .sort(vec![col("salary")])? - .build()?; - roundtrip_test!(plan); - - Ok(()) - } - - #[tokio::test] - async fn roundtrip_empty_relation() -> Result<()> { - let plan_false = LogicalPlanBuilder::empty(false).build()?; - - roundtrip_test!(plan_false); - - let plan_true = LogicalPlanBuilder::empty(true).build()?; - - roundtrip_test!(plan_true); - - Ok(()) - } - - #[tokio::test] - async fn roundtrip_logical_plan() -> Result<()> { - let plan = test_scan_csv("employee", Some(vec![3, 4])) - .await? - .aggregate(vec![col("state")], vec![max(col("salary"))])? 
- .build()?; - - roundtrip_test!(plan); - - Ok(()) - } - - #[ignore] // see https://github.com/apache/arrow-datafusion/issues/2546 - #[tokio::test] - async fn roundtrip_logical_plan_custom_ctx() -> Result<()> { - let ctx = SessionContext::new(); - let codec: BallistaCodec< - datafusion_proto::protobuf::LogicalPlanNode, - protobuf::PhysicalPlanNode, - > = BallistaCodec::default(); - let custom_object_store = Arc::new(TestObjectStore {}); - ctx.runtime_env() - .register_object_store("test", custom_object_store.clone()); - - let table_path = "test:///employee.csv"; - let url = ListingTableUrl::parse(table_path).unwrap(); - let os = ctx.runtime_env().object_store(&url)?; - assert_eq!("TestObjectStore", &format!("{:?}", os)); - assert_eq!(table_path, &url.to_string()); - - let schema = test_schema(); - let plan = ctx - .read_csv( - table_path, - CsvReadOptions::new().schema(&schema).has_header(true), - ) - .await? - .to_logical_plan()?; - - let proto: datafusion_proto::protobuf::LogicalPlanNode = - datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan( - &plan, - codec.logical_extension_codec(), - ) - .expect("from logical plan"); - let round_trip: LogicalPlan = proto - .try_into_logical_plan(&ctx, codec.logical_extension_codec()) - .expect("to logical plan"); - - assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip)); - - let table_path = match round_trip { - LogicalPlan::TableScan(scan) => { - let source = source_as_provider(&scan.source)?; - match source.as_ref().as_any().downcast_ref::() { - Some(listing_table) => listing_table.table_path().clone(), - _ => panic!("expected a ListingTable"), - } - } - _ => panic!("expected a TableScan"), - }; - - assert_eq!(table_path.as_str(), url.as_str()); - - Ok(()) - } - - fn test_schema() -> Schema { - Schema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("first_name", DataType::Utf8, false), - Field::new("last_name", DataType::Utf8, false), - Field::new("state", DataType::Utf8, false), - Field::new("salary", DataType::Int32, false), - ]) - } - - async fn test_scan_csv( - table_name: &str, - projection: Option>, - ) -> Result { - let schema = test_schema(); - let ctx = SessionContext::new(); - let options = CsvReadOptions::new().schema(&schema); - - let uri = format!("file:///{}.csv", table_name); - ctx.register_csv(table_name, &uri, options).await?; - - let df = ctx.table(table_name)?; - let plan = match df.to_logical_plan()? 
{ - LogicalPlan::TableScan(ref scan) => { - let mut scan = scan.clone(); - scan.projection = projection; - let mut projected_schema = scan.projected_schema.as_ref().clone(); - projected_schema = projected_schema.replace_qualifier(table_name); - scan.projected_schema = DFSchemaRef::new(projected_schema); - LogicalPlan::TableScan(scan) - } - _ => unimplemented!(), - }; - Ok(LogicalPlanBuilder::from(plan)) - } -} diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index a955c099e..20979f12e 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -20,7 +20,7 @@ use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction}; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::logical_plan::{FunctionRegistry, JoinConstraint, JoinType, Operator}; +use datafusion::logical_plan::{FunctionRegistry, Operator}; use datafusion::physical_plan::join_utils::JoinSide; use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::logical_plan::{ @@ -39,7 +39,6 @@ pub mod protobuf { include!(concat!(env!("OUT_DIR"), "/ballista.protobuf.rs")); } -pub mod logical_plan; pub mod physical_plan; pub mod scheduler; @@ -221,32 +220,6 @@ pub(crate) fn from_proto_binary_op(op: &str) -> Result } } -impl From for JoinType { - fn from(t: protobuf::JoinType) -> Self { - match t { - protobuf::JoinType::Inner => JoinType::Inner, - protobuf::JoinType::Left => JoinType::Left, - protobuf::JoinType::Right => JoinType::Right, - protobuf::JoinType::Full => JoinType::Full, - protobuf::JoinType::Semi => JoinType::Semi, - protobuf::JoinType::Anti => JoinType::Anti, - } - } -} - -impl From for protobuf::JoinType { - fn from(t: JoinType) -> Self { - match t { - JoinType::Inner => protobuf::JoinType::Inner, - JoinType::Left => protobuf::JoinType::Left, - JoinType::Right => protobuf::JoinType::Right, - JoinType::Full => protobuf::JoinType::Full, - JoinType::Semi => protobuf::JoinType::Semi, - JoinType::Anti => protobuf::JoinType::Anti, - } - } -} - impl From for JoinSide { fn from(t: protobuf::JoinSide) -> Self { match t { @@ -265,24 +238,6 @@ impl From for protobuf::JoinSide { } } -impl From for JoinConstraint { - fn from(t: protobuf::JoinConstraint) -> Self { - match t { - protobuf::JoinConstraint::On => JoinConstraint::On, - protobuf::JoinConstraint::Using => JoinConstraint::Using, - } - } -} - -impl From for protobuf::JoinConstraint { - fn from(t: JoinConstraint) -> Self { - match t { - JoinConstraint::On => protobuf::JoinConstraint::On, - JoinConstraint::Using => protobuf::JoinConstraint::Using, - } - } -} - fn byte_to_string(b: u8) -> Result { let b = &[b]; let b = std::str::from_utf8(b) @@ -397,7 +352,7 @@ mod tests { &self, exprs: &[Expr], inputs: &[LogicalPlan], - ) -> Arc { + ) -> Arc { assert_eq!(inputs.len(), 1, "input size inconsistent"); assert_eq!(exprs.len(), 1, "expression size inconsistent"); Arc::new(TopKPlanNode { @@ -494,9 +449,10 @@ mod tests { struct TopKPlanner {} + #[async_trait] impl ExtensionPlanner for TopKPlanner { /// Create a physical plan for an extension node - fn plan_extension( + async fn plan_extension( &self, _planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d699a011c..6268ab281 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -21,13 +21,8 @@ use 
std::convert::{TryFrom, TryInto}; use std::ops::Deref; use std::sync::Arc; -use crate::error::BallistaError; - -use crate::convert_required; -use crate::serde::{from_proto_binary_op, proto_error, protobuf}; use chrono::{TimeZone, Utc}; - -use datafusion::datafusion_data_access::{FileMeta, SizedFile}; +use datafusion::arrow::datatypes::Schema; use datafusion::datasource::listing::{FileRange, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::execution::context::ExecutionProps; @@ -43,9 +38,15 @@ use datafusion::physical_plan::{ functions, Partitioning, }; use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics}; +use object_store::path::Path; +use object_store::ObjectMeta; use protobuf::physical_expr_node::ExprType; +use crate::convert_required; +use crate::error::BallistaError; +use crate::serde::{from_proto_binary_op, proto_error, protobuf}; + impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { Column::new(&c.name, c.index as usize) @@ -55,6 +56,7 @@ impl From<&protobuf::PhysicalColumn> for Column { pub(crate) fn parse_physical_expr( proto: &protobuf::PhysicalExprNode, registry: &dyn FunctionRegistry, + input_schema: &Schema, ) -> Result, BallistaError> { let expr_type = proto .expr_type @@ -70,9 +72,19 @@ pub(crate) fn parse_physical_expr( Arc::new(Literal::new(convert_required!(scalar.value)?)) } ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new( - parse_required_physical_box_expr(&binary_expr.l, registry, "left")?, + parse_required_physical_box_expr( + &binary_expr.l, + registry, + "left", + input_schema, + )?, from_proto_binary_op(&binary_expr.op)?, - parse_required_physical_box_expr(&binary_expr.r, registry, "right")?, + parse_required_physical_box_expr( + &binary_expr.r, + registry, + "right", + input_schema, + )?, )), ExprType::AggregateExpr(_) => { return Err(BallistaError::General( @@ -90,29 +102,33 @@ pub(crate) fn parse_physical_expr( )); } ExprType::IsNullExpr(e) => Arc::new(IsNullExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, )), ExprType::IsNotNullExpr(e) => Arc::new(IsNotNullExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, )), ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_box_expr( - &e.expr, registry, "expr", + &e.expr, + registry, + "expr", + input_schema, )?)), ExprType::Negative(e) => Arc::new(NegativeExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, )), ExprType::InList(e) => Arc::new(InListExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, e.list .iter() - .map(|x| parse_physical_expr(x, registry)) + .map(|x| parse_physical_expr(x, registry, input_schema)) .collect::, _>>()?, e.negated, + input_schema, )), ExprType::Case(e) => Arc::new(CaseExpr::try_new( e.expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry)) + .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema)) .transpose()?, e.when_then_expr .iter() @@ -122,28 +138,29 @@ pub(crate) fn parse_physical_expr( &e.when_expr, registry, "when_expr", + input_schema, )?, parse_required_physical_expr( &e.then_expr, registry, "then_expr", + 
input_schema, )?, )) }) - .collect::, BallistaError>>()? - .as_slice(), + .collect::, BallistaError>>()?, e.else_expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry)) + .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema)) .transpose()?, )?), ExprType::Cast(e) => Arc::new(CastExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, convert_required!(e.arrow_type)?, DEFAULT_DATAFUSION_CAST_OPTIONS, )), ExprType::TryCast(e) => Arc::new(TryCastExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr")?, + parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, convert_required!(e.arrow_type)?, )), ExprType::ScalarFunction(e) => { @@ -157,7 +174,7 @@ pub(crate) fn parse_physical_expr( let args = e .args .iter() - .map(|x| parse_physical_expr(x, registry)) + .map(|x| parse_physical_expr(x, registry, input_schema)) .collect::, _>>()?; // TODO Do not create new the ExecutionProps @@ -181,7 +198,7 @@ pub(crate) fn parse_physical_expr( let args = e .args .iter() - .map(|x| parse_physical_expr(x, registry)) + .map(|x| parse_physical_expr(x, registry, input_schema)) .collect::, _>>()?; Arc::new(ScalarFunctionExpr::new( @@ -200,9 +217,10 @@ fn parse_required_physical_box_expr( expr: &Option>, registry: &dyn FunctionRegistry, field: &str, + input_schema: &Schema, ) -> Result, BallistaError> { expr.as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry)) + .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema)) .transpose()? .ok_or_else(|| { BallistaError::General(format!("Missing required field {:?}", field)) @@ -213,9 +231,10 @@ fn parse_required_physical_expr( expr: &Option, registry: &dyn FunctionRegistry, field: &str, + input_schema: &Schema, ) -> Result, BallistaError> { expr.as_ref() - .map(|e| parse_physical_expr(e, registry)) + .map(|e| parse_physical_expr(e, registry, input_schema)) .transpose()? 
.ok_or_else(|| { BallistaError::General(format!("Missing required field {:?}", field)) @@ -258,13 +277,14 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun pub fn parse_protobuf_hash_partitioning( partitioning: Option<&protobuf::PhysicalHashRepartition>, registry: &dyn FunctionRegistry, + input_schema: &Schema, ) -> Result, BallistaError> { match partitioning { Some(hash_part) => { let expr = hash_part .hash_expr .iter() - .map(|e| parse_physical_expr(e, registry)) + .map(|e| parse_physical_expr(e, registry, input_schema)) .collect::>, _>>()?; Ok(Some(Partitioning::Hash( @@ -281,16 +301,10 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { fn try_from(val: &protobuf::PartitionedFile) -> Result { Ok(PartitionedFile { - file_meta: FileMeta { - sized_file: SizedFile { - path: val.path.clone(), - size: val.size, - }, - last_modified: if val.last_modified_ns == 0 { - None - } else { - Some(Utc.timestamp_nanos(val.last_modified_ns as i64)) - }, + object_meta: ObjectMeta { + location: Path::from(val.path.as_str()), + last_modified: Utc.timestamp_nanos(val.last_modified_ns as i64), + size: val.size as usize, }, partition_values: val .partition_values diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index ac3cb22ea..b80e64b1e 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -127,7 +127,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .expr .iter() .zip(projection.expr_name.iter()) - .map(|(expr, name)| Ok((parse_physical_expr(expr,registry)?, name.to_string()))) + .map(|(expr, name)| Ok((parse_physical_expr(expr,registry, input.schema().as_ref())?, name.to_string()))) .collect::, String)>, BallistaError>>( )?; Ok(Arc::new(ProjectionExec::try_new(exprs, input)?)) @@ -142,7 +142,9 @@ impl AsExecutionPlan for PhysicalPlanNode { let predicate = filter .expr .as_ref() - .map(|expr| parse_physical_expr(expr, registry)) + .map(|expr| { + parse_physical_expr(expr, registry, input.schema().as_ref()) + }) .transpose()? .ok_or_else(|| { BallistaError::General( @@ -200,7 +202,9 @@ impl AsExecutionPlan for PhysicalPlanNode { let expr = hash_part .hash_expr .iter() - .map(|e| parse_physical_expr(e, registry)) + .map(|e| { + parse_physical_expr(e, registry, input.schema().as_ref()) + }) .collect::>, _>>()?; Ok(Arc::new(RepartitionExec::try_new( @@ -285,7 +289,13 @@ impl AsExecutionPlan for PhysicalPlanNode { let window_node_expr = window_node .expr .as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry)) + .map(|e| { + parse_physical_expr( + e.as_ref(), + registry, + &physical_schema, + ) + }) .transpose()? 
.ok_or_else(|| { proto_error( @@ -347,7 +357,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .iter() .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { - parse_physical_expr(expr, registry) + parse_physical_expr(expr, registry, input.schema().as_ref()) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -357,7 +367,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .iter() .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { - parse_physical_expr(expr, registry) + parse_physical_expr(expr, registry, input.schema().as_ref()) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -409,7 +419,7 @@ impl AsExecutionPlan for PhysicalPlanNode { )?; let input_phy_expr: Vec> = agg_node.expr.iter() - .map(|e| parse_physical_expr(e, registry).unwrap()).collect(); + .map(|e| parse_physical_expr(e, registry, &physical_schema).unwrap()).collect(); Ok(create_aggregate_expr( &aggr_function.into(), @@ -457,22 +467,29 @@ impl AsExecutionPlan for PhysicalPlanNode { Ok((left, right)) }) .collect::>()?; - let join_type = protobuf::JoinType::from_i32(hashjoin.join_type) - .ok_or_else(|| { - proto_error(format!( + let join_type = + datafusion_proto::protobuf::JoinType::from_i32(hashjoin.join_type) + .ok_or_else(|| { + proto_error(format!( "Received a HashJoinNode message with unknown JoinType {}", hashjoin.join_type )) - })?; + })?; let filter = hashjoin .filter .as_ref() .map(|f| { + let schema = f + .schema + .as_ref() + .ok_or_else(|| proto_error("Missing JoinFilter schema"))? + .try_into()?; + let expression = parse_physical_expr( f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - registry, + registry, &schema )?; let column_indices = f.column_indices .iter() @@ -489,11 +506,6 @@ impl AsExecutionPlan for PhysicalPlanNode { }) }) .collect::, BallistaError>>()?; - let schema = f - .schema - .as_ref() - .ok_or_else(|| proto_error("Missing JoinFilter schema"))? - .try_into()?; Ok(JoinFilter::new(expression, column_indices, schema)) }) @@ -558,6 +570,7 @@ impl AsExecutionPlan for PhysicalPlanNode { let output_partitioning = parse_protobuf_hash_partitioning( shuffle_writer.output_partitioning.as_ref(), registry, + input.schema().as_ref(), )?; Ok(Arc::new(ShuffleWriterExec::try_new( @@ -613,7 +626,7 @@ impl AsExecutionPlan for PhysicalPlanNode { })? 
.as_ref(); Ok(PhysicalSortExpr { - expr: parse_physical_expr(expr,registry)?, + expr: parse_physical_expr(expr,registry, input.schema().as_ref())?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -766,7 +779,8 @@ impl AsExecutionPlan for PhysicalPlanNode { }), }) .collect(); - let join_type: protobuf::JoinType = exec.join_type().to_owned().into(); + let join_type: datafusion_proto::protobuf::JoinType = + exec.join_type().to_owned().into(); let filter = exec .filter() .as_ref() @@ -1372,6 +1386,7 @@ mod roundtrip_tests { lit(ScalarValue::Int64(Some(2))), ], false, + schema.as_ref(), )); let and = binary(not, Operator::And, in_list, &schema)?; roundtrip_test(Arc::new(FilterExec::try_new( diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 7896dddec..16195377a 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -349,13 +349,9 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile { fn try_from(pf: &PartitionedFile) -> Result { Ok(protobuf::PartitionedFile { - path: pf.file_meta.path().to_owned(), - size: pf.file_meta.size(), - last_modified_ns: pf - .file_meta - .last_modified - .map(|ts| ts.timestamp_nanos() as u64) - .unwrap_or(0), + path: pf.object_meta.location.as_ref().to_owned(), + size: pf.object_meta.size as u64, + last_modified_ns: pf.object_meta.last_modified.timestamp_nanos() as u64, partition_values: pf .partition_values .iter() diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 9b32fb8b8..d05138ce4 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"] [dependencies] anyhow = "1" -arrow = { version = "16.0.0" } -arrow-flight = { version = "16.0.0" } +arrow = { version = "18.0.0" } +arrow-flight = { version = "18.0.0" } async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.7.0" } chrono = { version = "0.4", default-features = false } configure_me = "0.4.0" -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } env_logger = "0.9" futures = "0.3" hyper = "0.14.4" diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index 1e382b022..0520ab65f 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -178,6 +178,7 @@ async fn run_received_tasks ExecutorServer SchedulerGrpc @@ -281,7 +285,7 @@ impl SchedulerGrpc request: Request, ) -> std::result::Result, tonic::Status> { // TODO support multiple object stores - let obj_store = Arc::new(LocalFileSystem {}) as Arc; + let obj_store: Arc = Arc::new(LocalFileSystem::new()); // TODO shouldn't this take a ListingOption object as input? 
let GetFileMetadataParams { path, file_type } = request.into_inner(); @@ -300,8 +304,9 @@ impl SchedulerGrpc )), }?; + let path = Path::from(path.as_str()); let file_metas: Vec<_> = obj_store - .list_file(&path) + .list(Some(&path)) .await .map_err(|e| { let msg = format!("Error listing files: {}", e); @@ -309,7 +314,12 @@ impl SchedulerGrpc tonic::Status::internal(msg) })? .try_collect() - .await?; + .await + .map_err(|e| { + let msg = format!("Error listing files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; let schema = file_format .infer_schema(&obj_store, &file_metas) @@ -556,9 +566,10 @@ fn generate_job_id() -> String { mod test { use std::sync::Arc; + use datafusion::execution::context::default_session_builder; + use datafusion_proto::protobuf::LogicalPlanNode; use tonic::Request; - use crate::state::{backend::standalone::StandaloneClient, SchedulerState}; use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::{ executor_registration::OptionalHost, ExecutorRegistration, PhysicalPlanNode, @@ -566,8 +577,8 @@ mod test { }; use ballista_core::serde::scheduler::ExecutorSpecification; use ballista_core::serde::BallistaCodec; - use datafusion::execution::context::default_session_builder; - use datafusion_proto::protobuf::LogicalPlanNode; + + use crate::state::{backend::standalone::StandaloneClient, SchedulerState}; use super::{SchedulerGrpc, SchedulerServer}; diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 68dede14c..10f26b1d1 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -15,25 +15,28 @@ // specific language governing permissions and limitations // under the License. 
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent}; -use crate::scheduler_server::event_loop::SchedulerServerEventAction; -use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler; -use crate::state::backend::StateBackendClient; -use crate::state::SchedulerState; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use datafusion::execution::context::{default_session_builder, SessionState}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_proto::logical_plan::AsLogicalPlan; +use tokio::sync::RwLock; +use tonic::transport::Channel; + use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; use ballista_core::error::Result; use ballista_core::event_loop::EventLoop; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; use ballista_core::serde::protobuf::TaskStatus; use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; -use datafusion::execution::context::{default_session_builder, SessionState}; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_proto::logical_plan::AsLogicalPlan; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::RwLock; -use tonic::transport::Channel; + +use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent}; +use crate::scheduler_server::event_loop::SchedulerServerEventAction; +use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler; +use crate::state::backend::StateBackendClient; +use crate::state::SchedulerState; // include the generated protobuf source as a submodule #[allow(clippy::all)] @@ -221,12 +224,14 @@ pub fn update_datafusion_context( ) -> Arc { { let mut mut_state = session_ctx.state.write(); - mut_state.config.target_partitions = config.default_shuffle_partitions(); - mut_state.config.batch_size = config.default_batch_size(); - mut_state.config.repartition_joins = config.repartition_joins(); - mut_state.config.repartition_aggregations = config.repartition_aggregations(); - mut_state.config.repartition_windows = config.repartition_windows(); - mut_state.config.parquet_pruning = config.parquet_pruning(); + // TODO Currently we have to start from default session config due to the interface not support update + mut_state.config = SessionConfig::default() + .with_target_partitions(config.default_shuffle_partitions()) + .with_batch_size(config.default_batch_size()) + .with_repartition_joins(config.repartition_joins()) + .with_repartition_aggregations(config.repartition_aggregations()) + .with_repartition_windows(config.repartition_windows()) + .with_parquet_pruning(config.parquet_pruning()); } session_ctx } @@ -277,11 +282,19 @@ impl SessionContextRegistry { sessions.remove(session_id) } } + #[cfg(all(test, feature = "sled"))] mod test { use std::sync::Arc; use std::time::{Duration, Instant}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::execution::context::default_session_builder; + use datafusion::logical_plan::{col, sum, LogicalPlan}; + use datafusion::prelude::{SessionConfig, SessionContext}; + use datafusion::test_util::scan_empty; + use datafusion_proto::protobuf::LogicalPlanNode; + use ballista_core::config::TaskSchedulingPolicy; use ballista_core::error::{BallistaError, Result}; use ballista_core::execution_plans::ShuffleWriterExec; @@ -290,12 +303,6 @@ mod test { }; use ballista_core::serde::scheduler::ExecutorData; use 
ballista_core::serde::BallistaCodec; - use datafusion::arrow::datatypes::{DataType, Field, Schema}; - use datafusion::execution::context::default_session_builder; - use datafusion::logical_plan::{col, sum, LogicalPlan}; - use datafusion::prelude::{SessionConfig, SessionContext}; - use datafusion::test_util::scan_empty; - use datafusion_proto::protobuf::LogicalPlanNode; use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::scheduler_server::SchedulerServer; diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 9b12889ba..d27973229 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"] [dependencies] ballista = { path = "../ballista/rust/client" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5070aed21..c20194ac1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -35,7 +35,7 @@ required-features = ["ballista/standalone"] [dependencies] ballista = { path = "../ballista/rust/client", version = "0.7.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" } +datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" } futures = "0.3" num_cpus = "1.13.0" prost = "0.10"
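
Note: the PartitionedFile serde changes above replace datafusion_data_access's FileMeta/SizedFile with object_store 0.3's ObjectMeta. A minimal sketch of that round-trip, using only the fields this patch touches; the helper names to_fields/from_fields are illustrative, not part of the patch:

    use chrono::{TimeZone, Utc};
    use object_store::{path::Path, ObjectMeta};

    // ObjectMeta -> (path, size, last_modified_ns), mirroring the to_proto.rs change.
    fn to_fields(meta: &ObjectMeta) -> (String, u64, u64) {
        (
            meta.location.as_ref().to_owned(),
            meta.size as u64,
            meta.last_modified.timestamp_nanos() as u64,
        )
    }

    // (path, size, last_modified_ns) -> ObjectMeta, mirroring the from_proto.rs change.
    fn from_fields(path: &str, size: u64, last_modified_ns: u64) -> ObjectMeta {
        ObjectMeta {
            location: Path::from(path),
            last_modified: Utc.timestamp_nanos(last_modified_ns as i64),
            size: size as usize,
        }
    }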
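
Note: the scheduler's get_file_metadata handler now lists files through the object_store 0.3 API (ObjectStore::list) instead of list_file. A self-contained sketch of that listing pattern, assuming a local filesystem store; the helper name list_all is illustrative:

    use std::sync::Arc;

    use futures::TryStreamExt;
    use object_store::{local::LocalFileSystem, path::Path, ObjectMeta, ObjectStore};

    // `list` takes an optional prefix and yields a stream of Result<ObjectMeta>;
    // try_collect gathers the metadata and surfaces any listing error.
    async fn list_all(prefix: &str) -> object_store::Result<Vec<ObjectMeta>> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let prefix = Path::from(prefix);
        store.list(Some(&prefix)).await?.try_collect().await
    }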
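
Note: in update_datafusion_context the scheduler now rebuilds the session configuration through SessionConfig's builder methods rather than assigning struct fields directly. A short sketch of that style; the literal values are placeholders, not Ballista defaults:

    use datafusion::prelude::SessionConfig;

    // Builder-style construction equivalent to the field assignments it replaces.
    fn example_config() -> SessionConfig {
        SessionConfig::default()
            .with_target_partitions(4)
            .with_batch_size(8192)
            .with_repartition_joins(true)
            .with_repartition_aggregations(true)
            .with_repartition_windows(true)
            .with_parquet_pruning(true)
    }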