Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to DataFusion 16.0.0 #115

Merged
merged 13 commits into from
Jan 19, 2023
Prev Previous commit
Next Next commit
fix reads:
andygrove committed Jan 2, 2023

Verified

This commit was signed with the committer’s verified signature. The key has expired.
tvdeyen Thomas von Deyen
commit 1ed7a44f6ebd25436553996877993d8de368dd4c
32 changes: 18 additions & 14 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ impl PySessionContext {
}

let mut cfg = SessionConfig::new()
//.create_default_catalog_and_schema();
//TODO .create_default_catalog_and_schema();
.with_default_catalog_and_schema(default_catalog, default_schema)
.with_information_schema(information_schema)
.with_repartition_joins(repartition_joins)
@@ -348,15 +348,16 @@ impl PySessionContext {
.ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
let mut options = NdJsonReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
//TODO
// if let Some(x) = schema {
// options.schema = Some(&x.0);
// }
options.schema_infer_max_records = schema_infer_max_records;
options.file_extension = file_extension;

let result = self.ctx.read_json(path, options);
let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let result = self.ctx.read_json(path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?
} else {
let result = self.ctx.read_json(path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?
};
Comment on lines +347 to +354
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary because options.schema holds a reference to a schema now so we need to wait for the future to complete before ownership can be released.

Ok(PyDataFrame::new(df))
}

@@ -454,12 +455,15 @@ impl PySessionContext {
let mut options = AvroReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
options.file_extension = file_extension;
//TODO
// options.schema = schema.map(|s| &s.0);

let result = self.ctx.read_avro(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
Ok(df)
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let read_future = self.ctx.read_avro(path, options);
wait_for_future(py, read_future).map_err(DataFusionError::from)?
} else {
let read_future = self.ctx.read_avro(path, options);
wait_for_future(py, read_future).map_err(DataFusionError::from)?
};
Ok(PyDataFrame::new(df))
}
}