Skip to content

Commit

Permalink
refactor(substrait): refactor ReadRel consumer (#12983)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokoko authored Oct 18, 2024
1 parent e9435a9 commit 97f7491
Showing 1 changed file with 87 additions and 94 deletions.
181 changes: 87 additions & 94 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,60 +794,61 @@ pub async fn from_substrait_rel(
let (left, right) = requalify_sides_if_needed(left, right)?;
left.cross_join(right.build()?)?.build()
}
Some(RelType::Read(read)) => match &read.as_ref().read_type {
Some(ReadType::NamedTable(nt)) => {
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for Named Table")
})?;
Some(RelType::Read(read)) => {
fn read_with_schema(
df: DataFrame,
schema: DFSchema,
projection: &Option<MaskExpression>,
) -> Result<LogicalPlan> {
ensure_schema_compatability(df.schema().to_owned(), schema.clone())?;

let table_reference = match nt.names.len() {
0 => {
return plan_err!("No table name found in NamedTable");
}
1 => TableReference::Bare {
table: nt.names[0].clone().into(),
},
2 => TableReference::Partial {
schema: nt.names[0].clone().into(),
table: nt.names[1].clone().into(),
},
_ => TableReference::Full {
catalog: nt.names[0].clone().into(),
schema: nt.names[1].clone().into(),
table: nt.names[2].clone().into(),
},
};
let schema = apply_masking(schema, projection)?;

let t = ctx.table(table_reference.clone()).await?;
apply_projection(df, schema)
}

let substrait_schema =
from_substrait_named_struct(named_struct, extensions)?
.replace_qualifier(table_reference);
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for Read Relation")
})?;

ensure_schema_compatability(
t.schema().to_owned(),
substrait_schema.clone(),
)?;
let substrait_schema = from_substrait_named_struct(named_struct, extensions)?;

let substrait_schema = apply_masking(substrait_schema, &read.projection)?;
match &read.as_ref().read_type {
Some(ReadType::NamedTable(nt)) => {
let table_reference = match nt.names.len() {
0 => {
return plan_err!("No table name found in NamedTable");
}
1 => TableReference::Bare {
table: nt.names[0].clone().into(),
},
2 => TableReference::Partial {
schema: nt.names[0].clone().into(),
table: nt.names[1].clone().into(),
},
_ => TableReference::Full {
catalog: nt.names[0].clone().into(),
schema: nt.names[1].clone().into(),
table: nt.names[2].clone().into(),
},
};

apply_projection(t, substrait_schema)
}
Some(ReadType::VirtualTable(vt)) => {
let base_schema = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for Virtual Table")
})?;
let t = ctx.table(table_reference.clone()).await?;

let schema = from_substrait_named_struct(base_schema, extensions)?;
let substrait_schema =
substrait_schema.replace_qualifier(table_reference);

if vt.values.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(schema),
}));
read_with_schema(t, substrait_schema, &read.projection)
}
Some(ReadType::VirtualTable(vt)) => {
if vt.values.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(substrait_schema),
}));
}

let values = vt
let values = vt
.values
.iter()
.map(|row| {
Expand All @@ -860,79 +861,71 @@ pub async fn from_substrait_rel(
Ok(Expr::Literal(from_substrait_literal(
lit,
extensions,
&base_schema.names,
&named_struct.names,
&mut name_idx,
)?))
})
.collect::<Result<_>>()?;
if name_idx != base_schema.names.len() {
if name_idx != named_struct.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
base_schema.names.len()
named_struct.names.len()
);
}
Ok(lits)
})
.collect::<Result<_>>()?;

Ok(LogicalPlan::Values(Values {
schema: DFSchemaRef::new(schema),
values,
}))
}
Some(ReadType::LocalFiles(lf)) => {
let named_struct = read.base_schema.as_ref().ok_or_else(|| {
substrait_datafusion_err!("No base schema provided for LocalFiles")
})?;

fn extract_filename(name: &str) -> Option<String> {
let corrected_url =
if name.starts_with("file://") && !name.starts_with("file:///") {
Ok(LogicalPlan::Values(Values {
schema: DFSchemaRef::new(substrait_schema),
values,
}))
}
Some(ReadType::LocalFiles(lf)) => {
fn extract_filename(name: &str) -> Option<String> {
let corrected_url = if name.starts_with("file://")
&& !name.starts_with("file:///")
{
name.replacen("file://", "file:///", 1)
} else {
name.to_string()
};

Url::parse(&corrected_url).ok().and_then(|url| {
let path = url.path();
std::path::Path::new(path)
.file_name()
.map(|filename| filename.to_string_lossy().to_string())
})
}

// we could use the file name to check the original table provider
// TODO: currently does not support multiple local files
let filename: Option<String> =
lf.items.first().and_then(|x| match x.path_type.as_ref() {
Some(UriFile(name)) => extract_filename(name),
_ => None,
});

if lf.items.len() > 1 || filename.is_none() {
return not_impl_err!("Only single file reads are supported");
}
let name = filename.unwrap();
// directly use unwrap here since we could determine it is a valid one
let table_reference = TableReference::Bare { table: name.into() };
let t = ctx.table(table_reference.clone()).await?;
Url::parse(&corrected_url).ok().and_then(|url| {
let path = url.path();
std::path::Path::new(path)
.file_name()
.map(|filename| filename.to_string_lossy().to_string())
})
}

let substrait_schema =
from_substrait_named_struct(named_struct, extensions)?
.replace_qualifier(table_reference);
// we could use the file name to check the original table provider
// TODO: currently does not support multiple local files
let filename: Option<String> =
lf.items.first().and_then(|x| match x.path_type.as_ref() {
Some(UriFile(name)) => extract_filename(name),
_ => None,
});

ensure_schema_compatability(
t.schema().to_owned(),
substrait_schema.clone(),
)?;
if lf.items.len() > 1 || filename.is_none() {
return not_impl_err!("Only single file reads are supported");
}
let name = filename.unwrap();
// directly use unwrap here since we could determine it is a valid one
let table_reference = TableReference::Bare { table: name.into() };
let t = ctx.table(table_reference.clone()).await?;

let substrait_schema = apply_masking(substrait_schema, &read.projection)?;
let substrait_schema =
substrait_schema.replace_qualifier(table_reference);

apply_projection(t, substrait_schema)
read_with_schema(t, substrait_schema, &read.projection)
}
_ => {
not_impl_err!("Unsupported ReadType: {:?}", &read.as_ref().read_type)
}
}
_ => not_impl_err!("Unsupported ReadType: {:?}", &read.as_ref().read_type),
},
}
Some(RelType::Set(set)) => match set_rel::SetOp::try_from(set.op) {
Ok(set_op) => match set_op {
set_rel::SetOp::UnionAll => {
Expand Down

0 comments on commit 97f7491

Please sign in to comment.