From 6402200c3ff1ca9732e85ca39fc89ae5f30cd965 Mon Sep 17 00:00:00 2001
From: Guillaume Balaine
Date: Wed, 15 Sep 2021 16:30:36 +0200
Subject: [PATCH] Avro Table Provider (#910)

* Add avro as a datasource, file and table provider
* wip
* Added support for composite identifiers for struct type.
* Fixed build.
* cheat and add unions to valid composite column types
* Implement the AvroArrayReader
* Add binary types
* Enable Avro as a FileType
* Enable registering an avro table in the sql parsing
* Change package name for datafusion/avro
* Implement Avro datasource tests and fix avro_rs::Value resolution to Arrow types
* Test for AvroExec::try_from_path
* external table avro test
* Basic schema conversion tests
* Complete test for avro_to_arrow_reader on alltypes_dictionary
* fix_stable: .rewind is 'unstable'
* Fix license files and remove the unused avro-converter crate
* fix example test in avro_to_arrow
* add avro_sql test to default workflow
* Address clippies
* Enable avro as a valid datasource for client execution
* Add avro to available logical plan nodes
* Add ToTimestampMillis as a scalar function in protos
* Allow Avro in PhysicalPlan nodes
* Remove remaining confusing references to 'json' in avro mod
* rename 'parquet' words in avro test and examples
* Handle Union of nested lists in arrow reader
* test timestamp arrays
* remove debug statement
* Make avro optional
* Remove debug statement
* Remove GetField usage (see #628)
* Fix docstring in parser tests
* Test batch output rather than just rows individually
* Remove 'csv' from error strings in physical_plan::avro
* Avro sample sql and explain queries tests in sql.rs
* Activate avro feature for cargo tests in github workflow
* Add a test for avro registering multiple files in a single table
* Switch to Result instead of Option for resolve_string
* Address missing clippy warning should_implement_trait in arrow_to_avro/reader
* Add fmt display implementation for AvroExec
* ci: fix cargo sql run example, use datafusion/avro feature instead of 'avro'
* license: missing license file for avro_to_arrow/schema.rs
* only run avro datasource tests if features have 'avro'
* refactor: rename infer_avro_schema_from_reader to read_avro_schema_from_reader
* Pass None as props to avro schema schema_to_field_with_props until further notice
* Change schema inference to FixedSizeBinary(16) for Uuid
* schema: prefix metadata coming from avro with 'avro'
* make num traits optional and part of the avro feature flag
* Fix avro schema tests regarding external props
* split avro physical plan test feature wise and add a non-implemented test
* submodule: switch back to apache/arrow-testing
* fix_test: columns are now prefixed in the plan
* avro_test: fix clippy warning cmp-owned
* avro: move statistics to the physical plan
* Increase min stack size for cargo tests

Co-authored-by: Jorge C.
Leitao --- .github/workflows/rust.yml | 5 +- ballista/rust/client/src/context.rs | 40 + ballista/rust/core/proto/ballista.proto | 24 + .../core/src/serde/logical_plan/from_proto.rs | 29 + .../rust/core/src/serde/logical_plan/mod.rs | 8 +- .../core/src/serde/logical_plan/to_proto.rs | 30 +- .../src/serde/physical_plan/from_proto.rs | 17 + .../core/src/serde/physical_plan/to_proto.rs | 23 + datafusion-examples/Cargo.toml | 4 + datafusion-examples/examples/avro_sql.rs | 49 + datafusion/Cargo.toml | 5 +- .../src/avro_to_arrow/arrow_array_reader.rs | 1090 +++++++++++++++++ datafusion/src/avro_to_arrow/mod.rs | 47 + datafusion/src/avro_to_arrow/reader.rs | 281 +++++ datafusion/src/avro_to_arrow/schema.rs | 464 +++++++ datafusion/src/datasource/avro.rs | 424 +++++++ datafusion/src/datasource/mod.rs | 1 + datafusion/src/error.rs | 16 + datafusion/src/execution/context.rs | 32 + datafusion/src/lib.rs | 1 + datafusion/src/logical_plan/builder.rs | 23 + datafusion/src/logical_plan/dfschema.rs | 1 - datafusion/src/logical_plan/expr.rs | 16 +- datafusion/src/physical_plan/avro.rs | 457 +++++++ datafusion/src/physical_plan/common.rs | 14 + .../src/physical_plan/datetime_expressions.rs | 15 +- datafusion/src/physical_plan/mod.rs | 1 + datafusion/src/physical_plan/source.rs | 2 +- .../src/physical_plan/string_expressions.rs | 1 + datafusion/src/sql/parser.rs | 18 +- datafusion/src/sql/planner.rs | 1 + datafusion/tests/sql.rs | 155 ++- testing | 2 +- 33 files changed, 3258 insertions(+), 38 deletions(-) create mode 100644 datafusion-examples/examples/avro_sql.rs create mode 100644 datafusion/src/avro_to_arrow/arrow_array_reader.rs create mode 100644 datafusion/src/avro_to_arrow/mod.rs create mode 100644 datafusion/src/avro_to_arrow/reader.rs create mode 100644 datafusion/src/avro_to_arrow/schema.rs create mode 100644 datafusion/src/datasource/avro.rs create mode 100644 datafusion/src/physical_plan/avro.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a75af64f20f5..d62b996f6903 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -105,13 +105,14 @@ jobs: run: | export ARROW_TEST_DATA=$(pwd)/testing/data export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data - # run tests on all workspace members with default feature list - cargo test + # run tests on all workspace members with default feature list + avro + RUST_MIN_STACK=10485760 cargo test --features=avro # test datafusion examples cd datafusion-examples cargo test --no-default-features cargo run --example csv_sql cargo run --example parquet_sql + cargo run --example avro_sql --features=datafusion/avro env: CARGO_HOME: "/github/home/.cargo" CARGO_TARGET_DIR: "/github/home/target" diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index ee2f65699309..3671f349d45b 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::dataframe_impl::DataFrameImpl; use datafusion::logical_plan::LogicalPlan; +use datafusion::physical_plan::avro::AvroReadOptions; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::sql::parser::FileType; @@ -125,6 +126,30 @@ impl BallistaContext { }) } + /// Create a DataFrame representing an Avro table scan + + pub fn read_avro( + &self, + path: &str, + options: AvroReadOptions, + ) -> Result> { + // convert to absolute path because the executor likely has a different working 
directory + let path = PathBuf::from(path); + let path = fs::canonicalize(&path)?; + + // use local DataFusion context for now but later this might call the scheduler + let mut ctx = { + let guard = self.state.lock().unwrap(); + create_df_ctx_with_ballista_query_planner( + &guard.scheduler_host, + guard.scheduler_port, + guard.config(), + ) + }; + let df = ctx.read_avro(path.to_str().unwrap(), options)?; + Ok(df) + } + /// Create a DataFrame representing a Parquet table scan pub fn read_parquet(&self, path: &str) -> Result> { @@ -193,6 +218,17 @@ impl BallistaContext { self.register_table(name, df.as_ref()) } + pub fn register_avro( + &self, + name: &str, + path: &str, + options: AvroReadOptions, + ) -> Result<()> { + let df = self.read_avro(path, options)?; + self.register_table(name, df.as_ref())?; + Ok(()) + } + /// Create a DataFrame from a SQL statement pub fn sql(&self, sql: &str) -> Result> { let mut ctx = { @@ -240,6 +276,10 @@ impl BallistaContext { self.register_parquet(name, location)?; Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan))) } + FileType::Avro => { + self.register_avro(name, location, AvroReadOptions::default())?; + Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan))) + } _ => Err(DataFusionError::NotImplemented(format!( "Unsupported file type {:?}.", file_type diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index dd9978f5c26d..3fc291e3a83f 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -152,6 +152,7 @@ enum ScalarFunction { SHA384 = 32; SHA512 = 33; LN = 34; + TOTIMESTAMPMILLIS = 35; } message ScalarFunctionNode { @@ -253,6 +254,7 @@ message LogicalPlanNode { WindowNode window = 13; AnalyzeNode analyze = 14; CrossJoinNode cross_join = 15; + AvroTableScanNode avro_scan = 16; } } @@ -296,6 +298,15 @@ message ParquetTableScanNode { repeated LogicalExprNode filters = 4; } +message AvroTableScanNode { + string table_name = 1; + string path = 2; + string file_extension = 3; + ProjectionColumns projection = 4; + Schema schema = 5; + repeated LogicalExprNode filters = 6; +} + message ProjectionNode { LogicalPlanNode input = 1; repeated LogicalExprNode expr = 2; @@ -340,6 +351,7 @@ enum FileType{ NdJson = 0; Parquet = 1; CSV = 2; + Avro = 3; } message AnalyzeNode { @@ -456,6 +468,7 @@ message PhysicalPlanNode { WindowAggExecNode window = 17; ShuffleWriterExecNode shuffle_writer = 18; CrossJoinExecNode cross_join = 19; + AvroScanExecNode avro_scan = 20; } } @@ -609,6 +622,17 @@ message CsvScanExecNode { repeated string filename = 8; } +message AvroScanExecNode { + string path = 1; + repeated uint32 projection = 2; + Schema schema = 3; + string file_extension = 4; + uint32 batch_size = 5; + + // partition filenames + repeated string filename = 8; +} + enum PartitionMode { COLLECT_LEFT = 0; PARTITIONED = 1; diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 38de341ed01d..8ffdb650aa21 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -32,6 +32,7 @@ use datafusion::logical_plan::{ LogicalPlan, LogicalPlanBuilder, Operator, }; use datafusion::physical_plan::aggregates::AggregateFunction; +use datafusion::physical_plan::avro::AvroReadOptions; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::physical_plan::window_functions::BuiltInWindowFunction; use datafusion::scalar::ScalarValue; @@ -171,6 +172,32 @@ 
impl TryInto for &protobuf::LogicalPlanNode { .build() .map_err(|e| e.into()) } + LogicalPlanType::AvroScan(scan) => { + let schema: Schema = convert_required!(scan.schema)?; + let options = AvroReadOptions { + schema: Some(Arc::new(schema.clone())), + file_extension: &scan.file_extension, + }; + + let mut projection = None; + if let Some(columns) = &scan.projection { + let column_indices = columns + .columns + .iter() + .map(|name| schema.index_of(name)) + .collect::, _>>()?; + projection = Some(column_indices); + } + + LogicalPlanBuilder::scan_avro_with_name( + &scan.path, + options, + projection, + &scan.table_name, + )? + .build() + .map_err(|e| e.into()) + } LogicalPlanType::Sort(sort) => { let input: LogicalPlan = convert_box_required!(sort.input)?; let sort_expr: Vec = sort @@ -1193,6 +1220,7 @@ impl TryFrom for protobuf::FileType { _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson), _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet), _x if _x == FileType::Csv as i32 => Ok(FileType::Csv), + _x if _x == FileType::Avro as i32 => Ok(FileType::Avro), invalid => Err(BallistaError::General(format!( "Attempted to convert invalid i32 to protobuf::Filetype: {}", invalid @@ -1209,6 +1237,7 @@ impl Into for protobuf::FileType { protobuf::FileType::NdJson => FileType::NdJson, protobuf::FileType::Parquet => FileType::Parquet, protobuf::FileType::Csv => FileType::CSV, + protobuf::FileType::Avro => FileType::Avro, } } } diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index dbaac1de7b57..ada3c85de674 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -643,8 +643,12 @@ mod roundtrip_tests { let df_schema_ref = schema.to_dfschema_ref()?; - let filetypes: [FileType; 3] = - [FileType::NdJson, FileType::Parquet, FileType::CSV]; + let filetypes: [FileType; 4] = [ + FileType::NdJson, + FileType::Parquet, + FileType::CSV, + FileType::Avro, + ]; for file in filetypes.iter() { let create_table_node = LogicalPlan::CreateExternalTable { diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 10bc63e4807b..e195e82df48e 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -25,6 +25,7 @@ use crate::serde::{protobuf, BallistaError}; use datafusion::arrow::datatypes::{ DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, }; +use datafusion::datasource::avro::AvroFile; use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor}; use datafusion::logical_plan::{ window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, @@ -793,6 +794,19 @@ impl TryInto for &LogicalPlan { }, )), }) + } else if let Some(avro) = source.downcast_ref::() { + Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::AvroScan( + protobuf::AvroTableScanNode { + table_name: table_name.to_owned(), + path: avro.path().to_owned(), + projection, + schema: Some(schema), + file_extension: avro.file_extension().to_string(), + filters, + }, + )), + }) } else { Err(BallistaError::General(format!( "logical plan to_proto unsupported table provider {:?}", @@ -974,6 +988,7 @@ impl TryInto for &LogicalPlan { FileType::NdJson => protobuf::FileType::NdJson, FileType::Parquet => protobuf::FileType::Parquet, FileType::CSV => protobuf::FileType::Csv, + FileType::Avro => protobuf::FileType::Avro, }; 
Ok(protobuf::LogicalPlanNode { @@ -1098,7 +1113,13 @@ impl TryInto for &Expr { ) } }; - let arg = &args[0]; + let arg_expr: Option> = if !args.is_empty() + { + let arg = &args[0]; + Some(Box::new(arg.try_into()?)) + } else { + None + }; let partition_by = partition_by .iter() .map(|e| e.try_into()) @@ -1111,7 +1132,7 @@ impl TryInto for &Expr { protobuf::window_expr_node::WindowFrame::Frame(window_frame.into()) }); let window_expr = Box::new(protobuf::WindowExprNode { - expr: Some(Box::new(arg.try_into()?)), + expr: arg_expr, window_function: Some(window_function), partition_by, order_by, @@ -1284,7 +1305,7 @@ impl TryInto for &Expr { Expr::Wildcard => Ok(protobuf::LogicalExprNode { expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)), }), - Expr::TryCast { .. } => unimplemented!(), + _ => unimplemented!(), } } } @@ -1473,6 +1494,9 @@ impl TryInto for &BuiltinScalarFunction { BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256), BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384), BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512), + BuiltinScalarFunction::ToTimestampMillis => { + Ok(protobuf::ScalarFunction::Totimestampmillis) + } _ => Err(BallistaError::General(format!( "logical_plan::to_proto() unsupported scalar function {:?}", self diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 3cd8cf3871cf..0d233725fc9f 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -43,6 +43,7 @@ use datafusion::logical_plan::{ window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType, }; use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction}; +use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::PartitionMode; @@ -153,6 +154,21 @@ impl TryInto> for &protobuf::PhysicalPlanNode { None, ))) } + PhysicalPlanType::AvroScan(scan) => { + let schema = Arc::new(convert_required!(scan.schema)?); + let options = AvroReadOptions { + schema: Some(schema), + file_extension: &scan.file_extension, + }; + let projection = scan.projection.iter().map(|i| *i as usize).collect(); + Ok(Arc::new(AvroExec::try_from_path( + &scan.path, + options, + Some(projection), + scan.batch_size as usize, + None, + )?)) + } PhysicalPlanType::CoalesceBatches(coalesce_batches) => { let input: Arc = convert_box_required!(coalesce_batches.input)?; @@ -544,6 +560,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Sha384 => BuiltinScalarFunction::SHA384, ScalarFunction::Sha512 => BuiltinScalarFunction::SHA512, ScalarFunction::Ln => BuiltinScalarFunction::Ln, + ScalarFunction::Totimestampmillis => BuiltinScalarFunction::ToTimestampMillis, } } } diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index e7d4ac652874..22a49cb881ba 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -62,6 +62,7 @@ use crate::execution_plans::{ use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::scheduler::PartitionLocation; use crate::serde::{protobuf, 
BallistaError}; +use datafusion::physical_plan::avro::AvroExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; use datafusion::physical_plan::repartition::RepartitionExec; @@ -285,6 +286,28 @@ impl TryInto for Arc { }, )), }) + } else if let Some(exec) = plan.downcast_ref::() { + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::AvroScan( + protobuf::AvroScanExecNode { + path: exec.path().to_owned(), + filename: exec.filenames().to_vec(), + projection: exec + .projection() + .ok_or_else(|| { + BallistaError::General( + "projection in AvroExec doesn't exist.".to_owned(), + ) + })? + .iter() + .map(|n| *n as u32) + .collect(), + file_extension: exec.file_extension().to_owned(), + schema: Some(exec.file_schema().as_ref().into()), + batch_size: exec.batch_size() as u32, + }, + )), + }) } else if let Some(exec) = plan.downcast_ref::() { let mut partition = vec![]; for location in &exec.partition { diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 9b859c6238f8..113cd5bb9103 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -27,6 +27,10 @@ keywords = [ "arrow", "query", "sql" ] edition = "2018" publish = false +[[example]] +name = "avro_sql" +path = "examples/avro_sql.rs" +required-features = ["datafusion/avro"] [dev-dependencies] arrow-flight = { version = "^5.3" } diff --git a/datafusion-examples/examples/avro_sql.rs b/datafusion-examples/examples/avro_sql.rs new file mode 100644 index 000000000000..e9676a05b1fc --- /dev/null +++ b/datafusion-examples/examples/avro_sql.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion::arrow::util::pretty; + +use datafusion::error::Result; +use datafusion::physical_plan::avro::AvroReadOptions; +use datafusion::prelude::*; + +/// This example demonstrates executing a simple query against an Arrow data source (Avro) and +/// fetching results +#[tokio::main] +async fn main() -> Result<()> { + // create local execution context + let mut ctx = ExecutionContext::new(); + + let testdata = datafusion::arrow::util::test_util::arrow_test_data(); + + // register avro file with the execution context + let avro_file = &format!("{}/avro/alltypes_plain.avro", testdata); + ctx.register_avro("alltypes_plain", avro_file, AvroReadOptions::default())?; + + // execute the query + let df = ctx.sql( + "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \ + FROM alltypes_plain \ + WHERE id > 1 AND tinyint_col < double_col", + )?; + let results = df.collect().await?; + + // print the results + pretty::print_batches(&results)?; + + Ok(()) +} diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index f30db0296565..c1998bea13bb 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -44,7 +44,8 @@ regex_expressions = ["regex", "lazy_static"] unicode_expressions = ["unicode-segmentation"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] - +# Used to enable the avro format +avro = ["avro-rs", "num-traits"] [dependencies] ahash = "0.7" @@ -69,6 +70,8 @@ regex = { version = "^1.4.3", optional = true } lazy_static = { version = "^1.4.0", optional = true } smallvec = { version = "1.6", features = ["union"] } rand = "0.8" +avro-rs = { version = "0.13", features = ["snappy"], optional = true } +num-traits = { version = "0.2", optional = true } [dev-dependencies] criterion = "0.3" diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs new file mode 100644 index 000000000000..cc8ed8e66942 --- /dev/null +++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs @@ -0,0 +1,1090 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Avro to Arrow array readers + +use crate::arrow::array::{ + make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef, + BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait, + PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder, + StringDictionaryBuilder, +}; +use crate::arrow::buffer::{Buffer, MutableBuffer}; +use crate::arrow::datatypes::{ + ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type, + Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, + Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, +}; +use crate::arrow::error::ArrowError; +use crate::arrow::record_batch::RecordBatch; +use crate::arrow::util::bit_util; +use crate::error::{DataFusionError, Result}; +use arrow::array::{BinaryArray, GenericListArray}; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError::SchemaError; +use arrow::error::Result as ArrowResult; +use avro_rs::{ + schema::{Schema as AvroSchema, SchemaKind}, + types::Value, + AvroResult, Error as AvroError, Reader as AvroReader, +}; +use num_traits::NumCast; +use std::collections::HashMap; +use std::io::Read; +use std::sync::Arc; + +type RecordSlice<'a> = &'a [Vec<(String, Value)>]; + +pub struct AvroArrowArrayReader<'a, R: Read> { + reader: AvroReader<'a, R>, + schema: SchemaRef, + projection: Option>, + schema_lookup: HashMap, +} + +impl<'a, R: Read> AvroArrowArrayReader<'a, R> { + pub fn try_new( + reader: R, + schema: SchemaRef, + projection: Option>, + ) -> Result { + let reader = AvroReader::new(reader)?; + let writer_schema = reader.writer_schema().clone(); + let schema_lookup = Self::schema_lookup(writer_schema)?; + Ok(Self { + reader, + schema, + projection, + schema_lookup, + }) + } + + pub fn schema_lookup(schema: AvroSchema) -> Result> { + match schema { + AvroSchema::Record { + lookup: ref schema_lookup, + .. 
+ } => Ok(schema_lookup.clone()), + _ => Err(DataFusionError::ArrowError(SchemaError( + "expected avro schema to be a record".to_string(), + ))), + } + } + + /// Read the next batch of records + #[allow(clippy::should_implement_trait)] + pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult> { + let mut rows = Vec::with_capacity(batch_size); + for value in self.reader.by_ref().take(batch_size) { + let v = value.map_err(|e| { + ArrowError::ParseError(format!("Failed to parse avro value: {:?}", e)) + })?; + match v { + Value::Record(v) => { + rows.push(v); + } + other => { + return Err(ArrowError::ParseError(format!( + "Row needs to be of type object, got: {:?}", + other + ))) + } + } + } + if rows.is_empty() { + // reached end of file + return Ok(None); + } + let rows = &rows[..]; + let projection = self.projection.clone().unwrap_or_else(Vec::new); + let arrays = self.build_struct_array(rows, self.schema.fields(), &projection); + let projected_fields: Vec = if projection.is_empty() { + self.schema.fields().to_vec() + } else { + projection + .iter() + .map(|name| self.schema.column_with_name(name)) + .flatten() + .map(|(_, field)| field.clone()) + .collect() + }; + let projected_schema = Arc::new(Schema::new(projected_fields)); + arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some)) + } + + fn build_boolean_array( + &self, + rows: RecordSlice, + col_name: &str, + ) -> ArrowResult { + let mut builder = BooleanBuilder::new(rows.len()); + for row in rows { + if let Some(value) = self.field_lookup(col_name, row) { + if let Some(boolean) = resolve_boolean(&value) { + builder.append_value(boolean)? + } else { + builder.append_null()?; + } + } else { + builder.append_null()?; + } + } + Ok(Arc::new(builder.finish())) + } + + #[allow(clippy::unnecessary_wraps)] + fn build_primitive_array( + &self, + rows: RecordSlice, + col_name: &str, + ) -> ArrowResult + where + T: ArrowNumericType, + T::Native: num_traits::cast::NumCast, + { + Ok(Arc::new( + rows.iter() + .map(|row| { + self.field_lookup(col_name, row) + .and_then(|value| resolve_item::(&value)) + }) + .collect::>(), + )) + } + + #[inline(always)] + #[allow(clippy::unnecessary_wraps)] + fn build_string_dictionary_builder( + &self, + row_len: usize, + ) -> ArrowResult> + where + T: ArrowPrimitiveType + ArrowDictionaryKeyType, + { + let key_builder = PrimitiveBuilder::::new(row_len); + let values_builder = StringBuilder::new(row_len * 5); + Ok(StringDictionaryBuilder::new(key_builder, values_builder)) + } + + fn build_wrapped_list_array( + &self, + rows: RecordSlice, + col_name: &str, + key_type: &DataType, + ) -> ArrowResult { + match *key_type { + DataType::Int8 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int8), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int16 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int16), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int32 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int64 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int64), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt8 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt8), + Box::new(DataType::Utf8), + ); + 
self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt16 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt32 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt32), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt64 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt64), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + ref e => Err(SchemaError(format!( + "Data type is currently not supported for dictionaries in list : {:?}", + e + ))), + } + } + + #[inline(always)] + fn list_array_string_array_builder( + &self, + data_type: &DataType, + col_name: &str, + rows: RecordSlice, + ) -> ArrowResult + where + D: ArrowPrimitiveType + ArrowDictionaryKeyType, + { + let mut builder: Box = match data_type { + DataType::Utf8 => { + let values_builder = StringBuilder::new(rows.len() * 5); + Box::new(ListBuilder::new(values_builder)) + } + DataType::Dictionary(_, _) => { + let values_builder = + self.build_string_dictionary_builder::(rows.len() * 5)?; + Box::new(ListBuilder::new(values_builder)) + } + e => { + return Err(SchemaError(format!( + "Nested list data builder type is not supported: {:?}", + e + ))) + } + }; + + for row in rows { + if let Some(value) = self.field_lookup(col_name, row) { + // value can be an array or a scalar + let vals: Vec> = if let Value::String(v) = value { + vec![Some(v.to_string())] + } else if let Value::Array(n) = value { + n.into_iter() + .map(|v| resolve_string(&v)) + .collect::>>()? + .into_iter() + .map(Some) + .collect::>>() + } else if let Value::Null = value { + vec![None] + } else if !matches!(value, Value::Record(_)) { + vec![Some(resolve_string(&value)?)] + } else { + return Err(SchemaError( + "Only scalars are currently supported in Avro arrays".to_string(), + )); + }; + + // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify + // them. + match data_type { + DataType::Utf8 => { + let builder = builder + .as_any_mut() + .downcast_mut::>() + .ok_or_else(||ArrowError::SchemaError( + "Cast failed for ListBuilder during nested data parsing".to_string(), + ))?; + for val in vals { + if let Some(v) = val { + builder.values().append_value(&v)? + } else { + builder.values().append_null()? + }; + } + + // Append to the list + builder.append(true)?; + } + DataType::Dictionary(_, _) => { + let builder = builder.as_any_mut().downcast_mut::>>().ok_or_else(||ArrowError::SchemaError( + "Cast failed for ListBuilder during nested data parsing".to_string(), + ))?; + for val in vals { + if let Some(v) = val { + let _ = builder.values().append(&v)?; + } else { + builder.values().append_null()? 
+ }; + } + + // Append to the list + builder.append(true)?; + } + e => { + return Err(SchemaError(format!( + "Nested list data builder type is not supported: {:?}", + e + ))) + } + } + } + } + + Ok(builder.finish() as ArrayRef) + } + + #[inline(always)] + fn build_dictionary_array( + &self, + rows: RecordSlice, + col_name: &str, + ) -> ArrowResult + where + T::Native: num_traits::cast::NumCast, + T: ArrowPrimitiveType + ArrowDictionaryKeyType, + { + let mut builder: StringDictionaryBuilder = + self.build_string_dictionary_builder(rows.len())?; + for row in rows { + if let Some(value) = self.field_lookup(col_name, row) { + if let Ok(str_v) = resolve_string(&value) { + builder.append(str_v).map(drop)? + } else { + builder.append_null()? + } + } else { + builder.append_null()? + } + } + Ok(Arc::new(builder.finish()) as ArrayRef) + } + + #[inline(always)] + fn build_string_dictionary_array( + &self, + rows: RecordSlice, + col_name: &str, + key_type: &DataType, + value_type: &DataType, + ) -> ArrowResult { + if let DataType::Utf8 = *value_type { + match *key_type { + DataType::Int8 => self.build_dictionary_array::(rows, col_name), + DataType::Int16 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::Int32 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::Int64 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt8 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt16 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt32 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt64 => { + self.build_dictionary_array::(rows, col_name) + } + _ => Err(ArrowError::SchemaError( + "unsupported dictionary key type".to_string(), + )), + } + } else { + Err(ArrowError::SchemaError( + "dictionary types other than UTF-8 not yet supported".to_string(), + )) + } + } + + /// Build a nested GenericListArray from a list of unnested `Value`s + fn build_nested_list_array( + &self, + rows: &[Value], + list_field: &Field, + ) -> ArrowResult { + // build list offsets + let mut cur_offset = OffsetSize::zero(); + let list_len = rows.len(); + let num_list_bytes = bit_util::ceil(list_len, 8); + let mut offsets = Vec::with_capacity(list_len + 1); + let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes); + let list_nulls = list_nulls.as_slice_mut(); + offsets.push(cur_offset); + rows.iter().enumerate().for_each(|(i, v)| { + // TODO: unboxing Union(Array(Union(...))) should probably be done earlier + let v = maybe_resolve_union(v); + if let Value::Array(a) = v { + cur_offset += OffsetSize::from_usize(a.len()).unwrap(); + bit_util::set_bit(list_nulls, i); + } else if let Value::Null = v { + // value is null, not incremented + } else { + cur_offset += OffsetSize::one(); + } + offsets.push(cur_offset); + }); + let valid_len = cur_offset.to_usize().unwrap(); + let array_data = match list_field.data_type() { + DataType::Null => NullArray::new(valid_len).data().clone(), + DataType::Boolean => { + let num_bytes = bit_util::ceil(valid_len, 8); + let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes); + let mut bool_nulls = + MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); + let mut curr_index = 0; + rows.iter().for_each(|v| { + if let Value::Array(vs) = v { + vs.iter().for_each(|value| { + if let Value::Boolean(child) = value { + // if valid boolean, append value + if *child { + bit_util::set_bit( + bool_values.as_slice_mut(), + curr_index, + ); + } + } else { + // null slot + 
bit_util::unset_bit( + bool_nulls.as_slice_mut(), + curr_index, + ); + } + curr_index += 1; + }); + } + }); + ArrayData::builder(list_field.data_type().clone()) + .len(valid_len) + .add_buffer(bool_values.into()) + .null_bit_buffer(bool_nulls.into()) + .build() + } + DataType::Int8 => self.read_primitive_list_values::(rows), + DataType::Int16 => self.read_primitive_list_values::(rows), + DataType::Int32 => self.read_primitive_list_values::(rows), + DataType::Int64 => self.read_primitive_list_values::(rows), + DataType::UInt8 => self.read_primitive_list_values::(rows), + DataType::UInt16 => self.read_primitive_list_values::(rows), + DataType::UInt32 => self.read_primitive_list_values::(rows), + DataType::UInt64 => self.read_primitive_list_values::(rows), + DataType::Float16 => { + return Err(ArrowError::SchemaError("Float16 not supported".to_string())) + } + DataType::Float32 => self.read_primitive_list_values::(rows), + DataType::Float64 => self.read_primitive_list_values::(rows), + DataType::Timestamp(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) => { + return Err(ArrowError::SchemaError( + "Temporal types are not yet supported, see ARROW-4803".to_string(), + )) + } + DataType::Utf8 => flatten_string_values(rows) + .into_iter() + .collect::() + .data() + .clone(), + DataType::LargeUtf8 => flatten_string_values(rows) + .into_iter() + .collect::() + .data() + .clone(), + DataType::List(field) => { + let child = + self.build_nested_list_array::(&flatten_values(rows), field)?; + child.data().clone() + } + DataType::LargeList(field) => { + let child = + self.build_nested_list_array::(&flatten_values(rows), field)?; + child.data().clone() + } + DataType::Struct(fields) => { + // extract list values, with non-lists converted to Value::Null + let array_item_count = rows + .iter() + .map(|row| match row { + Value::Array(values) => values.len(), + _ => 1, + }) + .sum(); + let num_bytes = bit_util::ceil(array_item_count, 8); + let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); + let mut struct_index = 0; + let rows: Vec> = rows + .iter() + .map(|row| { + if let Value::Array(values) = row { + values.iter().for_each(|_| { + bit_util::set_bit( + null_buffer.as_slice_mut(), + struct_index, + ); + struct_index += 1; + }); + values + .iter() + .map(|v| ("".to_string(), v.clone())) + .collect::>() + } else { + struct_index += 1; + vec![("null".to_string(), Value::Null)] + } + }) + .collect(); + let arrays = + self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?; + let data_type = DataType::Struct(fields.clone()); + let buf = null_buffer.into(); + ArrayDataBuilder::new(data_type) + .len(rows.len()) + .null_bit_buffer(buf) + .child_data(arrays.into_iter().map(|a| a.data().clone()).collect()) + .build() + } + datatype => { + return Err(ArrowError::SchemaError(format!( + "Nested list of {:?} not supported", + datatype + ))); + } + }; + // build list + let list_data = ArrayData::builder(DataType::List(Box::new(list_field.clone()))) + .len(list_len) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_child_data(array_data) + .null_bit_buffer(list_nulls.into()) + .build(); + Ok(Arc::new(GenericListArray::::from(list_data))) + } + + /// Builds the child values of a `StructArray`, falling short of constructing the StructArray. + /// The function does not construct the StructArray as some callers would want the child arrays. + /// + /// *Note*: The function is recursive, and will read nested structs. 
+ /// + /// If `projection` is not empty, then all values are returned. The first level of projection + /// occurs at the `RecordBatch` level. No further projection currently occurs, but would be + /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`. + fn build_struct_array( + &self, + rows: RecordSlice, + struct_fields: &[Field], + projection: &[String], + ) -> ArrowResult> { + let arrays: ArrowResult> = struct_fields + .iter() + .filter(|field| projection.is_empty() || projection.contains(field.name())) + .map(|field| { + match field.data_type() { + DataType::Null => { + Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef) + } + DataType::Boolean => self.build_boolean_array(rows, field.name()), + DataType::Float64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Float32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int16 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int8 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt16 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt8 => { + self.build_primitive_array::(rows, field.name()) + } + // TODO: this is incomplete + DataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Microsecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Millisecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Nanosecond => self + .build_primitive_array::( + rows, + field.name(), + ), + }, + DataType::Date64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Date32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Time64(unit) => match unit { + TimeUnit::Microsecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Nanosecond => self + .build_primitive_array::( + rows, + field.name(), + ), + t => Err(ArrowError::SchemaError(format!( + "TimeUnit {:?} not supported with Time64", + t + ))), + }, + DataType::Time32(unit) => match unit { + TimeUnit::Second => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Millisecond => self + .build_primitive_array::( + rows, + field.name(), + ), + t => Err(ArrowError::SchemaError(format!( + "TimeUnit {:?} not supported with Time32", + t + ))), + }, + DataType::Utf8 | DataType::LargeUtf8 => Ok(Arc::new( + rows.iter() + .map(|row| { + let maybe_value = self.field_lookup(field.name(), row); + maybe_value + .map(|value| resolve_string(&value)) + .transpose() + }) + .collect::>()?, + ) + as ArrayRef), + DataType::Binary | DataType::LargeBinary => Ok(Arc::new( + rows.iter() + .map(|row| { + let maybe_value = self.field_lookup(field.name(), row); + maybe_value.and_then(resolve_bytes) + }) + .collect::(), + ) + as ArrayRef), + DataType::List(ref list_field) => { + match list_field.data_type() { + DataType::Dictionary(ref key_ty, _) => { + self.build_wrapped_list_array(rows, field.name(), key_ty) + } + _ => { + // extract rows by name + let extracted_rows = rows + .iter() + .map(|row| { + self.field_lookup(field.name(), row) + 
.unwrap_or(Value::Null) + }) + .collect::>(); + self.build_nested_list_array::( + extracted_rows.as_slice(), + list_field, + ) + } + } + } + DataType::Dictionary(ref key_ty, ref val_ty) => self + .build_string_dictionary_array( + rows, + field.name(), + key_ty, + val_ty, + ), + DataType::Struct(fields) => { + let len = rows.len(); + let num_bytes = bit_util::ceil(len, 8); + let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); + let struct_rows = rows + .iter() + .enumerate() + .map(|(i, row)| (i, self.field_lookup(field.name(), row))) + .map(|(i, v)| match v { + // we want the field as an object, if it's not, we treat as null + Some(Value::Record(ref value)) => { + bit_util::set_bit(null_buffer.as_slice_mut(), i); + value.clone() + } + _ => vec![], + }) + .collect::>>(); + let arrays = + self.build_struct_array(struct_rows.as_slice(), fields, &[])?; + // construct a struct array's data in order to set null buffer + let data_type = DataType::Struct(fields.clone()); + let data = ArrayDataBuilder::new(data_type) + .len(len) + .null_bit_buffer(null_buffer.into()) + .child_data( + arrays.into_iter().map(|a| a.data().clone()).collect(), + ) + .build(); + Ok(make_array(data)) + } + _ => Err(ArrowError::SchemaError(format!( + "type {:?} not supported", + field.data_type() + ))), + } + }) + .collect(); + arrays + } + + /// Read the primitive list's values into ArrayData + fn read_primitive_list_values(&self, rows: &[Value]) -> ArrayData + where + T: ArrowPrimitiveType + ArrowNumericType, + T::Native: num_traits::cast::NumCast, + { + let values = rows + .iter() + .flat_map(|row| { + let row = maybe_resolve_union(row); + if let Value::Array(values) = row { + values + .iter() + .map(resolve_item::) + .collect::>>() + } else if let Some(f) = resolve_item::(row) { + vec![Some(f)] + } else { + vec![] + } + }) + .collect::>>(); + let array = values.iter().collect::>(); + array.data().clone() + } + + fn field_lookup(&self, name: &str, row: &[(String, Value)]) -> Option { + self.schema_lookup + .get(name) + .and_then(|i| row.get(*i)) + .map(|o| o.1.clone()) + } +} + +/// Flattens a list of Avro values, by flattening lists, and treating all other values as +/// single-value lists. +/// This is used to read into nested lists (list of list, list of struct) and non-dictionary lists. +#[inline] +fn flatten_values(values: &[Value]) -> Vec { + values + .iter() + .flat_map(|row| { + if let Value::Array(values) = row { + values.clone() + } else if let Value::Null = row { + vec![Value::Null] + } else { + // we interpret a scalar as a single-value list to minimise data loss + vec![row.clone()] + } + }) + .collect() +} + +/// Flattens a list into string values, dropping Value::Null in the process. +/// This is useful for interpreting any Avro array as string, dropping nulls. +/// See `value_as_string`. +#[inline] +fn flatten_string_values(values: &[Value]) -> Vec> { + values + .iter() + .flat_map(|row| { + if let Value::Array(values) = row { + values + .iter() + .map(|s| resolve_string(s).ok()) + .collect::>>() + } else if let Value::Null = row { + vec![] + } else { + vec![resolve_string(row).ok()] + } + }) + .collect::>>() +} + +/// Reads an Avro value as a string, regardless of its type. +/// This is useful if the expected datatype is a string, in which case we preserve +/// all the values regardless of they type. 
+fn resolve_string(v: &Value) -> ArrowResult { + let v = if let Value::Union(b) = v { b } else { v }; + match v { + Value::String(s) => Ok(s.clone()), + Value::Bytes(bytes) => { + String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8) + } + other => Err(AvroError::GetString(other.into())), + } + .map_err(|e| SchemaError(format!("expected resolvable string : {}", e))) +} + +fn resolve_u8(v: Value) -> AvroResult { + let int = v.resolve(&AvroSchema::Int)?; + if let Value::Int(n) = int { + if n >= 0 && n <= std::convert::From::from(u8::MAX) { + return Ok(n as u8); + } + } + + Err(AvroError::GetU8(int.into())) +} + +fn resolve_bytes(v: Value) -> Option> { + let v = if let Value::Union(b) = v { *b } else { v }; + match v { + Value::Bytes(bytes) => Ok(Value::Bytes(bytes)), + Value::String(s) => Ok(Value::Bytes(s.into_bytes())), + Value::Array(items) => Ok(Value::Bytes( + items + .into_iter() + .map(resolve_u8) + .collect::, _>>() + .ok()?, + )), + other => Err(AvroError::GetBytes(other.into())), + } + .ok() + .and_then(|v| match v { + Value::Bytes(s) => Some(s), + _ => None, + }) +} + +fn resolve_boolean(value: &Value) -> Option { + let v = if let Value::Union(b) = value { + b + } else { + value + }; + match v { + Value::Boolean(boolean) => Some(*boolean), + _ => None, + } +} + +trait Resolver: ArrowPrimitiveType { + fn resolve(value: &Value) -> Option; +} + +fn resolve_item(value: &Value) -> Option { + T::resolve(value) +} + +fn maybe_resolve_union(value: &Value) -> &Value { + if SchemaKind::from(value) == SchemaKind::Union { + // Pull out the Union, and attempt to resolve against it. + match value { + Value::Union(b) => b, + _ => unreachable!(), + } + } else { + value + } +} + +impl Resolver for N +where + N: ArrowNumericType, + N::Native: num_traits::cast::NumCast, +{ + fn resolve(value: &Value) -> Option { + let value = maybe_resolve_union(value); + match value { + Value::Int(i) | Value::TimeMillis(i) | Value::Date(i) => NumCast::from(*i), + Value::Long(l) + | Value::TimeMicros(l) + | Value::TimestampMillis(l) + | Value::TimestampMicros(l) => NumCast::from(*l), + Value::Float(f) => NumCast::from(*f), + Value::Double(f) => NumCast::from(*f), + Value::Duration(_d) => unimplemented!(), // shenanigans type + Value::Null => None, + _ => unreachable!(), + } + } +} + +#[cfg(test)] +mod test { + use crate::arrow::array::Array; + use crate::arrow::datatypes::{Field, TimeUnit}; + use crate::avro_to_arrow::{Reader, ReaderBuilder}; + use arrow::array::{Int32Array, Int64Array, ListArray, TimestampMicrosecondArray}; + use arrow::datatypes::DataType; + use std::fs::File; + + fn build_reader(name: &str, batch_size: usize) -> Reader { + let testdata = crate::test_util::arrow_test_data(); + let filename = format!("{}/avro/{}", testdata, name); + let builder = ReaderBuilder::new() + .read_schema() + .with_batch_size(batch_size); + builder.build(File::open(filename).unwrap()).unwrap() + } + + // TODO: Fixed, Enum, Dictionary + + #[test] + fn test_time_avro_milliseconds() { + let mut reader = build_reader("alltypes_plain.avro", 10); + let batch = reader.next().unwrap().unwrap(); + + assert_eq!(11, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let schema = reader.schema(); + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + + let timestamp_col = schema.column_with_name("timestamp_col").unwrap(); + assert_eq!( + &DataType::Timestamp(TimeUnit::Microsecond, None), + timestamp_col.1.data_type() + ); + let timestamp_array = batch + .column(timestamp_col.0) + .as_any() 
+ .downcast_ref::() + .unwrap(); + for i in 0..timestamp_array.len() { + assert!(timestamp_array.is_valid(i)); + } + assert_eq!(1235865600000000, timestamp_array.value(0)); + assert_eq!(1235865660000000, timestamp_array.value(1)); + assert_eq!(1238544000000000, timestamp_array.value(2)); + assert_eq!(1238544060000000, timestamp_array.value(3)); + assert_eq!(1233446400000000, timestamp_array.value(4)); + assert_eq!(1233446460000000, timestamp_array.value(5)); + assert_eq!(1230768000000000, timestamp_array.value(6)); + assert_eq!(1230768060000000, timestamp_array.value(7)); + } + + #[test] + fn test_avro_read_list() { + let mut reader = build_reader("list_columns.avro", 3); + let schema = reader.schema(); + let (col_id_index, _) = schema.column_with_name("int64_list").unwrap(); + let batch = reader.next().unwrap().unwrap(); + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + let a_array = batch + .column(col_id_index) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + *a_array.data_type(), + DataType::List(Box::new(Field::new("bigint", DataType::Int64, true))) + ); + let array = a_array.value(0); + assert_eq!(*array.data_type(), DataType::Int64); + + assert_eq!( + 6, + array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .sum::() + ); + } + #[test] + fn test_avro_read_nested_list() { + let mut reader = build_reader("nested_lists.snappy.avro", 3); + let batch = reader.next().unwrap().unwrap(); + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + } + + #[test] + fn test_avro_iterator() { + let reader = build_reader("alltypes_plain.avro", 5); + let schema = reader.schema(); + let (col_id_index, _) = schema.column_with_name("id").unwrap(); + + let mut sum_num_rows = 0; + let mut num_batches = 0; + let mut sum_id = 0; + for batch in reader { + let batch = batch.unwrap(); + assert_eq!(11, batch.num_columns()); + sum_num_rows += batch.num_rows(); + num_batches += 1; + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + let a_array = batch + .column(col_id_index) + .as_any() + .downcast_ref::() + .unwrap(); + sum_id += (0..a_array.len()).map(|i| a_array.value(i)).sum::(); + } + assert_eq!(8, sum_num_rows); + assert_eq!(2, num_batches); + assert_eq!(28, sum_id); + } +} diff --git a/datafusion/src/avro_to_arrow/mod.rs b/datafusion/src/avro_to_arrow/mod.rs new file mode 100644 index 000000000000..531b1092e1d6 --- /dev/null +++ b/datafusion/src/avro_to_arrow/mod.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains utilities to manipulate avro metadata. 
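+//!
+//! As a minimal usage sketch (assuming the `avro` feature is enabled; the file
+//! path and function name below are hypothetical and not part of the original
+//! patch), the re-exported `ReaderBuilder` can be used roughly as follows:
+//!
+//! ```ignore
+//! use std::fs::File;
+//! use datafusion::avro_to_arrow::ReaderBuilder;
+//! use datafusion::error::Result;
+//!
+//! fn read_avro_batches() -> Result<()> {
+//!     // hypothetical path to an Avro file
+//!     let file = File::open("path/to/file.avro")?;
+//!     // read the schema from the Avro file header and load 1024 records per batch
+//!     let mut reader = ReaderBuilder::new()
+//!         .read_schema()
+//!         .with_batch_size(1024)
+//!         .build(file)?;
+//!     // iterate over the resulting Arrow record batches
+//!     while let Some(batch) = reader.next()? {
+//!         println!("read a batch of {} rows", batch.num_rows());
+//!     }
+//!     Ok(())
+//! }
+//! ```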
+ +#[cfg(feature = "avro")] +mod arrow_array_reader; +#[cfg(feature = "avro")] +mod reader; +#[cfg(feature = "avro")] +mod schema; + +use crate::arrow::datatypes::Schema; +use crate::error::Result; +#[cfg(feature = "avro")] +pub use reader::{Reader, ReaderBuilder}; +use std::io::{Read, Seek}; + +#[cfg(feature = "avro")] +/// Read Avro schema given a reader +pub fn read_avro_schema_from_reader(reader: &mut R) -> Result { + let avro_reader = avro_rs::Reader::new(reader)?; + let schema = avro_reader.writer_schema(); + schema::to_arrow_schema(schema) +} + +#[cfg(not(feature = "avro"))] +/// Read Avro schema given a reader (requires the avro feature) +pub fn read_avro_schema_from_reader(_: &mut R) -> Result { + Err(crate::error::DataFusionError::NotImplemented( + "cannot read avro schema without the 'avro' feature enabled".to_string(), + )) +} diff --git a/datafusion/src/avro_to_arrow/reader.rs b/datafusion/src/avro_to_arrow/reader.rs new file mode 100644 index 000000000000..8baad14746d3 --- /dev/null +++ b/datafusion/src/avro_to_arrow/reader.rs @@ -0,0 +1,281 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::arrow_array_reader::AvroArrowArrayReader; +use crate::arrow::datatypes::SchemaRef; +use crate::arrow::record_batch::RecordBatch; +use crate::error::Result; +use arrow::error::Result as ArrowResult; +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; + +/// Avro file reader builder +#[derive(Debug)] +pub struct ReaderBuilder { + /// Optional schema for the Avro file + /// + /// If the schema is not supplied, the reader will try to read the schema. + schema: Option, + /// Batch size (number of records to load each time) + /// + /// The default batch size when using the `ReaderBuilder` is 1024 records + batch_size: usize, + /// Optional projection for which columns to load (zero-based column indices) + projection: Option>, +} + +impl Default for ReaderBuilder { + fn default() -> Self { + Self { + schema: None, + batch_size: 1024, + projection: None, + } + } +} + +impl ReaderBuilder { + /// Create a new builder for configuring Avro parsing options. 
+    ///
+    /// To convert a builder into a reader, call `ReaderBuilder::build`
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// extern crate avro_rs;
+    ///
+    /// use std::fs::File;
+    ///
+    /// fn example() -> crate::datafusion::avro_to_arrow::Reader<'static, File> {
+    ///     let file = File::open("test/data/basic.avro").unwrap();
+    ///
+    ///     // create a builder that reads the schema from the file header and
+    ///     // loads batches of 100 records at a time
+    ///     let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().read_schema().with_batch_size(100);
+    ///
+    ///     let reader = builder.build::<File>(file).unwrap();
+    ///
+    ///     reader
+    /// }
+    /// ```
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set the Avro file's schema
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Set the Avro reader to read the schema from the file
+    pub fn read_schema(mut self) -> Self {
+        // remove any schema that is set
+        self.schema = None;
+        self
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Create a new `Reader` from the `ReaderBuilder`
+    pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>>
+    where
+        R: Read + Seek,
+    {
+        let mut source = source;
+
+        // check if the schema should be read from the file
+        let schema = match self.schema {
+            Some(schema) => schema,
+            None => Arc::new(super::read_avro_schema_from_reader(&mut source)?),
+        };
+        source.seek(SeekFrom::Start(0))?;
+        Reader::try_new(source, schema, self.batch_size, self.projection)
+    }
+}
+
+/// Avro file record reader
+pub struct Reader<'a, R: Read> {
+    array_reader: AvroArrowArrayReader<'a, R>,
+    schema: SchemaRef,
+    batch_size: usize,
+}
+
+impl<'a, R: Read> Reader<'a, R> {
+    /// Create a new Avro Reader from any value that implements the `Read` trait.
+    ///
+    /// If reading a `File` and you need to customise the reader, for example to
+    /// read the schema from the file or set the batch size, use `ReaderBuilder`.
+ pub fn try_new( + reader: R, + schema: SchemaRef, + batch_size: usize, + projection: Option>, + ) -> Result { + Ok(Self { + array_reader: AvroArrowArrayReader::try_new( + reader, + schema.clone(), + projection, + )?, + schema, + batch_size, + }) + } + + /// Returns the schema of the reader, useful for getting the schema without reading + /// record batches + pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there + /// are no more results + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> ArrowResult> { + self.array_reader.next_batch(self.batch_size) + } +} + +impl<'a, R: Read> Iterator for Reader<'a, R> { + type Item = ArrowResult; + + fn next(&mut self) -> Option { + self.next().transpose() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array::*; + use crate::arrow::datatypes::{DataType, Field}; + use arrow::datatypes::TimeUnit; + use std::fs::File; + + fn build_reader(name: &str) -> Reader { + let testdata = crate::test_util::arrow_test_data(); + let filename = format!("{}/avro/{}", testdata, name); + let builder = ReaderBuilder::new().read_schema().with_batch_size(64); + builder.build(File::open(filename).unwrap()).unwrap() + } + + fn get_col<'a, T: 'static>( + batch: &'a RecordBatch, + col: (usize, &Field), + ) -> Option<&'a T> { + batch.column(col.0).as_any().downcast_ref::() + } + + #[test] + fn test_avro_basic() { + let mut reader = build_reader("alltypes_dictionary.avro"); + let batch = reader.next().unwrap().unwrap(); + + assert_eq!(11, batch.num_columns()); + assert_eq!(2, batch.num_rows()); + + let schema = reader.schema(); + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + + let id = schema.column_with_name("id").unwrap(); + assert_eq!(0, id.0); + assert_eq!(&DataType::Int32, id.1.data_type()); + let col = get_col::(&batch, id).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let bool_col = schema.column_with_name("bool_col").unwrap(); + assert_eq!(1, bool_col.0); + assert_eq!(&DataType::Boolean, bool_col.1.data_type()); + let col = get_col::(&batch, bool_col).unwrap(); + assert!(col.value(0)); + assert!(!col.value(1)); + let tinyint_col = schema.column_with_name("tinyint_col").unwrap(); + assert_eq!(2, tinyint_col.0); + assert_eq!(&DataType::Int32, tinyint_col.1.data_type()); + let col = get_col::(&batch, tinyint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let smallint_col = schema.column_with_name("smallint_col").unwrap(); + assert_eq!(3, smallint_col.0); + assert_eq!(&DataType::Int32, smallint_col.1.data_type()); + let col = get_col::(&batch, smallint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let int_col = schema.column_with_name("int_col").unwrap(); + assert_eq!(4, int_col.0); + let col = get_col::(&batch, int_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + assert_eq!(&DataType::Int32, int_col.1.data_type()); + let col = get_col::(&batch, int_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let bigint_col = schema.column_with_name("bigint_col").unwrap(); + assert_eq!(5, bigint_col.0); + let col = get_col::(&batch, bigint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(10, col.value(1)); + assert_eq!(&DataType::Int64, bigint_col.1.data_type()); + let float_col = schema.column_with_name("float_col").unwrap(); + assert_eq!(6, float_col.0); + 
let col = get_col::(&batch, float_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(1.1, col.value(1)); + assert_eq!(&DataType::Float32, float_col.1.data_type()); + let col = get_col::(&batch, float_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(1.1, col.value(1)); + let double_col = schema.column_with_name("double_col").unwrap(); + assert_eq!(7, double_col.0); + assert_eq!(&DataType::Float64, double_col.1.data_type()); + let col = get_col::(&batch, double_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(10.1, col.value(1)); + let date_string_col = schema.column_with_name("date_string_col").unwrap(); + assert_eq!(8, date_string_col.0); + assert_eq!(&DataType::Binary, date_string_col.1.data_type()); + let col = get_col::(&batch, date_string_col).unwrap(); + assert_eq!("01/01/09".as_bytes(), col.value(0)); + assert_eq!("01/01/09".as_bytes(), col.value(1)); + let string_col = schema.column_with_name("string_col").unwrap(); + assert_eq!(9, string_col.0); + assert_eq!(&DataType::Binary, string_col.1.data_type()); + let col = get_col::(&batch, string_col).unwrap(); + assert_eq!("0".as_bytes(), col.value(0)); + assert_eq!("1".as_bytes(), col.value(1)); + let timestamp_col = schema.column_with_name("timestamp_col").unwrap(); + assert_eq!(10, timestamp_col.0); + assert_eq!( + &DataType::Timestamp(TimeUnit::Microsecond, None), + timestamp_col.1.data_type() + ); + let col = get_col::(&batch, timestamp_col).unwrap(); + assert_eq!(1230768000000000, col.value(0)); + assert_eq!(1230768060000000, col.value(1)); + } +} diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs new file mode 100644 index 000000000000..c2927f0829ba --- /dev/null +++ b/datafusion/src/avro_to_arrow/schema.rs @@ -0,0 +1,464 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit}; +use crate::error::{DataFusionError, Result}; +use arrow::datatypes::Field; +use avro_rs::schema::Name; +use avro_rs::types::Value; +use avro_rs::Schema as AvroSchema; +use std::collections::BTreeMap; +use std::convert::TryFrom; + +/// Converts an avro schema to an arrow schema +pub fn to_arrow_schema(avro_schema: &avro_rs::Schema) -> Result { + let mut schema_fields = vec![]; + match avro_schema { + AvroSchema::Record { fields, .. } => { + for field in fields { + schema_fields.push(schema_to_field_with_props( + &field.schema, + Some(&field.name), + false, + Some(&external_props(&field.schema)), + )?) 
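+                // `external_props` (defined below) extracts any `doc` string and
+                // aliases from the field's Avro schema and carries them over as
+                // `avro::`-prefixed metadata on the resulting Arrow field.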
+ } + } + schema => schema_fields.push(schema_to_field(schema, Some(""), false)?), + } + + let schema = Schema::new(schema_fields); + Ok(schema) +} + +fn schema_to_field( + schema: &avro_rs::Schema, + name: Option<&str>, + nullable: bool, +) -> Result { + schema_to_field_with_props(schema, name, nullable, None) +} + +fn schema_to_field_with_props( + schema: &AvroSchema, + name: Option<&str>, + nullable: bool, + props: Option<&BTreeMap>, +) -> Result { + let mut nullable = nullable; + let field_type: DataType = match schema { + AvroSchema::Null => DataType::Null, + AvroSchema::Boolean => DataType::Boolean, + AvroSchema::Int => DataType::Int32, + AvroSchema::Long => DataType::Int64, + AvroSchema::Float => DataType::Float32, + AvroSchema::Double => DataType::Float64, + AvroSchema::Bytes => DataType::Binary, + AvroSchema::String => DataType::Utf8, + AvroSchema::Array(item_schema) => DataType::List(Box::new( + schema_to_field_with_props(item_schema, None, false, None)?, + )), + AvroSchema::Map(value_schema) => { + let value_field = + schema_to_field_with_props(value_schema, Some("value"), false, None)?; + DataType::Dictionary( + Box::new(DataType::Utf8), + Box::new(value_field.data_type().clone()), + ) + } + AvroSchema::Union(us) => { + // If there are only two variants and one of them is null, set the other type as the field data type + let has_nullable = us.find_schema(&Value::Null).is_some(); + let sub_schemas = us.variants(); + if has_nullable && sub_schemas.len() == 2 { + nullable = true; + if let Some(schema) = sub_schemas + .iter() + .find(|&schema| !matches!(schema, AvroSchema::Null)) + { + schema_to_field_with_props(schema, None, has_nullable, None)? + .data_type() + .clone() + } else { + return Err(DataFusionError::AvroError( + avro_rs::Error::GetUnionDuplicate, + )); + } + } else { + let fields = sub_schemas + .iter() + .map(|s| schema_to_field_with_props(s, None, has_nullable, None)) + .collect::>>()?; + DataType::Union(fields) + } + } + AvroSchema::Record { name, fields, .. } => { + let fields: Result> = fields + .iter() + .map(|field| { + let mut props = BTreeMap::new(); + if let Some(doc) = &field.doc { + props.insert("avro::doc".to_string(), doc.clone()); + } + /*if let Some(aliases) = fields.aliases { + props.insert("aliases", aliases); + }*/ + schema_to_field_with_props( + &field.schema, + Some(&format!("{}.{}", name.fullname(None), field.name)), + false, + Some(&props), + ) + }) + .collect(); + DataType::Struct(fields?) + } + AvroSchema::Enum { symbols, name, .. } => { + return Ok(Field::new_dict( + &name.fullname(None), + index_type(symbols.len()), + false, + 0, + false, + )) + } + AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32), + AvroSchema::Decimal { + precision, scale, .. 
+ } => DataType::Decimal(*precision, *scale), + AvroSchema::Uuid => DataType::FixedSizeBinary(16), + AvroSchema::Date => DataType::Date32, + AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond), + AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond), + AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None), + AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None), + AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond), + }; + + let data_type = field_type.clone(); + let name = name.unwrap_or_else(|| default_field_name(&data_type)); + + let mut field = Field::new(name, field_type, nullable); + field.set_metadata(props.cloned()); + Ok(field) +} + +fn default_field_name(dt: &DataType) -> &str { + match dt { + DataType::Null => "null", + DataType::Boolean => "bit", + DataType::Int8 => "tinyint", + DataType::Int16 => "smallint", + DataType::Int32 => "int", + DataType::Int64 => "bigint", + DataType::UInt8 => "uint1", + DataType::UInt16 => "uint2", + DataType::UInt32 => "uint4", + DataType::UInt64 => "uint8", + DataType::Float16 => "float2", + DataType::Float32 => "float4", + DataType::Float64 => "float8", + DataType::Date32 => "dateday", + DataType::Date64 => "datemilli", + DataType::Time32(tu) | DataType::Time64(tu) => match tu { + TimeUnit::Second => "timesec", + TimeUnit::Millisecond => "timemilli", + TimeUnit::Microsecond => "timemicro", + TimeUnit::Nanosecond => "timenano", + }, + DataType::Timestamp(tu, tz) => { + if tz.is_some() { + match tu { + TimeUnit::Second => "timestampsectz", + TimeUnit::Millisecond => "timestampmillitz", + TimeUnit::Microsecond => "timestampmicrotz", + TimeUnit::Nanosecond => "timestampnanotz", + } + } else { + match tu { + TimeUnit::Second => "timestampsec", + TimeUnit::Millisecond => "timestampmilli", + TimeUnit::Microsecond => "timestampmicro", + TimeUnit::Nanosecond => "timestampnano", + } + } + } + DataType::Duration(_) => "duration", + DataType::Interval(unit) => match unit { + IntervalUnit::YearMonth => "intervalyear", + IntervalUnit::DayTime => "intervalmonth", + }, + DataType::Binary => "varbinary", + DataType::FixedSizeBinary(_) => "fixedsizebinary", + DataType::LargeBinary => "largevarbinary", + DataType::Utf8 => "varchar", + DataType::LargeUtf8 => "largevarchar", + DataType::List(_) => "list", + DataType::FixedSizeList(_, _) => "fixed_size_list", + DataType::LargeList(_) => "largelist", + DataType::Struct(_) => "struct", + DataType::Union(_) => "union", + DataType::Dictionary(_, _) => "map", + DataType::Decimal(_, _) => "decimal", + } +} + +fn index_type(len: usize) -> DataType { + if len <= usize::from(u8::MAX) { + DataType::Int8 + } else if len <= usize::from(u16::MAX) { + DataType::Int16 + } else if usize::try_from(u32::MAX).map(|i| len < i).unwrap_or(false) { + DataType::Int32 + } else { + DataType::Int64 + } +} + +fn external_props(schema: &AvroSchema) -> BTreeMap { + let mut props = BTreeMap::new(); + match &schema { + AvroSchema::Record { + doc: Some(ref doc), .. + } + | AvroSchema::Enum { + doc: Some(ref doc), .. + } => { + props.insert("avro::doc".to_string(), doc.clone()); + } + _ => {} + } + match &schema { + AvroSchema::Record { + name: + Name { + aliases: Some(aliases), + namespace, + .. + }, + .. + } + | AvroSchema::Enum { + name: + Name { + aliases: Some(aliases), + namespace, + .. + }, + .. + } + | AvroSchema::Fixed { + name: + Name { + aliases: Some(aliases), + namespace, + .. + }, + .. 
+ } => { + let aliases: Vec = aliases + .iter() + .map(|alias| aliased(alias, namespace.as_deref(), None)) + .collect(); + props.insert( + "avro::aliases".to_string(), + format!("[{}]", aliases.join(",")), + ); + } + _ => {} + } + props +} + +#[allow(dead_code)] +fn get_metadata( + _schema: AvroSchema, + props: BTreeMap, +) -> BTreeMap { + let mut metadata: BTreeMap = Default::default(); + metadata.extend(props); + metadata +} + +/// Returns the fully qualified name for a field +pub fn aliased( + name: &str, + namespace: Option<&str>, + default_namespace: Option<&str>, +) -> String { + if name.contains('.') { + name.to_string() + } else { + let namespace = namespace.as_ref().copied().or(default_namespace); + + match namespace { + Some(ref namespace) => format!("{}.{}", namespace, name), + None => name.to_string(), + } + } +} + +#[cfg(test)] +mod test { + use super::{aliased, external_props, to_arrow_schema}; + use crate::arrow::datatypes::DataType::{Binary, Float32, Float64, Timestamp, Utf8}; + use crate::arrow::datatypes::TimeUnit::Microsecond; + use crate::arrow::datatypes::{Field, Schema}; + use arrow::datatypes::DataType::{Boolean, Int32, Int64}; + use avro_rs::schema::Name; + use avro_rs::Schema as AvroSchema; + + #[test] + fn test_alias() { + assert_eq!(aliased("foo.bar", None, None), "foo.bar"); + assert_eq!(aliased("bar", Some("foo"), None), "foo.bar"); + assert_eq!(aliased("bar", Some("foo"), Some("cat")), "foo.bar"); + assert_eq!(aliased("bar", None, Some("cat")), "cat.bar"); + } + + #[test] + fn test_external_props() { + let record_schema = AvroSchema::Record { + name: Name { + name: "record".to_string(), + namespace: None, + aliases: Some(vec!["fooalias".to_string(), "baralias".to_string()]), + }, + doc: Some("record documentation".to_string()), + fields: vec![], + lookup: Default::default(), + }; + let props = external_props(&record_schema); + assert_eq!( + props.get("avro::doc"), + Some(&"record documentation".to_string()) + ); + assert_eq!( + props.get("avro::aliases"), + Some(&"[fooalias,baralias]".to_string()) + ); + let enum_schema = AvroSchema::Enum { + name: Name { + name: "enum".to_string(), + namespace: None, + aliases: Some(vec!["fooenum".to_string(), "barenum".to_string()]), + }, + doc: Some("enum documentation".to_string()), + symbols: vec![], + }; + let props = external_props(&enum_schema); + assert_eq!( + props.get("avro::doc"), + Some(&"enum documentation".to_string()) + ); + assert_eq!( + props.get("avro::aliases"), + Some(&"[fooenum,barenum]".to_string()) + ); + let fixed_schema = AvroSchema::Fixed { + name: Name { + name: "fixed".to_string(), + namespace: None, + aliases: Some(vec!["foofixed".to_string(), "barfixed".to_string()]), + }, + size: 1, + }; + let props = external_props(&fixed_schema); + assert_eq!( + props.get("avro::aliases"), + Some(&"[foofixed,barfixed]".to_string()) + ); + } + + #[test] + fn test_invalid_avro_schema() {} + + #[test] + fn test_plain_types_schema() { + let schema = AvroSchema::parse_str( + r#" + { + "type" : "record", + "name" : "topLevelRecord", + "fields" : [ { + "name" : "id", + "type" : [ "int", "null" ] + }, { + "name" : "bool_col", + "type" : [ "boolean", "null" ] + }, { + "name" : "tinyint_col", + "type" : [ "int", "null" ] + }, { + "name" : "smallint_col", + "type" : [ "int", "null" ] + }, { + "name" : "int_col", + "type" : [ "int", "null" ] + }, { + "name" : "bigint_col", + "type" : [ "long", "null" ] + }, { + "name" : "float_col", + "type" : [ "float", "null" ] + }, { + "name" : "double_col", + "type" : [ "double", 
"null" ] + }, { + "name" : "date_string_col", + "type" : [ "bytes", "null" ] + }, { + "name" : "string_col", + "type" : [ "bytes", "null" ] + }, { + "name" : "timestamp_col", + "type" : [ { + "type" : "long", + "logicalType" : "timestamp-micros" + }, "null" ] + } ] + }"#, + ); + assert!(schema.is_ok(), "{:?}", schema); + let arrow_schema = to_arrow_schema(&schema.unwrap()); + assert!(arrow_schema.is_ok(), "{:?}", arrow_schema); + let expected = Schema::new(vec![ + Field::new("id", Int32, true), + Field::new("bool_col", Boolean, true), + Field::new("tinyint_col", Int32, true), + Field::new("smallint_col", Int32, true), + Field::new("int_col", Int32, true), + Field::new("bigint_col", Int64, true), + Field::new("float_col", Float32, true), + Field::new("double_col", Float64, true), + Field::new("date_string_col", Binary, true), + Field::new("string_col", Binary, true), + Field::new("timestamp_col", Timestamp(Microsecond, None), true), + ]); + assert_eq!(arrow_schema.unwrap(), expected); + } + + #[test] + fn test_non_record_schema() { + let arrow_schema = to_arrow_schema(&AvroSchema::String); + assert!(arrow_schema.is_ok(), "{:?}", arrow_schema); + assert_eq!( + arrow_schema.unwrap(), + Schema::new(vec![Field::new("", Utf8, false)]) + ); + } +} diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs new file mode 100644 index 000000000000..ee0fabfe0cc6 --- /dev/null +++ b/datafusion/src/datasource/avro.rs @@ -0,0 +1,424 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Line-delimited Avro data source +//! +//! This data source allows Line-delimited Avro records or files to be used as input for queries. +//! + +use std::{ + any::Any, + io::{Read, Seek}, + sync::{Arc, Mutex}, +}; + +use arrow::datatypes::SchemaRef; + +use crate::physical_plan::avro::{AvroExec, AvroReadOptions}; +use crate::{ + datasource::{Source, TableProvider}, + error::{DataFusionError, Result}, + physical_plan::{common, ExecutionPlan}, +}; + +trait SeekRead: Read + Seek {} + +impl SeekRead for T {} + +/// Represents a line-delimited Avro file with a provided schema +pub struct AvroFile { + source: Source>, + schema: SchemaRef, + file_extension: String, +} + +impl AvroFile { + /// Attempt to initialize a `AvroFile` from a path. The schema can be read automatically. + pub fn try_new(path: &str, options: AvroReadOptions) -> Result { + let schema = if let Some(schema) = options.schema { + schema + } else { + let filenames = + common::build_checked_file_list(path, options.file_extension)?; + Arc::new(AvroExec::try_read_schema(&filenames)?) + }; + + Ok(Self { + source: Source::Path(path.to_string()), + schema, + file_extension: options.file_extension.to_string(), + }) + } + + /// Attempt to initialize a `AvroFile` from a reader. 
The schema MUST be provided in options + pub fn try_new_from_reader( + reader: R, + options: AvroReadOptions, + ) -> Result { + let schema = match options.schema { + Some(s) => s, + None => { + return Err(DataFusionError::Execution( + "Schema must be provided to CsvRead".to_string(), + )); + } + }; + Ok(Self { + source: Source::Reader(Mutex::new(Some(Box::new(reader)))), + schema, + file_extension: String::new(), + }) + } + + /// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be read automatically. + pub fn try_new_from_reader_schema( + mut reader: R, + options: AvroReadOptions, + ) -> Result { + let schema = { + if let Some(schema) = options.schema { + schema + } else { + Arc::new(crate::avro_to_arrow::read_avro_schema_from_reader( + &mut reader, + )?) + } + }; + + Ok(Self { + source: Source::Reader(Mutex::new(Some(Box::new(reader)))), + schema, + file_extension: String::new(), + }) + } + + /// Get the path for Avro file(s) represented by this AvroFile instance + pub fn path(&self) -> &str { + match &self.source { + Source::Reader(_) => "", + Source::Path(path) => path, + } + } + + /// Get the file extension for the Avro file(s) represented by this AvroFile instance + pub fn file_extension(&self) -> &str { + &self.file_extension + } +} + +impl TableProvider for AvroFile { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn scan( + &self, + projection: &Option>, + batch_size: usize, + _filters: &[crate::logical_plan::Expr], + limit: Option, + ) -> Result> { + let opts = AvroReadOptions { + schema: Some(self.schema.clone()), + file_extension: self.file_extension.as_str(), + }; + let batch_size = limit + .map(|l| std::cmp::min(l, batch_size)) + .unwrap_or(batch_size); + + let exec = match &self.source { + Source::Reader(maybe_reader) => { + if let Some(rdr) = maybe_reader.lock().unwrap().take() { + AvroExec::try_new_from_reader( + rdr, + opts, + projection.clone(), + batch_size, + limit, + )? + } else { + return Err(DataFusionError::Execution( + "You can only read once if the data comes from a reader" + .to_string(), + )); + } + } + Source::Path(p) => { + AvroExec::try_from_path(p, opts, projection.clone(), batch_size, limit)? 
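+                // A path-backed source can be scanned repeatedly (the files are
+                // re-listed and re-opened on each scan), unlike the reader branch
+                // above, which is consumed on first use.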
+ } + }; + Ok(Arc::new(exec)) + } +} + +#[cfg(test)] +#[cfg(feature = "avro")] +mod tests { + use arrow::array::{ + BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, + TimestampMicrosecondArray, + }; + use arrow::record_batch::RecordBatch; + use futures::StreamExt; + + use super::*; + + #[tokio::test] + async fn read_small_batches() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = None; + let exec = table.scan(&projection, 2, &[], None)?; + let stream = exec.execute(0).await?; + + let _ = stream + .map(|batch| { + let batch = batch.unwrap(); + assert_eq!(11, batch.num_columns()); + assert_eq!(2, batch.num_rows()); + }) + .fold(0, |acc, _| async move { acc + 1i32 }) + .await; + + Ok(()) + } + + #[cfg(feature = "avro")] + #[tokio::test] + async fn read_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + + let x: Vec = table + .schema() + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect(); + let y = x.join("\n"); + assert_eq!( + "id: Int32\n\ + bool_col: Boolean\n\ + tinyint_col: Int32\n\ + smallint_col: Int32\n\ + int_col: Int32\n\ + bigint_col: Int64\n\ + float_col: Float32\n\ + double_col: Float64\n\ + date_string_col: Binary\n\ + string_col: Binary\n\ + timestamp_col: Timestamp(Microsecond, None)", + y + ); + + let projection = None; + let batch = get_first_batch(table, &projection).await?; + let expected = vec![ + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + "| 4 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30332f30312f3039 | 30 | 2009-03-01 00:00:00 |", + "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01 00:01:00 |", + "| 6 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30342f30312f3039 | 30 | 2009-04-01 00:00:00 |", + "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01 00:01:00 |", + "| 2 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30322f30312f3039 | 30 | 2009-02-01 00:00:00 |", + "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01 00:01:00 |", + "| 0 | true | 0 | 0 | 0 | 0 | 0 | 0 | 30312f30312f3039 | 30 | 2009-01-01 00:00:00 |", + "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01 00:01:00 |", + "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + ]; + + crate::assert_batches_eq!(expected, &[batch]); + Ok(()) + } + + #[tokio::test] + async fn read_bool_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![1]); + let batch = get_first_batch(table, &projection).await?; + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[true, false, true, false, true, false, true, false]", + format!("{:?}", values) + ); + + Ok(()) + } + + #[tokio::test] + async fn read_i32_alltypes_plain_avro() -> Result<()> { + let 
table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![0]); + let batch = get_first_batch(table, &projection).await?; + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values)); + + Ok(()) + } + + #[tokio::test] + async fn read_i96_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![10]); + let batch = get_first_batch(table, &projection).await?; + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!("[1235865600000000, 1235865660000000, 1238544000000000, 1238544060000000, 1233446400000000, 1233446460000000, 1230768000000000, 1230768060000000]", format!("{:?}", values)); + + Ok(()) + } + + #[tokio::test] + async fn read_f32_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![6]); + let batch = get_first_batch(table, &projection).await?; + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]", + format!("{:?}", values) + ); + + Ok(()) + } + + #[tokio::test] + async fn read_f64_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![7]); + let batch = get_first_batch(table, &projection).await?; + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec = vec![]; + for i in 0..batch.num_rows() { + values.push(array.value(i)); + } + + assert_eq!( + "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]", + format!("{:?}", values) + ); + + Ok(()) + } + + #[tokio::test] + async fn read_binary_alltypes_plain_avro() -> Result<()> { + let table = load_table("alltypes_plain.avro")?; + let projection = Some(vec![9]); + let batch = get_first_batch(table, &projection).await?; + + assert_eq!(1, batch.num_columns()); + assert_eq!(8, batch.num_rows()); + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut values: Vec<&str> = vec![]; + for i in 0..batch.num_rows() { + values.push(std::str::from_utf8(array.value(i)).unwrap()); + } + + assert_eq!( + "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]", + format!("{:?}", values) + ); + + Ok(()) + } + + fn load_table(name: &str) -> Result> { + let testdata = crate::test_util::arrow_test_data(); + let filename = format!("{}/avro/{}", testdata, name); + let table = AvroFile::try_new(&filename, AvroReadOptions::default())?; + Ok(Arc::new(table)) + } + + async fn get_first_batch( + table: Arc, + projection: &Option>, + ) -> Result { + let exec = table.scan(projection, 1024, &[], None)?; + let mut it = exec.execute(0).await?; + it.next() + .await + .expect("should have received at least one batch") + .map_err(|e| e.into()) + } +} diff --git a/datafusion/src/datasource/mod.rs 
b/datafusion/src/datasource/mod.rs index 53ba5177a2fc..cfa90036b163 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -17,6 +17,7 @@ //! DataFusion data sources +pub mod avro; pub mod csv; pub mod datasource; pub mod empty; diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index 903faeabf695..6b6bb1381111 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -23,6 +23,8 @@ use std::io; use std::result; use arrow::error::ArrowError; +#[cfg(feature = "avro")] +use avro_rs::Error as AvroError; use parquet::errors::ParquetError; use sqlparser::parser::ParserError; @@ -37,6 +39,9 @@ pub enum DataFusionError { ArrowError(ArrowError), /// Wraps an error from the Parquet crate ParquetError(ParquetError), + /// Wraps an error from the Avro crate + #[cfg(feature = "avro")] + AvroError(AvroError), /// Error associated to I/O operations and associated traits. IoError(io::Error), /// Error returned when SQL is syntactically incorrect. @@ -83,6 +88,13 @@ impl From for DataFusionError { } } +#[cfg(feature = "avro")] +impl From for DataFusionError { + fn from(e: AvroError) -> Self { + DataFusionError::AvroError(e) + } +} + impl From for DataFusionError { fn from(e: ParserError) -> Self { DataFusionError::SQL(e) @@ -96,6 +108,10 @@ impl Display for DataFusionError { DataFusionError::ParquetError(ref desc) => { write!(f, "Parquet error: {}", desc) } + #[cfg(feature = "avro")] + DataFusionError::AvroError(ref desc) => { + write!(f, "Avro error: {}", desc) + } DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc), DataFusionError::SQL(ref desc) => { write!(f, "SQL error: {:?}", desc) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 82947aaee1ba..5327c583cf7a 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -67,6 +67,8 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use crate::physical_optimizer::repartition::Repartition; +use crate::datasource::avro::AvroFile; +use crate::physical_plan::avro::AvroReadOptions; use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; @@ -197,6 +199,11 @@ impl ExecutionContext { let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } + FileType::Avro => { + self.register_avro(name, location, AvroReadOptions::default())?; + let plan = LogicalPlanBuilder::empty(false).build()?; + Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) + } _ => Err(DataFusionError::NotImplemented(format!( "Unsupported file type {:?}.", file_type @@ -271,6 +278,19 @@ impl ExecutionContext { .insert(f.name.clone(), Arc::new(f)); } + /// Creates a DataFrame for reading an Avro data source. + + pub fn read_avro( + &mut self, + filename: impl Into, + options: AvroReadOptions, + ) -> Result> { + Ok(Arc::new(DataFrameImpl::new( + self.state.clone(), + &LogicalPlanBuilder::scan_avro(filename, options, None)?.build()?, + ))) + } + /// Creates a DataFrame for reading a CSV data source. pub fn read_csv( &mut self, @@ -334,6 +354,18 @@ impl ExecutionContext { Ok(()) } + /// Registers an Avro data source so that it can be referenced from SQL statements + /// executed against this context. 
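+    ///
+    /// A minimal sketch, to be run from an async context with the crate built with
+    /// the `avro` feature; the table name and `data.avro` path are illustrative:
+    ///
+    /// ```ignore
+    /// let mut ctx = ExecutionContext::new();
+    /// ctx.register_avro("t", "data.avro", AvroReadOptions::default())?;
+    /// let df = ctx.sql("SELECT * FROM t LIMIT 10")?;
+    /// let batches = df.collect().await?;
+    /// ```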
+ pub fn register_avro( + &mut self, + name: &str, + filename: &str, + options: AvroReadOptions, + ) -> Result<()> { + self.register_table(name, Arc::new(AvroFile::try_new(filename, options)?))?; + Ok(()) + } + /// Registers a named catalog using a custom `CatalogProvider` so that /// it can be referenced from SQL statements executed against this /// context. diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index eac9b5f5a78a..adaca114d2d6 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -212,6 +212,7 @@ extern crate sqlparser; +pub mod avro_to_arrow; pub mod catalog; pub mod dataframe; pub mod datasource; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index f31dd3732883..5555939d74a8 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -36,10 +36,12 @@ use crate::{ use super::dfschema::ToDFSchema; use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType}; +use crate::datasource::avro::AvroFile; use crate::logical_plan::{ columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema, DFSchemaRef, Partitioning, }; +use crate::physical_plan::avro::AvroReadOptions; /// Default table name for unnamed table pub const UNNAMED_TABLE: &str = "?table?"; @@ -154,6 +156,27 @@ impl LogicalPlanBuilder { Self::scan(table_name, provider, projection) } + /// Scan an Avro data source + pub fn scan_avro( + path: impl Into, + options: AvroReadOptions, + projection: Option>, + ) -> Result { + let path = path.into(); + Self::scan_avro_with_name(path.clone(), options, projection, path) + } + + /// Scan an Avro data source and register it with a given table name + pub fn scan_avro_with_name( + path: impl Into, + options: AvroReadOptions, + projection: Option>, + table_name: impl Into, + ) -> Result { + let provider = Arc::new(AvroFile::try_new(&path.into(), options)?); + Self::scan(table_name, provider, projection) + } + /// Scan an empty data source, mainly used in tests pub fn scan_empty( name: Option<&str>, diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs index c067b5f963ee..1ef8ac764d46 100644 --- a/datafusion/src/logical_plan/dfschema.rs +++ b/datafusion/src/logical_plan/dfschema.rs @@ -167,7 +167,6 @@ impl DFSchema { (None, Some(_)) | (None, None) => field.name() == name, }) .map(|(idx, _)| idx); - match matches.next() { None => Err(DataFusionError::Plan(format!( "No field named '{}.{}'. 
Valid fields are {}.", diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index ec017d0765f9..eb46099dff64 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -1911,17 +1911,13 @@ mod tests { impl ExprRewriter for FooBarRewriter { fn mutate(&mut self, expr: Expr) -> Result { match expr { - Expr::Literal(scalar) => { - if let ScalarValue::Utf8(Some(utf8_val)) = scalar { - let utf8_val = if utf8_val == "foo" { - "bar".to_string() - } else { - utf8_val - }; - Ok(lit(utf8_val)) + Expr::Literal(ScalarValue::Utf8(Some(utf8_val))) => { + let utf8_val = if utf8_val == "foo" { + "bar".to_string() } else { - Ok(Expr::Literal(scalar)) - } + utf8_val + }; + Ok(lit(utf8_val)) } // otherwise, return the expression unchanged expr => Ok(expr), diff --git a/datafusion/src/physical_plan/avro.rs b/datafusion/src/physical_plan/avro.rs new file mode 100644 index 000000000000..3f0b007b26c0 --- /dev/null +++ b/datafusion/src/physical_plan/avro.rs @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Execution plan for reading line-delimited Avro files +#[cfg(feature = "avro")] +use super::RecordBatchStream; +use super::{common, source::Source, ExecutionPlan, Partitioning}; +use crate::avro_to_arrow::read_avro_schema_from_reader; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{DisplayFormatType, Statistics}; +use arrow::datatypes::{Schema, SchemaRef}; +#[cfg(feature = "avro")] +use arrow::{error::Result as ArrowResult, record_batch::RecordBatch}; +use async_trait::async_trait; +#[cfg(feature = "avro")] +use futures::Stream; +use std::fs::File; +use std::{any::Any, io::Seek}; +use std::{ + io::Read, + sync::{Arc, Mutex}, +}; +#[cfg(feature = "avro")] +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// Line-delimited Avro read options +#[derive(Clone)] +pub struct AvroReadOptions<'a> { + /// The data source schema. + pub schema: Option, + + /// File extension; only files with this extension are selected for data input. + /// Defaults to ".avro". 
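+    ///
+    /// A construction sketch; the values shown are exactly what
+    /// `AvroReadOptions::default()` produces (see below):
+    ///
+    /// ```ignore
+    /// let options = AvroReadOptions { schema: None, file_extension: ".avro" };
+    /// ```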
+ pub file_extension: &'a str, +} + +impl<'a> Default for AvroReadOptions<'a> { + fn default() -> Self { + Self { + schema: None, + file_extension: ".avro", + } + } +} + +trait SeekRead: Read + Seek {} + +impl SeekRead for T {} +/// Execution plan for scanning Avro data source +#[derive(Debug)] +pub struct AvroExec { + source: Source>, + schema: SchemaRef, + projection: Option>, + projected_schema: SchemaRef, + file_extension: String, + batch_size: usize, + limit: Option, +} + +impl AvroExec { + /// Create a new execution plan for reading from a path + pub fn try_from_path( + path: &str, + options: AvroReadOptions, + projection: Option>, + batch_size: usize, + limit: Option, + ) -> Result { + let file_extension = options.file_extension.to_string(); + + let filenames = common::build_file_list(path, &file_extension)?; + + if filenames.is_empty() { + return Err(DataFusionError::Execution(format!( + "No files found at {path} with file extension {file_extension}", + path = path, + file_extension = file_extension.as_str() + ))); + } + + let schema = match options.schema { + Some(s) => s, + None => Arc::new(AvroExec::try_read_schema(filenames.as_slice())?), + }; + + let projected_schema = match &projection { + None => schema.clone(), + Some(p) => Arc::new(Schema::new( + p.iter().map(|i| schema.field(*i).clone()).collect(), + )), + }; + + Ok(Self { + source: Source::PartitionedFiles { + path: path.to_string(), + filenames, + }, + schema, + projected_schema, + file_extension, + projection, + batch_size, + limit, + }) + } + /// Create a new execution plan for reading from a reader + pub fn try_new_from_reader( + reader: impl Read + Seek + Send + Sync + 'static, + options: AvroReadOptions, + projection: Option>, + batch_size: usize, + limit: Option, + ) -> Result { + let schema = match options.schema { + Some(s) => s, + None => { + return Err(DataFusionError::Execution( + "The schema must be provided in options when reading from a reader" + .to_string(), + )); + } + }; + + let projected_schema = match &projection { + None => schema.clone(), + Some(p) => Arc::new(Schema::new( + p.iter().map(|i| schema.field(*i).clone()).collect(), + )), + }; + + Ok(Self { + source: Source::Reader(Mutex::new(Some(Box::new(reader)))), + schema, + file_extension: String::new(), + projection, + projected_schema, + batch_size, + limit, + }) + } + + /// Path to directory containing partitioned CSV files with the same schema + pub fn path(&self) -> &str { + self.source.path() + } + + /// The individual files under path + pub fn filenames(&self) -> &[String] { + self.source.filenames() + } + + /// File extension + pub fn file_extension(&self) -> &str { + &self.file_extension + } + + /// Get the schema of the avro file + pub fn file_schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Optional projection for which columns to load + pub fn projection(&self) -> Option<&Vec> { + self.projection.as_ref() + } + + /// Batch size + pub fn batch_size(&self) -> usize { + self.batch_size + } + + /// Limit + pub fn limit(&self) -> Option { + self.limit + } + + /// Read schema for given Avro dataset + pub fn try_read_schema(filenames: &[String]) -> Result { + let mut schemas = Vec::new(); + for filename in filenames { + let mut file = File::open(filename)?; + let schema = read_avro_schema_from_reader(&mut file)?; + schemas.push(schema); + } + + Ok(Schema::try_merge(schemas)?) 
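+        // `Schema::try_merge` above unifies the per-file schemas; files whose
+        // fields disagree on data type make the merge fail.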
+ } +} + +#[async_trait] +impl ExecutionPlan for AvroExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(match &self.source { + Source::PartitionedFiles { filenames, .. } => filenames.len(), + Source::Reader(_) => 1, + }) + } + + fn children(&self) -> Vec> { + Vec::new() + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + if !children.is_empty() { + Err(DataFusionError::Internal(format!( + "Children cannot be replaced in {:?}", + self + ))) + } else if let Source::PartitionedFiles { filenames, path } = &self.source { + Ok(Arc::new(Self { + source: Source::PartitionedFiles { + filenames: filenames.clone(), + path: path.clone(), + }, + schema: self.schema.clone(), + projection: self.projection.clone(), + projected_schema: self.projected_schema.clone(), + batch_size: self.batch_size, + limit: self.limit, + file_extension: self.file_extension.clone(), + })) + } else { + Err(DataFusionError::Internal( + "AvroExec with reader source cannot be used with `with_new_children`" + .to_string(), + )) + } + } + + #[cfg(not(feature = "avro"))] + async fn execute( + &self, + _partition: usize, + ) -> Result { + Err(DataFusionError::NotImplemented( + "Cannot execute avro plan without avro feature enabled".to_string(), + )) + } + + #[cfg(feature = "avro")] + async fn execute( + &self, + partition: usize, + ) -> Result { + let mut builder = crate::avro_to_arrow::ReaderBuilder::new() + .with_schema(self.schema.clone()) + .with_batch_size(self.batch_size); + if let Some(proj) = &self.projection { + builder = builder.with_projection( + proj.iter() + .map(|col_idx| self.schema.field(*col_idx).name()) + .cloned() + .collect(), + ); + } + match &self.source { + Source::PartitionedFiles { filenames, .. 
} => { + let file = File::open(&filenames[partition])?; + + Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit))) + } + Source::Reader(rdr) => { + if partition != 0 { + Err(DataFusionError::Internal( + "Only partition 0 is valid when Avro comes from a reader" + .to_string(), + )) + } else if let Some(rdr) = rdr.lock().unwrap().take() { + Ok(Box::pin(AvroStream::new(builder.build(rdr)?, self.limit))) + } else { + Err(DataFusionError::Execution( + "Error reading Avro: Data can only be read a single time when the source is a reader" + .to_string(), + )) + } + } + } + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "AvroExec: source={}, batch_size={}, limit={:?}", + self.source, self.batch_size, self.limit + ) + } + } + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[cfg(feature = "avro")] +struct AvroStream<'a, R: Read> { + reader: crate::avro_to_arrow::Reader<'a, R>, + remain: Option, +} + +#[cfg(feature = "avro")] +impl<'a, R: Read> AvroStream<'a, R> { + fn new(reader: crate::avro_to_arrow::Reader<'a, R>, limit: Option) -> Self { + Self { + reader, + remain: limit, + } + } +} + +#[cfg(feature = "avro")] +impl Stream for AvroStream<'_, R> { + type Item = ArrowResult; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(remain) = self.remain.as_mut() { + if *remain < 1 { + return Poll::Ready(None); + } + } + + Poll::Ready(match self.reader.next() { + Ok(Some(item)) => { + if let Some(remain) = self.remain.as_mut() { + if *remain >= item.num_rows() { + *remain -= item.num_rows(); + Some(Ok(item)) + } else { + let len = *remain; + *remain = 0; + Some(Ok(RecordBatch::try_new( + item.schema(), + item.columns() + .iter() + .map(|column| column.slice(0, len)) + .collect(), + )?)) + } + } else { + Some(Ok(item)) + } + } + Ok(None) => None, + Err(err) => Some(Err(err)), + }) + } +} + +#[cfg(feature = "avro")] +impl RecordBatchStream for AvroStream<'_, R> { + fn schema(&self) -> SchemaRef { + self.reader.schema() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[cfg(feature = "avro")] + async fn test() -> Result<()> { + use futures::StreamExt; + + let testdata = crate::test_util::arrow_test_data(); + let filename = format!("{}/avro/alltypes_plain.avro", testdata); + let avro_exec = AvroExec::try_from_path( + &filename, + AvroReadOptions::default(), + Some(vec![0, 1, 2]), + 1024, + None, + )?; + assert_eq!(avro_exec.output_partitioning().partition_count(), 1); + + let mut results = avro_exec.execute(0).await?; + let batch = results.next().await.unwrap()?; + + assert_eq!(8, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + let schema = batch.schema(); + let field_names: Vec<&str> = + schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names); + + let batch = results.next().await; + assert!(batch.is_none()); + + let batch = results.next().await; + assert!(batch.is_none()); + + let batch = results.next().await; + assert!(batch.is_none()); + + Ok(()) + } + + #[tokio::test] + #[cfg(not(feature = "avro"))] + async fn test() -> Result<()> { + let testdata = crate::test_util::arrow_test_data(); + let filename = format!("{}/avro/alltypes_plain.avro", testdata); + let avro_exec = AvroExec::try_from_path( + &filename, + AvroReadOptions::default(), + Some(vec![0, 1, 2]), + 1024, + None, + ); + 
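+        // Without the `avro` feature, schema reading is stubbed out, so
+        // `try_from_path` itself returns the `NotImplemented` error asserted below.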
assert!(matches!( + avro_exec, + Err(DataFusionError::NotImplemented(msg)) + if msg == *"cannot read avro schema without the 'avro' feature enabled" + )); + + Ok(()) + } +} diff --git a/datafusion/src/physical_plan/common.rs b/datafusion/src/physical_plan/common.rs index d0b7a07f3b79..3be9e7245eb7 100644 --- a/datafusion/src/physical_plan/common.rs +++ b/datafusion/src/physical_plan/common.rs @@ -107,6 +107,20 @@ pub(crate) fn combine_batches( } } +/// Recursively builds a list of files in a directory with a given extension +pub fn build_checked_file_list(dir: &str, ext: &str) -> Result> { + let mut filenames: Vec = Vec::new(); + build_file_list_recurse(dir, &mut filenames, ext)?; + if filenames.is_empty() { + return Err(DataFusionError::Plan(format!( + "No files found at {path} with file extension {file_extension}", + path = dir, + file_extension = ext + ))); + } + Ok(filenames) +} + /// Recursively builds a list of files in a directory with a given extension pub fn build_file_list(dir: &str, ext: &str) -> Result> { let mut filenames: Vec = Vec::new(); diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index 39ae70d1b5d0..a776c42f3e9d 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -236,14 +236,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { let f = |x: Option| x.map(|x| date_trunc_single(granularity, x)).transpose(); Ok(match array { - ColumnarValue::Scalar(scalar) => { - if let ScalarValue::TimestampNanosecond(v) = scalar { - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?)) - } else { - return Err(DataFusionError::Execution( - "array of `date_trunc` must be non-null scalar Utf8".to_string(), - )); - } + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?)) } ColumnarValue::Array(array) => { let array = array @@ -257,6 +251,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { ColumnarValue::Array(Arc::new(array)) } + _ => { + return Err(DataFusionError::Execution( + "array of `date_trunc` must be non-null scalar Utf8".to_string(), + )); + } }) } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index af868871abb8..3701e908f971 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -603,6 +603,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod analyze; pub mod array_expressions; +pub mod avro; pub mod coalesce_batches; pub mod coalesce_partitions; pub mod common; diff --git a/datafusion/src/physical_plan/source.rs b/datafusion/src/physical_plan/source.rs index 012405a38a1a..32fa9c37c8a2 100644 --- a/datafusion/src/physical_plan/source.rs +++ b/datafusion/src/physical_plan/source.rs @@ -46,7 +46,7 @@ impl std::fmt::Debug for Source { Ok(()) } } -impl std::fmt::Display for Source { +impl std::fmt::Display for Source { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Source::PartitionedFiles { path, filenames } => { diff --git a/datafusion/src/physical_plan/string_expressions.rs b/datafusion/src/physical_plan/string_expressions.rs index 09e19c4dfa47..7cbebcec7eb3 100644 --- a/datafusion/src/physical_plan/string_expressions.rs +++ b/datafusion/src/physical_plan/string_expressions.rs @@ -290,6 +290,7 @@ pub fn concat(args: &[ColumnarValue]) -> Result { .map(|index| { let mut owned_string: String = 
"".to_owned(); for arg in args { + #[allow(clippy::collapsible_match)] match arg { ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { if let Some(value) = maybe_value { diff --git a/datafusion/src/sql/parser.rs b/datafusion/src/sql/parser.rs index bb2f9e6bbb24..864801c00f72 100644 --- a/datafusion/src/sql/parser.rs +++ b/datafusion/src/sql/parser.rs @@ -43,6 +43,8 @@ pub enum FileType { Parquet, /// Comma separated values CSV, + /// Avro binary records + Avro, } impl FromStr for FileType { @@ -53,8 +55,9 @@ impl FromStr for FileType { "PARQUET" => Ok(Self::Parquet), "NDJSON" => Ok(Self::NdJson), "CSV" => Ok(Self::CSV), + "AVRO" => Ok(Self::Avro), other => Err(ParserError::ParserError(format!( - "expect one of PARQUET, NDJSON, or CSV, found: {}", + "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}", other ))), } @@ -390,10 +393,21 @@ mod tests { }); expect_parse_ok(sql, expected)?; + // positive case: it is ok for avro files not to have columns specified + let sql = "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: "t".into(), + columns: vec![], + file_type: FileType::Avro, + has_header: false, + location: "foo.avro".into(), + }); + expect_parse_ok(sql, expected)?; + // Error cases: Invalid type let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'"; - expect_parse_error(sql, "expect one of PARQUET, NDJSON, or CSV"); + expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV"); Ok(()) } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index e613ff385b39..50c36dde5831 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -212,6 +212,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } FileType::NdJson => {} + FileType::Avro => {} }; let schema = self.build_schema(columns)?; diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index 90173be6ac64..89163390b19e 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -27,20 +27,16 @@ use chrono::Duration; extern crate arrow; extern crate datafusion; -use arrow::{array::*, datatypes::TimeUnit}; -use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch}; use arrow::{ - datatypes::{ - ArrowNativeType, ArrowPrimitiveType, ArrowTimestampType, DataType, Field, Schema, - SchemaRef, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, - }, + array::*, datatypes::*, record_batch::RecordBatch, util::display::array_value_to_string, }; use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion::logical_plan::LogicalPlan; +#[cfg(feature = "avro")] +use datafusion::physical_plan::avro::AvroReadOptions; use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::ExecutionPlanVisitor; @@ -2960,6 +2956,17 @@ fn register_alltypes_parquet(ctx: &mut ExecutionContext) { .unwrap(); } +#[cfg(feature = "avro")] +fn register_alltypes_avro(ctx: &mut ExecutionContext) { + let testdata = datafusion::test_util::arrow_test_data(); + ctx.register_avro( + "alltypes_plain", + &format!("{}/avro/alltypes_plain.avro", testdata), + AvroReadOptions::default(), + ) + .unwrap(); +} + /// Execute query and return result set as 2-d table of Vecs /// `result[row][column]` async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec { @@ -4685,3 +4692,137 @@ async fn 
test_regexp_is_match() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[cfg(feature = "avro")] +#[tokio::test] +async fn avro_query() { + let mut ctx = ExecutionContext::new(); + register_alltypes_avro(&mut ctx); + // NOTE that string_col is actually a binary column and does not have the UTF8 logical type + // so we need an explicit cast + let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----+-----------------------------------------+", + "| id | CAST(alltypes_plain.string_col AS Utf8) |", + "+----+-----------------------------------------+", + "| 4 | 0 |", + "| 5 | 1 |", + "| 6 | 0 |", + "| 7 | 1 |", + "| 2 | 0 |", + "| 3 | 1 |", + "| 0 | 0 |", + "| 1 | 1 |", + "+----+-----------------------------------------+", + ]; + + assert_batches_eq!(expected, &actual); +} + +#[cfg(feature = "avro")] +#[tokio::test] +async fn avro_query_multiple_files() { + let tempdir = tempfile::tempdir().unwrap(); + let table_path = tempdir.path(); + let testdata = datafusion::test_util::arrow_test_data(); + let alltypes_plain_file = format!("{}/avro/alltypes_plain.avro", testdata); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain1.avro", table_path.display()), + ) + .unwrap(); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain2.avro", table_path.display()), + ) + .unwrap(); + + let mut ctx = ExecutionContext::new(); + ctx.register_avro( + "alltypes_plain", + table_path.display().to_string().as_str(), + AvroReadOptions::default(), + ) + .unwrap(); + // NOTE that string_col is actually a binary column and does not have the UTF8 logical type + // so we need an explicit cast + let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----+-----------------------------------------+", + "| id | CAST(alltypes_plain.string_col AS Utf8) |", + "+----+-----------------------------------------+", + "| 4 | 0 |", + "| 5 | 1 |", + "| 6 | 0 |", + "| 7 | 1 |", + "| 2 | 0 |", + "| 3 | 1 |", + "| 0 | 0 |", + "| 1 | 1 |", + "| 4 | 0 |", + "| 5 | 1 |", + "| 6 | 0 |", + "| 7 | 1 |", + "| 2 | 0 |", + "| 3 | 1 |", + "| 0 | 0 |", + "| 1 | 1 |", + "+----+-----------------------------------------+", + ]; + + assert_batches_eq!(expected, &actual); +} + +#[cfg(feature = "avro")] +#[tokio::test] +async fn avro_single_nan_schema() { + let mut ctx = ExecutionContext::new(); + let testdata = datafusion::test_util::arrow_test_data(); + ctx.register_avro( + "single_nan", + &format!("{}/avro/single_nan.avro", testdata), + AvroReadOptions::default(), + ) + .unwrap(); + let sql = "SELECT mycol FROM single_nan"; + let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan).unwrap(); + let results = collect(plan).await.unwrap(); + for batch in results { + assert_eq!(1, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + } +} + +#[cfg(feature = "avro")] +#[tokio::test] +async fn avro_explain() { + let mut ctx = ExecutionContext::new(); + register_alltypes_avro(&mut ctx); + + let sql = "EXPLAIN SELECT count(*) from alltypes_plain"; + let actual = execute(&mut ctx, sql).await; + let actual = normalize_vec_for_explain(actual); + let expected = vec![ + vec![ + "logical_plan", + "Projection: #COUNT(UInt8(1))\ + \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ + \n TableScan: alltypes_plain 
projection=Some([0])", + ], + vec![ + "physical_plan", + "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\ + \n HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ + \n CoalescePartitionsExec\ + \n HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ + \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\ + \n AvroExec: source=Path(ARROW_TEST_DATA/avro/alltypes_plain.avro: [ARROW_TEST_DATA/avro/alltypes_plain.avro]), batch_size=8192, limit=None\ + \n", + ], + ]; + assert_eq!(expected, actual); +} diff --git a/testing b/testing index b658b087767b..a8f7be380531 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit b658b087767b041b2081766814655b4dd5a9a439 +Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20
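
Taken together, the pieces above (the avro_to_arrow reader, the AvroFile table
provider, AvroExec, and the parser/planner support for STORED AS AVRO) allow Avro
data to be registered and queried end to end. A minimal sketch of that flow,
assuming a build with `--features avro`, a tokio runtime, and an Avro file at the
illustrative path 'foo.avro' (the same path used in the parser test above):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // DDL route added by the parser/planner changes in this patch
        ctx.sql("CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'")?;
        let df = ctx.sql("SELECT COUNT(*) FROM t")?;
        let results = df.collect().await?;
        println!("got {} record batch(es)", results.len());
        Ok(())
    }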