Skip to content

Commit

Permalink
Use latest DataFusion (#86)
Browse files Browse the repository at this point in the history
* Update datafusion dependency to commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0

* Fix inconsistency

* Use latest DataFusion

* Fix tomlfmt

* Address PR review comments

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Jul 13, 2022
1 parent 489e807 commit 56ec4df
Show file tree
Hide file tree
Showing 21 changed files with 594 additions and 998 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ exclude = ["ballista-cli", "python"]

# cargo build --profile release-lto
[profile.release-lto]
inherits = "release"
codegen-units = 1
inherits = "release"
lto = true

4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
Expand Down
14 changes: 10 additions & 4 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ impl BallistaContext {

#[cfg(test)]
mod tests {
#[cfg(feature = "standalone")]
use datafusion::datasource::listing::ListingTableUrl;

#[tokio::test]
#[cfg(feature = "standalone")]
Expand Down Expand Up @@ -591,10 +593,14 @@ mod tests {
target_partitions: x.target_partitions,
};

let config =
ListingTableConfig::new(listing_table.table_path().clone())
.with_schema(Arc::new(Schema::new(vec![])))
.with_listing_options(error_options);
let table_paths = listing_table
.table_paths()
.iter()
.map(|t| ListingTableUrl::parse(t).unwrap())
.collect();
let config = ListingTableConfig::new_with_multi_paths(table_paths)
.with_schema(Arc::new(Schema::new(vec![])))
.with_listing_options(error_options);

let error_table = ListingTable::try_new(config).unwrap();

Expand Down
7 changes: 4 additions & 3 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }

arrow-flight = { version = "16.0.0" }
arrow-flight = { version = "18.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "0289bfe6a98bdae371eee29d3f257b173ddb4437" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
futures = "0.3"
hashbrown = "0.12"

libloading = "0.7.3"
log = "0.4"
object_store = "0.3.0"
once_cell = "1.9.0"

parking_lot = "0.12"
Expand Down
223 changes: 3 additions & 220 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ syntax = "proto3";
package ballista.protobuf;

option java_multiple_files = true;
option java_package = "org.ballistacompute.protobuf";
option java_package = "org.apache.arrow.ballista.protobuf";
option java_outer_classname = "BallistaProto";

import "datafusion.proto";
Expand All @@ -30,43 +30,6 @@ import "datafusion.proto";
// Ballista Logical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////

// LogicalPlanNode is a recursive (nested) type: its variants may themselves contain LogicalPlanNode inputs
message LogicalPlanNode {
oneof LogicalPlanType {
ListingTableScanNode listing_scan = 1;
ProjectionNode projection = 3;
SelectionNode selection = 4;
LimitNode limit = 5;
AggregateNode aggregate = 6;
JoinNode join = 7;
SortNode sort = 8;
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
ExplainNode explain = 12;
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
OffsetNode offset = 23;
}
}

message LogicalExtensionNode {
bytes node = 1;
repeated LogicalPlanNode inputs = 2;
}

message ProjectionColumns {
repeated string columns = 1;
}

message Statistics {
int64 num_rows = 1;
int64 total_byte_size = 2;
Expand All @@ -87,186 +50,6 @@ message PartitionedFile {
FileRange range = 5;
}

message CsvFormat {
bool has_header = 1;
string delimiter = 2;
}

message ParquetFormat {
bool enable_pruning = 1;
}

message AvroFormat {}

message ListingTableScanNode {
string table_name = 1;
string path = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
datafusion.Schema schema = 5;
repeated datafusion.LogicalExprNode filters = 6;
repeated string table_partition_cols = 7;
bool collect_stat = 8;
uint32 target_partitions = 9;
oneof FileFormatType {
CsvFormat csv = 10;
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
}

message ProjectionNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
oneof optional_alias {
string alias = 3;
}
}

message SelectionNode {
LogicalPlanNode input = 1;
datafusion.LogicalExprNode expr = 2;
}

message SortNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
}

message RepartitionNode {
LogicalPlanNode input = 1;
oneof partition_method {
uint64 round_robin = 2;
HashRepartition hash = 3;
}
}

message HashRepartition {
repeated datafusion.LogicalExprNode hash_expr = 1;
uint64 partition_count = 2;
}

message EmptyRelationNode {
bool produce_one_row = 1;
}

message CreateExternalTableNode {
string name = 1;
string location = 2;
FileType file_type = 3;
bool has_header = 4;
datafusion.DfSchema schema = 5;
repeated string table_partition_cols = 6;
bool if_not_exists = 7;
string delimiter = 8;
}

message CreateCatalogSchemaNode {
string schema_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
}

message CreateCatalogNode {
string catalog_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
}

message CreateViewNode {
string name = 1;
LogicalPlanNode input = 2;
bool or_replace = 3;
}

// A node containing the data that defines a values list. Unlike in SQL, where the list is
// two-dimensional, here it is flattened; the n_cols field lets it be parsed and partitioned into rows.
message ValuesNode {
uint64 n_cols = 1;
repeated datafusion.LogicalExprNode values_list = 2;
}

enum FileType {
NdJson = 0;
Parquet = 1;
CSV = 2;
Avro = 3;
}

message AnalyzeNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message AggregateNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode group_expr = 2;
repeated datafusion.LogicalExprNode aggr_expr = 3;
}

message WindowNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode window_expr = 2;
}

enum JoinType {
INNER = 0;
LEFT = 1;
RIGHT = 2;
FULL = 3;
SEMI = 4;
ANTI = 5;
}

enum JoinConstraint {
ON = 0;
USING = 1;
}

message JoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
JoinType join_type = 3;
JoinConstraint join_constraint = 4;
repeated datafusion.Column left_join_column = 5;
repeated datafusion.Column right_join_column = 6;
bool null_equals_null = 7;
datafusion.LogicalExprNode filter = 8;
}

message UnionNode {
repeated LogicalPlanNode inputs = 1;
}

message CrossJoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
}

message LimitNode {
LogicalPlanNode input = 1;
uint32 limit = 2;
}

message OffsetNode {
LogicalPlanNode input = 1;
uint32 offset = 2;
}

message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
}

message SubqueryAliasNode {
LogicalPlanNode input = 1;
string alias = 2;
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -480,7 +263,7 @@ message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
JoinType join_type = 4;
datafusion.JoinType join_type = 4;
PartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
Expand Down Expand Up @@ -893,7 +676,7 @@ message GetJobStatusResult {

message GetFileMetadataParams {
string path = 1;
FileType file_type = 2;
datafusion.FileType file_type = 2;
}

message GetFileMetadataResult {
Expand Down
Loading

0 comments on commit 56ec4df

Please sign in to comment.