diff --git a/Cargo.toml b/Cargo.toml index d9e69e53db7c..c004d0ce2663 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ indexmap = "2.0.0" itertools = "0.12" log = "^0.4" num_cpus = "1.13.0" -object_store = { version = "0.9.0", default-features = false } +object_store = { version = "0.9.1", default-features = false } parking_lot = "0.12" parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } rand = "0.8" diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs index c538819e2684..c1fbe352d37b 100644 --- a/datafusion/core/src/datasource/file_format/file_compression_type.rs +++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs @@ -43,6 +43,7 @@ use futures::stream::BoxStream; use futures::StreamExt; #[cfg(feature = "compression")] use futures::TryStreamExt; +use object_store::buffered::BufWriter; use tokio::io::AsyncWrite; #[cfg(feature = "compression")] use tokio_util::io::{ReaderStream, StreamReader}; @@ -148,11 +149,11 @@ impl FileCompressionType { }) } - /// Wrap the given `AsyncWrite` so that it performs compressed writes + /// Wrap the given `BufWriter` so that it performs compressed writes /// according to this `FileCompressionType`. pub fn convert_async_writer( &self, - w: Box, + w: BufWriter, ) -> Result> { Ok(match self.variant { #[cfg(feature = "compression")] @@ -169,7 +170,7 @@ impl FileCompressionType { "Compression feature is not enabled".to_owned(), )) } - UNCOMPRESSED => w, + UNCOMPRESSED => Box::new(w), }) } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b7626d41f4dd..ec333bb557d2 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -23,7 +23,7 @@ use std::fmt::Debug; use std::sync::Arc; use super::write::demux::start_demuxer_task; -use super::write::{create_writer, AbortableWrite, SharedBuffer}; +use super::write::{create_writer, SharedBuffer}; use super::{FileFormat, FileScanConfig}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, @@ -56,6 +56,7 @@ use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; +use object_store::buffered::BufWriter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, @@ -613,19 +614,13 @@ impl ParquetSink { location: &Path, object_store: Arc, parquet_props: WriterProperties, - ) -> Result< - AsyncArrowWriter>, - > { - let (_, multipart_writer) = object_store - .put_multipart(location) - .await - .map_err(DataFusionError::ObjectStore)?; + ) -> Result> { + let buf_writer = BufWriter::new(object_store, location.clone()); let writer = AsyncArrowWriter::try_new( - multipart_writer, + buf_writer, self.get_writer_schema(), Some(parquet_props), )?; - Ok(writer) } @@ -943,7 +938,7 @@ async fn concatenate_parallel_row_groups( mut serialize_rx: Receiver>, schema: Arc, writer_props: Arc, - mut object_store_writer: AbortableWrite>, + mut object_store_writer: Box, ) -> Result { let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES); @@ -985,7 +980,7 @@ async fn concatenate_parallel_row_groups( /// task then stitches these independent RowGroups together and streams this large /// single parquet file to an ObjectStore in multiple parts. async fn output_single_parquet_file_parallelized( - object_store_writer: AbortableWrite>, + object_store_writer: Box, data: Receiver, output_schema: Arc, parquet_props: &WriterProperties, diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index 410a32a19cc1..42115fc7b93f 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -18,21 +18,18 @@ //! Module containing helper methods/traits related to enabling //! write support for the various file formats -use std::io::{Error, Write}; -use std::pin::Pin; +use std::io::Write; use std::sync::Arc; -use std::task::{Context, Poll}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::error::Result; use arrow_array::RecordBatch; -use datafusion_common::DataFusionError; use bytes::Bytes; -use futures::future::BoxFuture; +use object_store::buffered::BufWriter; use object_store::path::Path; -use object_store::{MultipartId, ObjectStore}; +use object_store::ObjectStore; use tokio::io::AsyncWrite; pub(crate) mod demux; @@ -69,79 +66,6 @@ impl Write for SharedBuffer { } } -/// Stores data needed during abortion of MultiPart writers -#[derive(Clone)] -pub(crate) struct MultiPart { - /// A shared reference to the object store - store: Arc, - multipart_id: MultipartId, - location: Path, -} - -impl MultiPart { - /// Create a new `MultiPart` - pub fn new( - store: Arc, - multipart_id: MultipartId, - location: Path, - ) -> Self { - Self { - store, - multipart_id, - location, - } - } -} - -/// A wrapper struct with abort method and writer -pub(crate) struct AbortableWrite { - writer: W, - multipart: MultiPart, -} - -impl AbortableWrite { - /// Create a new `AbortableWrite` instance with the given writer, and write mode. - pub(crate) fn new(writer: W, multipart: MultiPart) -> Self { - Self { writer, multipart } - } - - /// handling of abort for different write modes - pub(crate) fn abort_writer(&self) -> Result>> { - let multi = self.multipart.clone(); - Ok(Box::pin(async move { - multi - .store - .abort_multipart(&multi.location, &multi.multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } -} - -impl AsyncWrite for AbortableWrite { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) - } -} - /// A trait that defines the methods required for a RecordBatch serializer. pub trait BatchSerializer: Sync + Send { /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. @@ -150,19 +74,15 @@ pub trait BatchSerializer: Sync + Send { fn serialize(&self, batch: RecordBatch, initial: bool) -> Result; } -/// Returns an [`AbortableWrite`] which writes to the given object store location -/// with the specified compression +/// Returns an [`AsyncWrite`] which writes to the given object store location +/// with the specified compression. +/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure. +/// Users can configure automatic cleanup with their cloud provider. pub(crate) async fn create_writer( file_compression_type: FileCompressionType, location: &Path, object_store: Arc, -) -> Result>> { - let (multipart_id, writer) = object_store - .put_multipart(location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - MultiPart::new(object_store, multipart_id, location.clone()), - )) +) -> Result> { + let buf_writer = BufWriter::new(object_store, location.clone()); + file_compression_type.convert_async_writer(buf_writer) } diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index b7f268959311..3ae2122de827 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use super::demux::start_demuxer_task; -use super::{create_writer, AbortableWrite, BatchSerializer}; +use super::{create_writer, BatchSerializer}; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::physical_plan::FileSinkConfig; use crate::error::Result; @@ -39,7 +39,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver}; use tokio::task::JoinSet; -type WriterType = AbortableWrite>; +type WriterType = Box; type SerializerType = Arc; /// Serializes a single data stream in parallel and writes to an ObjectStore @@ -49,7 +49,7 @@ type SerializerType = Arc; pub(crate) async fn serialize_rb_stream_to_object_store( mut data_rx: Receiver, serializer: Arc, - mut writer: AbortableWrite>, + mut writer: WriterType, ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> { let (tx, mut rx) = mpsc::channel::>>(100); @@ -173,19 +173,9 @@ pub(crate) async fn stateless_serialize_and_write_files( // Finalize or abort writers as appropriate for mut writer in finished_writers.into_iter() { - match any_errors { - true => { - let abort_result = writer.abort_writer(); - if abort_result.is_err() { - any_abort_errors = true; - } - } - false => { - writer.shutdown() + writer.shutdown() .await .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?; - } - } } if any_errors { diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 5fcb9f483952..31cc52f79697 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -44,6 +44,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use bytes::{Buf, Bytes}; use futures::{ready, StreamExt, TryStreamExt}; +use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; @@ -471,7 +472,7 @@ pub async fn plan_to_csv( let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { - let (_, mut multipart_writer) = storeref.put_multipart(&file).await?; + let mut buf_writer = BufWriter::new(storeref, file.clone()); let mut buffer = Vec::with_capacity(1024); //only write headers on first iteration let mut write_headers = true; @@ -481,15 +482,12 @@ pub async fn plan_to_csv( .build(buffer); writer.write(&batch)?; buffer = writer.into_inner(); - multipart_writer.write_all(&buffer).await?; + buf_writer.write_all(&buffer).await?; buffer.clear(); //prevent writing headers more than once write_headers = false; } - multipart_writer - .shutdown() - .await - .map_err(DataFusionError::from) + buf_writer.shutdown().await.map_err(DataFusionError::from) }); } diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 068426e0fdcb..194a4a91c34a 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -43,6 +43,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use bytes::{Buf, Bytes}; use futures::{ready, StreamExt, TryStreamExt}; +use object_store::buffered::BufWriter; use object_store::{self, GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; @@ -338,21 +339,18 @@ pub async fn plan_to_json( let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { - let (_, mut multipart_writer) = storeref.put_multipart(&file).await?; + let mut buf_writer = BufWriter::new(storeref, file.clone()); let mut buffer = Vec::with_capacity(1024); while let Some(batch) = stream.next().await.transpose()? { let mut writer = json::LineDelimitedWriter::new(buffer); writer.write(&batch)?; buffer = writer.into_inner(); - multipart_writer.write_all(&buffer).await?; + buf_writer.write_all(&buffer).await?; buffer.clear(); } - multipart_writer - .shutdown() - .await - .map_err(DataFusionError::from) + buf_writer.shutdown().await.map_err(DataFusionError::from) }); } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 282cd624d036..767cde9cc55e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -52,6 +52,7 @@ use futures::future::BoxFuture; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; +use object_store::buffered::BufWriter; use object_store::path::Path; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; @@ -698,11 +699,11 @@ pub async fn plan_to_parquet( let propclone = writer_properties.clone(); let storeref = store.clone(); - let (_, multipart_writer) = storeref.put_multipart(&file).await?; + let buf_writer = BufWriter::new(storeref, file.clone()); let mut stream = plan.execute(i, task_ctx.clone())?; join_set.spawn(async move { let mut writer = - AsyncArrowWriter::try_new(multipart_writer, plan.schema(), propclone)?; + AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?; while let Some(next_batch) = stream.next().await { let batch = next_batch?; writer.write(&batch).await?;