Skip to content

Commit

Permalink
Sort filenames when reading parquet to ensure consistent schema (apac…
Browse files Browse the repository at this point in the history
…he#6629)

* update

* FIXed

* add  parquet

* update

* update

* update

* try2

* update

* update

* Add comments

* cargo fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and appletreeisyellow committed Jan 3, 2024
1 parent 15ebe7e commit 9cfdba4
Showing 1 changed file with 80 additions and 5 deletions.
85 changes: 80 additions & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ fn clear_metadata(
})
}

async fn fetch_schema_with_location(
store: &dyn ObjectStore,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
) -> Result<(Path, Schema)> {
let loc_path = file.location.clone();
let schema = fetch_schema(store, file, metadata_size_hint).await?;
Ok((loc_path, schema))
}

#[async_trait]
impl FileFormat for ParquetFormat {
fn as_any(&self) -> &dyn Any {
Expand All @@ -176,13 +186,32 @@ impl FileFormat for ParquetFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint))
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| {
fetch_schema_with_location(
store.as_ref(),
object,
self.metadata_size_hint,
)
})
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
.buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;

// Schema inference adds fields based the order they are seen
// which depends on the order the files are processed. For some
// object stores (like local file systems) the order returned from list
// is not deterministic. Thus, to ensure deterministic schema inference
// sort the files first.
// https://github.com/apache/arrow-datafusion/pull/6629
schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

let schemas = schemas
.into_iter()
.map(|(_, schema)| schema)
.collect::<Vec<_>>();

let schema = if self.skip_metadata(state.config_options()) {
Schema::try_merge(clear_metadata(schemas))
} else {
Expand Down Expand Up @@ -1124,12 +1153,21 @@ pub(crate) mod test_util {
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
// we need the tmp files to be sorted as some tests rely on the how the returning files are ordered
// https://github.com/apache/arrow-datafusion/pull/6629
let tmp_files = {
let mut tmp_files: Vec<_> = (0..batches.len())
.map(|_| NamedTempFile::new().expect("creating temp file"))
.collect();
tmp_files.sort_by(|a, b| a.path().cmp(b.path()));
tmp_files
};

// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

.zip(tmp_files.into_iter())
.map(|(batch, mut output)| {
let builder = WriterProperties::builder();
let props = if multi_page {
builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
Expand All @@ -1155,6 +1193,7 @@ pub(crate) mod test_util {
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();

Ok((meta, files))
}

Expand Down Expand Up @@ -1254,6 +1293,42 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn is_schema_stable() -> Result<()> {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let batch1 =
RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])
.unwrap();
let batch2 =
RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])
.unwrap();

let store = Arc::new(LocalFileSystem::new()) as _;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

let session = SessionContext::new();
let ctx = session.state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();

let order: Vec<_> = ["a", "b", "c", "d"]
.into_iter()
.map(|i| i.to_string())
.collect();
let coll: Vec<_> = schema
.all_fields()
.into_iter()
.map(|i| i.name().to_string())
.collect();
assert_eq!(coll, order);

Ok(())
}

#[derive(Debug)]
struct RequestCountingObjectStore {
inner: Arc<dyn ObjectStore>,
Expand Down

0 comments on commit 9cfdba4

Please sign in to comment.