Skip to content

Commit

Permalink
validate and adjust Substrait NamedTable schemas (apache#12223) (apac…
Browse files Browse the repository at this point in the history
…he#12245)

* fix: producer did not emit base_schema struct field for ReadRel

Substrait plans are not valid without this, and it is generally useful
for round trip testing

* feat: include field_qualifier param for from_substrait_named_struct

* feat: verify that Substrait and DataFusion agree on NamedScan schemas

If the schema registered with DataFusion and the schema as given by the
Substrait NamedScan do not have the same names and types, DataFusion
should reject it

* test: update existing substrait test + substrait validation test

* added substrait_validation test
* extracted useful test utilities

The utils::test::TestSchemaCollector::generate_context_from_plan
function can be used to dynamically generate a SessionContext from a
Substrait plan, which will include the schemas for NamedTables as given
in the Substrait plan.

This helps us avoid the issue of DataFusion test schemas and Substrait
plan schemas not being in sync.

* feat: expose from_substrait_named_struct

* refactor: remove unused imports

* docs: add missing licenses

* refactor: deal with unused code warnings

* remove optional qualifier from from_substrait_named_struct

* return DFSchema from from_substrait_named_struct

* one must imagine clippy happy

* accidental blah

* loosen the validation for schemas

allow cases where the Substrait schema is a subset of the DataFusion
schema

* minor doc tweaks

* update test data to deal with case issues in tests

* fix error message

* improve readability of field compatibility check

* make TestSchemaCollector more flexible

* fix doc typo

Co-authored-by: Arttu <[email protected]>

* remove unnecessary TODO

* handle ReadRel projection on top of mismatched schema

---------

Co-authored-by: Arttu <[email protected]>
  • Loading branch information
vbarua and Blizzara authored Sep 10, 2024
1 parent c71a9d7 commit 41c5f4e
Show file tree
Hide file tree
Showing 13 changed files with 697 additions and 113 deletions.
131 changes: 121 additions & 10 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ use crate::variation_const::{
};
use datafusion::arrow::array::{new_empty_array, AsArray};
use datafusion::common::scalar::ScalarStructBuilder;
use datafusion::dataframe::DataFrame;
use datafusion::logical_expr::builder::project;
use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{
col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning,
Expand Down Expand Up @@ -277,6 +279,20 @@ pub fn extract_projection(
);
Ok(LogicalPlan::TableScan(scan))
}
LogicalPlan::Projection(projection) => {
// create another Projection around the Projection to handle the field masking
let fields: Vec<Expr> = column_indices
.into_iter()
.map(|i| {
let (qualifier, field) =
projection.schema.qualified_field(i);
let column =
Column::new(qualifier.cloned(), field.name());
Expr::Column(column)
})
.collect();
project(LogicalPlan::Projection(projection), fields)
}
_ => plan_err!("unexpected plan for table"),
}
}
Expand Down Expand Up @@ -640,6 +656,10 @@ pub async fn from_substrait_rel(
}
Some(RelType::Read(read)) => match &read.as_ref().read_type {
Some(ReadType::NamedTable(nt)) => {
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for Named Table")
})?;

let table_reference = match nt.names.len() {
0 => {
return plan_err!("No table name found in NamedTable");
Expand All @@ -657,7 +677,13 @@ pub async fn from_substrait_rel(
table: nt.names[2].clone().into(),
},
};
let t = ctx.table(table_reference).await?;

let substrait_schema =
from_substrait_named_struct(named_struct, extensions)?
.replace_qualifier(table_reference.clone());

let t = ctx.table(table_reference.clone()).await?;
let t = ensure_schema_compatability(t, substrait_schema)?;
let t = t.into_optimized_plan()?;
extract_projection(t, &read.projection)
}
Expand All @@ -671,7 +697,7 @@ pub async fn from_substrait_rel(
if vt.values.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
schema: DFSchemaRef::new(schema),
}));
}

Expand Down Expand Up @@ -704,7 +730,10 @@ pub async fn from_substrait_rel(
})
.collect::<Result<_>>()?;

Ok(LogicalPlan::Values(Values { schema, values }))
Ok(LogicalPlan::Values(Values {
schema: DFSchemaRef::new(schema),
values,
}))
}
Some(ReadType::LocalFiles(lf)) => {
fn extract_filename(name: &str) -> Option<String> {
Expand Down Expand Up @@ -850,6 +879,87 @@ pub async fn from_substrait_rel(
}
}

/// Ensures that the given Substrait schema is compatible with the schema as given by DataFusion
///
/// This means:
/// 1. All fields present in the Substrait schema are present in the DataFusion schema. The
/// DataFusion schema may have MORE fields, but not the other way around.
/// 2. All fields are compatible. See [`ensure_field_compatability`] for details
///
/// This function returns a DataFrame with fields adjusted if necessary in the event that the
/// Substrait schema is a subset of the DataFusion schema.
fn ensure_schema_compatability(
table: DataFrame,
substrait_schema: DFSchema,
) -> Result<DataFrame> {
let df_schema = table.schema().to_owned().strip_qualifiers();
if df_schema.logically_equivalent_names_and_types(&substrait_schema) {
return Ok(table);
}
let selected_columns = substrait_schema
.strip_qualifiers()
.fields()
.iter()
.map(|substrait_field| {
let df_field =
df_schema.field_with_unqualified_name(substrait_field.name())?;
ensure_field_compatability(df_field, substrait_field)?;
Ok(col(format!("\"{}\"", df_field.name())))
})
.collect::<Result<_>>()?;

table.select(selected_columns)
}

/// Checks a single Substrait field against its DataFusion counterpart.
///
/// The pair is accepted when both of the following hold:
/// 1. The two data types are logically equal.
/// 2. The nullabilities are compatible: either they are identical, or the Substrait field is
///    nullable while the DataFusion field is not.
///
/// A Substrait plan may have been built on the assumption that a non-nullable field never
/// contains nulls, so a field that is non-nullable in Substrait but nullable in DataFusion
/// causes the plan to be rejected.
///
/// # Errors
///
/// Returns a Substrait error describing the mismatch when either check fails.
fn ensure_field_compatability(
    datafusion_field: &Field,
    substrait_field: &Field,
) -> Result<()> {
    let table_type = datafusion_field.data_type();
    let plan_type = substrait_field.data_type();

    let types_agree = DFSchema::datatype_is_logically_equal(table_type, plan_type);
    if !types_agree {
        return substrait_err!(
            "Field '{}' in Substrait schema has a different type ({}) than the corresponding field in the table schema ({}).",
            substrait_field.name(),
            plan_type,
            table_type
        );
    }

    let nullability_agrees = compatible_nullabilities(
        datafusion_field.is_nullable(),
        substrait_field.is_nullable(),
    );
    if !nullability_agrees {
        // TODO: from_substrait_struct_type needs to be updated to set the nullability correctly. It defaults to true for now.
        return substrait_err!(
            "Field '{}' is nullable in the DataFusion schema but not nullable in the Substrait schema.",
            substrait_field.name()
        );
    }

    Ok(())
}

/// Reports whether a DataFusion nullability and a Substrait nullability can coexist.
///
/// The nullabilities are compatible when they are equal, or when DataFusion is non-nullable
/// and Substrait is nullable. Equivalently, the single rejected combination is a nullable
/// DataFusion field paired with a non-nullable Substrait field.
fn compatible_nullabilities(
    datafusion_nullability: bool,
    substrait_nullability: bool,
) -> bool {
    // (nullable in DataFusion, non-nullable in Substrait) is the only incompatible pairing
    !matches!(
        (datafusion_nullability, substrait_nullability),
        (true, false)
    )
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
Expand Down Expand Up @@ -1588,10 +1698,11 @@ fn next_struct_field_name(
}
}

fn from_substrait_named_struct(
/// Convert Substrait NamedStruct to DataFusion DFSchemaRef
pub fn from_substrait_named_struct(
base_schema: &NamedStruct,
extensions: &Extensions,
) -> Result<DFSchemaRef> {
) -> Result<DFSchema> {
let mut name_idx = 0;
let fields = from_substrait_struct_type(
base_schema.r#struct.as_ref().ok_or_else(|| {
Expand All @@ -1603,12 +1714,12 @@ fn from_substrait_named_struct(
);
if name_idx != base_schema.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
base_schema.names.len()
);
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
base_schema.names.len()
);
}
Ok(DFSchemaRef::new(DFSchema::try_from(Schema::new(fields?))?))
DFSchema::try_from(Schema::new(fields?))
}

fn from_substrait_bound(
Expand Down
16 changes: 5 additions & 11 deletions datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::variation_const::{
use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait};
use datafusion::common::{
exec_err, internal_err, not_impl_err, plan_err, substrait_datafusion_err,
substrait_err, DFSchemaRef, ToDFSchema,
};
use datafusion::common::{substrait_err, DFSchemaRef};
#[allow(unused_imports)]
use datafusion::logical_expr::expr::{
Alias, BinaryExpr, Case, Cast, GroupingSet, InList, InSubquery, Sort, WindowFunction,
Expand Down Expand Up @@ -139,19 +139,13 @@ pub fn to_substrait_rel(
maintain_singular_struct: false,
});

let table_schema = scan.source.schema().to_dfschema_ref()?;
let base_schema = to_substrait_named_struct(&table_schema, extensions)?;

Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: Some(NamedStruct {
names: scan
.source
.schema()
.fields()
.iter()
.map(|f| f.name().to_owned())
.collect(),
r#struct: None,
}),
base_schema: Some(base_schema),
filter: None,
best_effort_filter: None,
projection,
Expand Down
Loading

0 comments on commit 41c5f4e

Please sign in to comment.