Skip to content

Commit

Permalink
Create SchemaAdapter trait to map table schema to file schemas (#1709)
Browse files Browse the repository at this point in the history
* Create SchemaAdapter trait to map table schema to file schemas

* Linting fix

* Remove commented code
  • Loading branch information
thinkharderdev authored Jan 31, 2022
1 parent d01d8d5 commit 7bec762
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 58 deletions.
64 changes: 63 additions & 1 deletion datafusion/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {

use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::object_store::local::{
local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
use sqlparser::ast::ObjectType::Schema;

use super::*;

Expand Down Expand Up @@ -228,6 +229,67 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn avro_exec_missing_column() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let actual_schema = AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename]))
.await?;

let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));

let file_schema = Arc::new(Schema::new(fields));

let avro_exec = AvroExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
file_schema,
statistics: Statistics::default(),
// Include the missing column in the projection
projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
limit: None,
table_partition_cols: vec![],
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let mut results = avro_exec.execute(0).await.expect("plan execution failed");
let batch = results
.next()
.await
.expect("plan iterator empty")
.expect("plan iterator returned an error");

let expected = vec![
"+----+----------+-------------+-------------+",
"| id | bool_col | tinyint_col | missing_col |",
"+----+----------+-------------+-------------+",
"| 4 | true | 0 | |",
"| 5 | false | 1 | |",
"| 6 | true | 0 | |",
"| 7 | false | 1 | |",
"| 2 | true | 0 | |",
"| 3 | false | 1 | |",
"| 0 | true | 0 | |",
"| 1 | false | 1 | |",
"+----+----------+-------------+-------------+",
];

crate::assert_batches_eq!(expected, &[batch]);

let batch = results.next().await;
assert!(batch.is_none());

let batch = results.next().await;
assert!(batch.is_none());

let batch = results.next().await;
assert!(batch.is_none());

Ok(())
}

#[tokio::test]
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
Expand Down
47 changes: 47 additions & 0 deletions datafusion/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
Expand Down Expand Up @@ -269,6 +270,52 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn csv_exec_with_missing_column() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let file_schema = aggr_test_schema_with_missing_col();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema,
file_groups: vec![vec![local_unpartitioned_file(path)]],
statistics: Statistics::default(),
projection: None,
limit: Some(5),
table_partition_cols: vec![],
},
true,
b',',
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
assert_eq!(14, csv.projected_schema.fields().len());
assert_eq!(14, csv.schema().fields().len());

let mut it = csv.execute(0, runtime).await?;
let batch = it.next().await.unwrap()?;
assert_eq!(14, batch.num_columns());
assert_eq!(5, batch.num_rows());

let expected = vec![
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
"| c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c9 | c10 | c11 | c12 | c13 | missing_col |",
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
"| c | 2 | 1 | 18109 | 2033001162 | -6513304855495910254 | 25 | 43062 | 1491205016 | 5863949479783605708 | 0.110830784 | 0.9294097332465232 | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | |",
"| d | 5 | -40 | 22614 | 706441268 | -7542719935673075327 | 155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107 | 0.3114712539863804 | C2GT5KVyOPZpgKVl110TyZO0NcJ434 | |",
"| b | 1 | 29 | -18218 | 994303988 | 5983957848665088916 | 204 | 9489 | 3275293996 | 14857091259186476033 | 0.53840446 | 0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | |",
"| a | 1 | -85 | -15154 | 1171968280 | 1919439543497968449 | 77 | 52286 | 774637006 | 12101411955859039553 | 0.12285209 | 0.6864391962767343 | 0keZ5G8BffGwgF2RwQD59TFzMStxCB | |",
"| b | 5 | -82 | 22080 | 1824882165 | 7373730676428214987 | 208 | 34331 | 3342719438 | 3330177516592499461 | 0.82634634 | 0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd | |",
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
];

crate::assert_batches_eq!(expected, &[batch]);

Ok(())
}

#[tokio::test]
async fn csv_exec_with_partition() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
Expand Down
43 changes: 43 additions & 0 deletions datafusion/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {

#[cfg(test)]
mod tests {
use arrow::array::Array;
use arrow::datatypes::{Field, Schema};
use futures::StreamExt;

use crate::datasource::{
Expand Down Expand Up @@ -211,6 +213,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn nd_json_exec_file_with_missing_column() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
use arrow::datatypes::DataType;
let path = format!("{}/1.json", TEST_DATA_BASE);

let actual_schema = infer_schema(path.clone()).await?;

let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));
let missing_field_idx = fields.len() - 1;

let file_schema = Arc::new(Schema::new(fields));

let exec = NdJsonExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
file_schema,
statistics: Statistics::default(),
projection: None,
limit: Some(3),
table_partition_cols: vec![],
});

let mut it = exec.execute(0, runtime).await?;
let batch = it.next().await.unwrap()?;

assert_eq!(batch.num_rows(), 3);
let values = batch
.column(missing_field_idx)
.as_any()
.downcast_ref::<arrow::array::Int32Array>()
.unwrap();
assert_eq!(values.len(), 3);
assert!(values.is_null(0));
assert!(values.is_null(1));
assert!(values.is_null(2));

Ok(())
}

#[tokio::test]
async fn nd_json_exec_file_projection() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
Expand Down
140 changes: 140 additions & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;

use crate::error::DataFusionError;
use crate::{
datasource::{object_store::ObjectStore, PartitionedFile},
error::Result,
scalar::ScalarValue,
};
use arrow::array::new_null_array;
use lazy_static::lazy_static;
use log::info;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
Expand Down Expand Up @@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
}
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

/// Map projected column indexes to the file schema. This will fail if the table schema
/// and the file schema contain a field with the same name and different types.
pub fn map_projections(
&self,
file_schema: &Schema,
projections: &[usize],
) -> Result<Vec<usize>> {
let mut mapped: Vec<usize> = vec![];
for idx in projections {
let field = self.table_schema.field(*idx);
if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
if file_schema.field(mapped_idx).data_type() == field.data_type() {
mapped.push(mapped_idx)
} else {
let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
info!("{}", msg);
return Err(DataFusionError::Execution(msg));
}
}
}
Ok(mapped)
}

/// Re-order projected columns by index in record batch to match table schema column ordering. If the record
/// batch does not contain a column for an expected field, insert a null-valued column at the
/// required column index.
pub fn adapt_batch(
&self,
batch: RecordBatch,
projections: &[usize],
) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();

let batch_schema = batch.schema();

let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
let batch_cols = batch.columns().to_vec();

for field_idx in projections {
let table_field = &self.table_schema.fields()[*field_idx];
if let Some((batch_idx, _name)) =
batch_schema.column_with_name(table_field.name().as_str())
{
cols.push(batch_cols[batch_idx].clone());
} else {
cols.push(new_null_array(table_field.data_type(), batch_rows))
}
}

let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

let merged_batch = RecordBatch::try_new(projected_schema, cols)?;

Ok(merged_batch)
}
}

/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
Expand Down Expand Up @@ -467,6 +552,61 @@ mod tests {
crate::assert_batches_eq!(expected, &[projected_batch]);
}

#[test]
fn schema_adapter_adapt_projections() {
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int8, true),
]));

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
]);

let file_schema_2 = Arc::new(Schema::new(vec![
Field::new("c3", DataType::Int8, true),
Field::new("c2", DataType::Int64, true),
]));

let file_schema_3 =
Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));

let adapter = SchemaAdapter::new(table_schema);

let projections1: Vec<usize> = vec![0, 1, 2];
let projections2: Vec<usize> = vec![2];

let mapped = adapter
.map_projections(&file_schema, projections1.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![0, 1]);

let mapped = adapter
.map_projections(&file_schema, projections2.as_slice())
.expect("mapping projections");

assert!(mapped.is_empty());

let mapped = adapter
.map_projections(&file_schema_2, projections1.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![1, 0]);

let mapped = adapter
.map_projections(&file_schema_2, projections2.as_slice())
.expect("mapping projections");

assert_eq!(mapped, vec![0]);

let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());

assert!(mapped.is_err());
}

// sets default for configs that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,
Expand Down
Loading

0 comments on commit 7bec762

Please sign in to comment.