Skip to content

Commit

Permalink
feat(9493): provide access to FileMetaData for files written with Par…
Browse files Browse the repository at this point in the history
…quetSink (apache#9548)

* feat(tmp): changes to enable ParquetSink poc

* chore: remove unnecessary accessor to inner Url

* refactor: provide file metadata keyed to (possibly partitioned) file path

* test: unit tests for ParquetSink, with and without partitioned file writes

* refactor: update to use macro for optional tracing of internal DF errors

* chore: clean up test and code comments, to make more explicit
  • Loading branch information
wiedld authored Mar 12, 2024
1 parent bd9a272 commit 1cd4529
Showing 1 changed file with 228 additions and 21 deletions.
249 changes: 228 additions & 21 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ use crate::physical_plan::{
use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, DataFusionError, FileType,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
Expand All @@ -66,6 +68,7 @@ use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -537,6 +540,9 @@ pub struct ParquetSink {
config: FileSinkConfig,
///
parquet_options: TableParquetOptions,
/// File metadata from successfully produced parquet files. The Mutex is only used
/// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
}

impl Debug for ParquetSink {
Expand All @@ -563,13 +569,21 @@ impl ParquetSink {
Self {
config,
parquet_options,
written: Default::default(),
}
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

/// Retrieve the file metadata for the written files, keyed to the path
/// which may be partitioned (in the case of hive style partitioning).
pub fn written(&self) -> HashMap<Path, FileMetaData> {
self.written.lock().clone()
}

/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
Expand Down Expand Up @@ -673,8 +687,10 @@ impl DataSink for ParquetSink {
"parquet".into(),
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
JoinSet::new();
let mut file_write_tasks: JoinSet<
std::result::Result<(Path, FileMetaData), DataFusionError>,
> = JoinSet::new();

while let Some((path, mut rx)) = file_stream_rx.recv().await {
if !allow_single_file_parallelism {
let mut writer = self
Expand All @@ -685,13 +701,14 @@ impl DataSink for ParquetSink {
)
.await?;
file_write_tasks.spawn(async move {
let mut row_count = 0;
while let Some(batch) = rx.recv().await {
row_count += batch.num_rows();
writer.write(&batch).await?;
}
writer.close().await?;
Ok(row_count)
let file_metadata = writer
.close()
.await
.map_err(DataFusionError::ParquetError)?;
Ok((path, file_metadata))
});
} else {
let writer = create_writer(
Expand All @@ -706,14 +723,15 @@ impl DataSink for ParquetSink {
let props = parquet_props.clone();
let parallel_options_clone = parallel_options.clone();
file_write_tasks.spawn(async move {
output_single_parquet_file_parallelized(
let file_metadata = output_single_parquet_file_parallelized(
writer,
rx,
schema,
props.writer_options(),
parallel_options_clone,
)
.await
.await?;
Ok((path, file_metadata))
});
}
}
Expand All @@ -722,7 +740,13 @@ impl DataSink for ParquetSink {
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
row_count += r?;
let (path, file_metadata) = r?;
row_count += file_metadata.num_rows;
let mut written_files = self.written.lock();
written_files
.try_insert(path.clone(), file_metadata)
.map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
drop(written_files);
}
Err(e) => {
if e.is_panic() {
Expand Down Expand Up @@ -924,7 +948,7 @@ async fn concatenate_parallel_row_groups(
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> Result<usize> {
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
Expand All @@ -934,13 +958,10 @@ async fn concatenate_parallel_row_groups(
writer_props,
)?;

let mut row_count = 0;

while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, cnt) = result?;
row_count += cnt;
let (serialized_columns, _cnt) = result?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
Expand All @@ -954,13 +975,13 @@ async fn concatenate_parallel_row_groups(
rg_out.close()?;
}

let inner_writer = parquet_writer.into_inner()?;
let final_buff = inner_writer.buffer.try_lock().unwrap();
let file_metadata = parquet_writer.close()?;
let final_buff = merged_buff.buffer.try_lock().unwrap();

object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;

Ok(row_count)
Ok(file_metadata)
}

/// Parallelizes the serialization of a single parquet file, by first serializing N
Expand All @@ -973,7 +994,7 @@ async fn output_single_parquet_file_parallelized(
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
parallel_options: ParallelParquetWriterOptions,
) -> Result<usize> {
) -> Result<FileMetaData> {
let max_rowgroups = parallel_options.max_parallel_row_groups;
// Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
let (serialize_tx, serialize_rx) =
Expand All @@ -987,7 +1008,7 @@ async fn output_single_parquet_file_parallelized(
arc_props.clone(),
parallel_options,
);
let row_count = concatenate_parallel_row_groups(
let file_metadata = concatenate_parallel_row_groups(
serialize_rx,
output_schema.clone(),
arc_props.clone(),
Expand All @@ -996,7 +1017,7 @@ async fn output_single_parquet_file_parallelized(
.await?;

launch_serialization_task.join_unwind().await?;
Ok(row_count)
Ok(file_metadata)
}

#[cfg(test)]
Expand Down Expand Up @@ -1084,6 +1105,7 @@ pub(crate) mod test_util {
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -1095,13 +1117,19 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::Field;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::BoxStream;
use futures::StreamExt;
use log::error;
Expand Down Expand Up @@ -1796,4 +1824,183 @@ mod tests {
let format = ParquetFormat::default();
scan_format(state, &format, &testdata, file_name, projection, limit).await
}

fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let local = Arc::new(
LocalFileSystem::new_with_prefix(&tmp_dir)
.expect("should create object store"),
);

let mut session = SessionConfig::default();
let mut parquet_opts = ParquetOptions {
allow_single_file_parallelism: true,
..Default::default()
};
parquet_opts.allow_single_file_parallelism = true;
session.options_mut().execution.parquet = parquet_opts;

let runtime = RuntimeEnv::default();
runtime
.object_store_registry
.register_store(store_url, local);

Arc::new(
TaskContext::default()
.with_session_config(session)
.with_runtime(Arc::new(runtime)),
)
}

#[tokio::test]
async fn parquet_sink_write() -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let object_store_url = ObjectStoreUrl::local_filesystem();

let file_sink_config = FileSinkConfig {
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions::default(),
));

// create data
let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

// write stream
parquet_sink
.write_all(
Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(vec![Ok(batch)]),
)),
&build_ctx(object_store_url.as_ref()),
)
.await
.unwrap();

// assert written
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
written.len(),
1,
"expected a single parquet files to be written, instead found {}",
written.len()
);

// check the file metadata
let (
path,
FileMetaData {
num_rows, schema, ..
},
) = written.take(1).next().unwrap();
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 1, "should not have path prefix");

assert_eq!(num_rows, 2, "file metdata to have 2 rows");
assert!(
schema.iter().any(|col_schema| col_schema.name == "a"),
"output file metadata should contain col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
"output file metadata should contain col b"
);

Ok(())
}

#[tokio::test]
async fn parquet_sink_write_partitions() -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let object_store_url = ObjectStoreUrl::local_filesystem();

// set file config to include partitioning on field_a
let file_sink_config = FileSinkConfig {
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
overwrite: true,
};
let parquet_sink = Arc::new(ParquetSink::new(
file_sink_config,
TableParquetOptions::default(),
));

// create data with 2 partitions
let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

// write stream
parquet_sink
.write_all(
Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(vec![Ok(batch)]),
)),
&build_ctx(object_store_url.as_ref()),
)
.await
.unwrap();

// assert written
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
written.len(),
2,
"expected two parquet files to be written, instead found {}",
written.len()
);

// check the file metadata includes partitions
let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
for (
path,
FileMetaData {
num_rows, schema, ..
},
) in written.take(2)
{
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 2, "should have path prefix");

let prefix = path_parts[0].as_ref();
assert!(
expected_partitions.contains(prefix),
"expected path prefix to match partition, instead found {:?}",
prefix
);
expected_partitions.remove(prefix);

assert_eq!(num_rows, 1, "file metdata to have 1 row");
assert!(
!schema.iter().any(|col_schema| col_schema.name == "a"),
"output file metadata will not contain partitioned col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
"output file metadata should contain col b"
);
}

Ok(())
}
}

0 comments on commit 1cd4529

Please sign in to comment.