add metadata to DFSchema, close #1806. #1914

Merged · 6 commits · Mar 10, 2022
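
For orientation before the diff: this change gives `DFSchema` Arrow-style key/value metadata, created through the new `new_with_metadata` constructor and read back through a `metadata()` getter. A minimal usage sketch (not from the PR), assuming the `DFSchema`/`DFField`/`Result` re-exports from `datafusion-common` exercised by the tests below:

    use std::collections::HashMap;

    use arrow::datatypes::DataType;
    use datafusion_common::{DFField, DFSchema, Result};

    fn main() -> Result<()> {
        // Key/value metadata, mirroring arrow's Schema metadata.
        let metadata: HashMap<String, String> = vec![("k1", "v1")]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        // `new_with_metadata` supersedes the now-deprecated `DFSchema::new`.
        let schema = DFSchema::new_with_metadata(
            vec![DFField::new(None, "c0", DataType::Int64, true)],
            metadata,
        )?;

        // The metadata is kept on the schema, exposed via the new getter,
        // and round-trips to arrow's `Schema` through the From impls below.
        assert_eq!(schema.metadata().get("k1").map(String::as_str), Some("v1"));
        Ok(())
    }
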
87 changes: 69 additions & 18 deletions datafusion-common/src/dfschema.rs
@@ -18,7 +18,7 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::Arc;

@@ -36,16 +36,30 @@ pub type DFSchemaRef = Arc<DFSchema>;
pub struct DFSchema {
/// Fields
fields: Vec<DFField>,
/// Additional metadata in form of key value pairs
metadata: HashMap<String, String>,
}

impl DFSchema {
/// Creates an empty `DFSchema`
pub fn empty() -> Self {
Self { fields: vec![] }
Self {
fields: vec![],
metadata: HashMap::new(),
}
}

#[deprecated(since = "7.0.0", note = "please use `new_with_metadata` instead")]
/// Create a new `DFSchema`
pub fn new(fields: Vec<DFField>) -> Result<Self> {
Member: Could we just delete the old API directly?

Contributor (author): Deleting it would be a breaking change, since it's a public API.

Self::new_with_metadata(fields, HashMap::new())
}
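
Since deleting `new` outright would break downstream users (per the thread above), callers get a transition path: migrate to `new_with_metadata`, or keep the old call and silence the lint, as the PR itself does in `datafusion-proto/src/from_proto.rs` further down. A minimal sketch of both options, assuming the same `DFSchema`/`DFField`/`Result` types as above:

    use std::collections::HashMap;

    // Preferred: migrate to the new constructor with empty metadata.
    fn make_schema(fields: Vec<DFField>) -> Result<DFSchema> {
        DFSchema::new_with_metadata(fields, HashMap::new())
    }

    // Transitional: keep the deprecated call but silence the warning,
    // mirroring the `#[allow(deprecated)]` in from_proto.rs below.
    fn make_schema_legacy(fields: Vec<DFField>) -> Result<DFSchema> {
        #[allow(deprecated)]
        let schema = DFSchema::new(fields);
        schema
    }
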

/// Create a new `DFSchema`
pub fn new_with_metadata(
Contributor: What would you think about a more builder-like API here that could perhaps remain backwards compatible? Something like:

    /// Sets the metadata on this DFSchema to `metadata`
    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
        ...
    }

Then, instead of code like

        Schema::new_with_metadata(
            df_schema.fields.iter().map(|f| f.field.clone()).collect(),
            df_schema.metadata.clone(),
        )

it would look like

        Schema::new(
            df_schema.fields.iter().map(|f| f.field.clone()).collect(),
        )
        .with_metadata(df_schema.metadata.clone())

Contributor (author): A builder-style API is good in itself, but I don't think it is suitable here. Schema conversion between Arrow and DataFusion happens in many places; the compiler emits a warning wherever the deprecated `new` is still called, but a builder-style API would produce no warning, making it very easy to lose metadata in nested conversions.

Member (@xudong963, Mar 7, 2022): IMO, metadata in DFSchema is a new feature in the next release; if users want to use it, they'll check the API docs, so we don't need to rely on the compiler's warnings.

Contributor (author): As you can see in https://github.com/apache/arrow-datafusion/pull/1914/files#diff-c1ef69547042f0c07aa616c9d5d58cbe2a3c5720f7237c948c99012d6cc0024a, those optimizers rewrite plans; if any optimizer forgets to attach metadata to a newly created plan, the metadata is lost. That is very easy to miss without the compiler's help.

fields: Vec<DFField>,
metadata: HashMap<String, String>,
) -> Result<Self> {
let mut qualified_names = HashSet::new();
let mut unqualified_names = HashSet::new();

@@ -86,25 +100,28 @@ impl DFSchema {
)));
}
}
Ok(Self { fields })
Ok(Self { fields, metadata })
}
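
To make the deprecation argument in the thread above concrete: marking the old constructor `#[deprecated]` means every remaining `new` call site is flagged at compile time, which a builder method would not achieve. A self-contained sketch with hypothetical stand-in types (not DataFusion's):

    use std::collections::HashMap;

    #[allow(dead_code)]
    pub struct MySchema {
        fields: Vec<String>,
        metadata: HashMap<String, String>,
    }

    impl MySchema {
        #[deprecated(note = "please use `new_with_metadata` instead")]
        pub fn new(fields: Vec<String>) -> Self {
            Self::new_with_metadata(fields, HashMap::new())
        }

        pub fn new_with_metadata(
            fields: Vec<String>,
            metadata: HashMap<String, String>,
        ) -> Self {
            Self { fields, metadata }
        }

        // A builder-style setter: convenient, but a caller who forgets to
        // chain it loses metadata silently, with no compiler diagnostic.
        pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
            self.metadata = metadata;
            self
        }
    }

    fn main() {
        // The compiler flags this call:
        // warning: use of deprecated associated function `MySchema::new`
        #[allow(unused_variables)]
        let s = MySchema::new(vec!["c0".to_string()]);
    }
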

/// Create a `DFSchema` from an Arrow schema
pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
Self::new(
Self::new_with_metadata(
schema
.fields()
.iter()
.map(|f| DFField::from_qualified(qualifier, f.clone()))
.collect(),
schema.metadata().clone(),
)
}

/// Combine two schemas
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
let mut fields = self.fields.clone();
let mut metadata = self.metadata.clone();
fields.extend_from_slice(schema.fields().as_slice());
Self::new(fields)
metadata.extend(schema.metadata.clone());
Self::new_with_metadata(fields, metadata)
}

/// Merge a schema into self
@@ -120,6 +137,7 @@
self.fields.push(field.clone());
}
}
self.metadata.extend(other_schema.metadata.clone())
}

/// Get a list of fields
@@ -263,6 +281,7 @@ impl DFSchema {
.into_iter()
.map(|f| f.strip_qualifier())
.collect(),
..self
}
}

@@ -281,6 +300,7 @@ impl DFSchema {
)
})
.collect(),
..self
}
}

@@ -295,12 +315,17 @@ impl DFSchema {
.collect::<Vec<_>>()
.join(", ")
}

/// Get metadata of this schema
pub fn metadata(&self) -> &HashMap<String, String> {
&self.metadata
}
}

impl From<DFSchema> for Schema {
/// Convert DFSchema into a Schema
fn from(df_schema: DFSchema) -> Self {
Schema::new(
Schema::new_with_metadata(
df_schema
.fields
.into_iter()
@@ -316,27 +341,32 @@ impl From<DFSchema> for Schema {
}
})
.collect(),
df_schema.metadata,
)
}
}

impl From<&DFSchema> for Schema {
/// Convert DFSchema reference into a Schema
fn from(df_schema: &DFSchema) -> Self {
Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect())
Schema::new_with_metadata(
df_schema.fields.iter().map(|f| f.field.clone()).collect(),
df_schema.metadata.clone(),
)
}
}

/// Create a `DFSchema` from an Arrow schema
impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: Schema) -> std::result::Result<Self, Self::Error> {
Self::new(
Self::new_with_metadata(
schema
.fields()
.iter()
.map(|f| DFField::from(f.clone()))
.collect(),
schema.metadata().clone(),
)
}
}
@@ -384,20 +414,21 @@ impl ToDFSchema for SchemaRef {

impl ToDFSchema for Vec<DFField> {
fn to_dfschema(self) -> Result<DFSchema> {
DFSchema::new(self)
DFSchema::new_with_metadata(self, HashMap::new())
}
}

impl Display for DFSchema {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"{}",
"fields:[{}], metadata:{:?}",
self.fields
.iter()
.map(|field| field.qualified_name())
.collect::<Vec<String>>()
.join(", ")
.join(", "),
self.metadata
)
}
}
@@ -556,14 +587,14 @@ mod tests {
#[test]
fn from_unqualified_schema() -> Result<()> {
let schema = DFSchema::try_from(test_schema_1())?;
assert_eq!("c0, c1", schema.to_string());
assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
Ok(())
}

#[test]
fn from_qualified_schema() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
assert_eq!("t1.c0, t1.c1", schema.to_string());
assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
Ok(())
}

@@ -582,7 +613,10 @@
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
let join = left.join(&right)?;
assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string());
assert_eq!(
"fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
join.to_string()
);
// test valid access
assert!(join.field_with_qualified_name("t1", "c0").is_ok());
assert!(join.field_with_qualified_name("t2", "c0").is_ok());
@@ -626,7 +660,10 @@
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from(test_schema_2())?;
let join = left.join(&right)?;
assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string());
assert_eq!(
"fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
join.to_string()
);
// test valid access
assert!(join.field_with_qualified_name("t1", "c0").is_ok());
assert!(join.field_with_unqualified_name("c0").is_ok());
@@ -678,11 +715,18 @@
#[test]
fn into() {
// Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]);
let metadata = test_metadata();
let arrow_schema = Schema::new_with_metadata(
vec![Field::new("c0", DataType::Int64, true)],
metadata.clone(),
);
let arrow_schema_ref = Arc::new(arrow_schema.clone());

let df_schema =
DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap();
let df_schema = DFSchema::new_with_metadata(
vec![DFField::new(None, "c0", DataType::Int64, true)],
metadata,
)
.unwrap();
let df_schema_ref = Arc::new(df_schema.clone());

{
@@ -719,4 +763,11 @@
Field::new("c101", DataType::Boolean, true),
])
}

fn test_metadata() -> HashMap<String, String> {
vec![("k1", "v1"), ("k2", "v2")]
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<HashMap<_, _>>()
}
}
1 change: 1 addition & 0 deletions datafusion-proto/src/from_proto.rs
@@ -155,6 +155,7 @@ impl TryFrom<&protobuf::DfSchema> for DFSchema {
.iter()
.map(|c| c.try_into())
.collect::<Result<Vec<DFField>, _>>()?;
#[allow(deprecated)]
Ok(DFSchema::new(fields)?)
}
}
36 changes: 27 additions & 9 deletions datafusion/src/logical_plan/builder.rs
@@ -188,7 +188,8 @@ impl LogicalPlanBuilder {
for (i, j) in nulls {
values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
}
let schema = DFSchemaRef::new(DFSchema::new(fields)?);
let schema =
DFSchemaRef::new(DFSchema::new_with_metadata(fields, HashMap::new())?);
Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
}

@@ -392,12 +393,13 @@
let projected_schema = projection
.as_ref()
.map(|p| {
DFSchema::new(
DFSchema::new_with_metadata(
p.iter()
.map(|i| {
DFField::from_qualified(&table_name, schema.field(*i).clone())
})
.collect(),
schema.metadata().clone(),
)
})
.unwrap_or_else(|| {
@@ -500,7 +502,10 @@

expr.extend(missing_exprs);

let new_schema = DFSchema::new(exprlist_to_fields(&expr, input_schema)?)?;
let new_schema = DFSchema::new_with_metadata(
exprlist_to_fields(&expr, input_schema)?,
input_schema.metadata().clone(),
)?;

Ok(LogicalPlan::Projection(Projection {
expr,
@@ -569,7 +574,10 @@
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect();
let new_schema = DFSchema::new(exprlist_to_fields(&new_expr, schema)?)?;
let new_schema = DFSchema::new_with_metadata(
exprlist_to_fields(&new_expr, schema)?,
schema.metadata().clone(),
)?;

Ok(Self::from(LogicalPlan::Projection(Projection {
expr: new_expr,
@@ -787,7 +795,10 @@
Ok(Self::from(LogicalPlan::Window(Window {
input: Arc::new(self.plan.clone()),
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
schema: Arc::new(DFSchema::new_with_metadata(
window_fields,
self.plan.schema().metadata().clone(),
)?),
})))
}

@@ -803,8 +814,10 @@
let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
let all_expr = group_expr.iter().chain(aggr_expr.iter());
validate_unique_names("Aggregations", all_expr.clone(), self.plan.schema())?;
let aggr_schema =
DFSchema::new(exprlist_to_fields(all_expr, self.plan.schema())?)?;
let aggr_schema = DFSchema::new_with_metadata(
exprlist_to_fields(all_expr, self.plan.schema())?,
self.plan.schema().metadata().clone(),
)?;
Ok(Self::from(LogicalPlan::Aggregate(Aggregate {
input: Arc::new(self.plan.clone()),
group_expr,
@@ -927,7 +940,9 @@ pub fn build_join_schema(
}
};

DFSchema::new(fields)
let mut metadata = left.metadata().clone();
metadata.extend(right.metadata().clone());
DFSchema::new_with_metadata(fields, metadata)
}

/// Errors if one or more expressions have equal names.
Expand Down Expand Up @@ -1019,7 +1034,10 @@ pub fn project_with_alias(
}
}
validate_unique_names("Projections", projected_expr.iter(), input_schema)?;
let input_schema = DFSchema::new(exprlist_to_fields(&projected_expr, input_schema)?)?;
let input_schema = DFSchema::new_with_metadata(
exprlist_to_fields(&projected_expr, input_schema)?,
plan.schema().metadata().clone(),
)?;
let schema = match alias {
Some(ref alias) => input_schema.replace_qualifier(alias.as_str()),
None => input_schema,