Use object_store::BufWriter to replace put_multipart #9648
Changes from 3 commits
```diff
@@ -18,21 +18,18 @@
 //! Module containing helper methods/traits related to enabling
 //! write support for the various file formats

-use std::io::{Error, Write};
-use std::pin::Pin;
+use std::io::Write;
 use std::sync::Arc;
-use std::task::{Context, Poll};

 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::error::Result;

 use arrow_array::RecordBatch;
 use datafusion_common::DataFusionError;

 use bytes::Bytes;
-use futures::future::BoxFuture;
+use object_store::buffered::BufWriter;
 use object_store::path::Path;
-use object_store::{MultipartId, ObjectStore};
+use object_store::ObjectStore;
 use tokio::io::AsyncWrite;

 pub(crate) mod demux;
```
```diff
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
     }
 }

-/// Stores data needed during abortion of MultiPart writers
-#[derive(Clone)]
-pub(crate) struct MultiPart {
-    /// A shared reference to the object store
-    store: Arc<dyn ObjectStore>,
-    multipart_id: MultipartId,
-    location: Path,
-}
-
-impl MultiPart {
-    /// Create a new `MultiPart`
-    pub fn new(
-        store: Arc<dyn ObjectStore>,
-        multipart_id: MultipartId,
-        location: Path,
-    ) -> Self {
-        Self {
-            store,
-            multipart_id,
-            location,
-        }
-    }
-}
-
-/// A wrapper struct with abort method and writer
-pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
-    writer: W,
-    multipart: MultiPart,
-}
-
-impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
-    /// Create a new `AbortableWrite` instance with the given writer, and write mode.
-    pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
-        Self { writer, multipart }
-    }
-
-    /// handling of abort for different write modes
-    pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        let multi = self.multipart.clone();
-        Ok(Box::pin(async move {
-            multi
-                .store
-                .abort_multipart(&multi.location, &multi.multipart_id)
-                .await
-                .map_err(DataFusionError::ObjectStore)
-        }))
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
-    }
-}
```

Review discussion on this removal:

Review comment: Is it correct to say that the implication of removing […] is that aborted multipart uploads will no longer be cleaned up? While I understand that some object stores (maybe all) can be configured to automatically clean up such parts, I think reverting the "try to clean up on failure" behavior is worth reconsidering. I could be convinced by an argument like "the software can't ensure cleanup anyway (for example, if it is […])". I think @metesynnada or @mustafasrepo originally added this code (though I may be wrong), so perhaps they have some perspective to share.

Reply: Yeah, I think according to #9648 (comment), "garbage" cleanup will happen only on the cloud provider's side if […] is removed.

Reply: Yes, I have mixed feelings about removing any attempt to clean up on failure. That argument is valid: a hardware/network fault will prevent any cleanup code we write from working, so truly protecting against partial writes would require logic outside of DataFusion's process (e.g. on the cloud service provider's side). On the other hand, this change may be annoying for simple failures when writing to a local file system: encountering any execution error will leave dangling files, where before they often could be cleaned up. I think this is a case where one will draw different conclusions depending on whether they are imagining an individual user of something like […].

Reply: The local file system implementation automatically cleans up on drop, or at least makes a best effort to do so. FWIW, this same mechanism is used for ALL uploads, even the non-multipart ones, so as to provide atomicity. Given that nobody has complained about this, I suspect it is doing a fairly good job. I am not aware of a cloud provider that offers multipart uploads without some automated way to reap aborted uploads after a given time.

Reply: OK, so I vote we leave the code as is (no attempt to explicitly abort in write) and add a note in the documentation. If it turns out this is an important behavior, we can add it back in. @yyy1000, can you handle adding the note in the documentation?

Reply: Sure, I added it around […].

Reply: Looks reasonable to me -- thank you.

Review comment: I also removed […].
```diff
 /// A trait that defines the methods required for a RecordBatch serializer.
 pub trait BatchSerializer: Sync + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.

@@ -150,19 +74,13 @@ pub trait BatchSerializer: Sync + Send {
     fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
 }

-/// Returns an [`AbortableWrite`] which writes to the given object store location
+/// Returns an [`AsyncWrite`] which writes to the given object store location
 /// with the specified compression
 pub(crate) async fn create_writer(
     file_compression_type: FileCompressionType,
     location: &Path,
     object_store: Arc<dyn ObjectStore>,
-) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-    let (multipart_id, writer) = object_store
-        .put_multipart(location)
-        .await
-        .map_err(DataFusionError::ObjectStore)?;
-    Ok(AbortableWrite::new(
-        file_compression_type.convert_async_writer(writer)?,
-        MultiPart::new(object_store, multipart_id, location.clone()),
-    ))
+) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+    let buf_writer = BufWriter::new(object_store, location.clone());
+    file_compression_type.convert_async_writer(buf_writer)
 }
```

Review comment (on the removed `put_multipart` call): For anyone else following along, the […].
```diff
@@ -22,7 +22,7 @@
 use std::sync::Arc;

 use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer};
+use super::{create_writer, BatchSerializer};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::error::Result;

@@ -39,7 +39,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver};
 use tokio::task::JoinSet;

-type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
+type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
 type SerializerType = Arc<dyn BatchSerializer>;

 /// Serializes a single data stream in parallel and writes to an ObjectStore

@@ -49,7 +49,7 @@ type SerializerType = Arc<dyn BatchSerializer>;
 pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     serializer: Arc<dyn BatchSerializer>,
-    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    mut writer: WriterType,
 ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
     let (tx, mut rx) =
         mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);

@@ -173,19 +173,9 @@ pub(crate) async fn stateless_serialize_and_write_files(

     // Finalize or abort writers as appropriate
     for mut writer in finished_writers.into_iter() {
-        match any_errors {
-            true => {
-                let abort_result = writer.abort_writer();
-                if abort_result.is_err() {
-                    any_abort_errors = true;
-                }
-            }
-            false => {
-                writer.shutdown()
-                    .await
-                    .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
-            }
-        }
+        writer.shutdown()
+            .await
+            .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
     }

     if any_errors {
```

Review comment (on the simplified finalize loop): Don't know whether it's proper to let all just […].
Review comment: I'm thinking whether it's OK to change the param type here, since it's public, but keeping `Box<dyn AsyncWrite + Send + Unpin>` makes the type incompatible. 🤔

Reply: I think it depends on whether this code is always used with `object_store` (aka if DataFusion code always writes output using the `object_store` API). If that is the case, then switching to `BufWriter` here makes sense to me.

BTW, I think we need to update the comments on this function to match the new implementation.