From d17b2067d637c563eb315571e719807a6f482784 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 14 May 2024 16:50:39 +0800 Subject: [PATCH] feat: Make AsyncArrowWriter accepts AsyncFileWriter (#5753) Signed-off-by: Xuanwo --- parquet/src/arrow/async_writer/mod.rs | 74 ++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index d95c2007c378..0bedf1fcb731 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -59,13 +59,62 @@ use crate::{ }; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; +use std::mem; use tokio::io::{AsyncWrite, AsyncWriteExt}; -/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`] +/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files +pub trait AsyncFileWriter: Send { + /// Write the provided bytes to the underlying writer + /// + /// The underlying writer CAN decide to buffer the data or write it immediately. + /// This design allows the writer implementer to control the buffering and I/O scheduling. + /// + /// The underlying writer MAY implement retry logic to prevent breaking users write process. + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>; + + /// Flush any buffered data to the underlying writer and finish writing process. + /// + /// After `complete` returns `Ok(())`, caller SHOULD not call write again. + fn complete(&mut self) -> BoxFuture<'_, Result<()>>; +} + +impl AsyncFileWriter for Box { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + self.as_mut().write(bs) + } + + fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + self.as_mut().complete() + } +} + +impl AsyncFileWriter for T { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> { + async move { + self.write_all(&bs).await?; + Ok(()) + } + .boxed() + } + + fn complete(&mut self) -> BoxFuture<'_, Result<()>> { + async move { + self.flush().await?; + self.shutdown().await?; + Ok(()) + } + .boxed() + } +} + +/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`] /// /// ## Memory Usage /// -/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`], +/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`], /// permitting fine-grained control over buffering and I/O scheduling. However, the columnar /// nature of parquet forces data for an entire row group to be buffered in memory, before /// it can be flushed. Depending on the data and the configured row group size, this buffering @@ -97,7 +146,7 @@ pub struct AsyncArrowWriter { async_writer: W, } -impl AsyncArrowWriter { +impl AsyncArrowWriter { /// Try to create a new Async Arrow Writer pub fn try_new( writer: W, @@ -178,28 +227,25 @@ impl AsyncArrowWriter { // Force to flush the remaining data. self.do_write().await?; - self.async_writer.shutdown().await?; + self.async_writer.complete().await?; Ok(metadata) } /// Flush the data written by `sync_writer` into the `async_writer` + /// + /// # Notes + /// + /// This method will take the inner buffer from the `sync_writer` and write it into the + /// async writer. After the write, the inner buffer will be empty. async fn do_write(&mut self) -> Result<()> { - let buffer = self.sync_writer.inner_mut(); - - self.async_writer - .write_all(buffer.as_slice()) - .await - .map_err(|e| ParquetError::External(Box::new(e)))?; + let buffer = mem::take(self.sync_writer.inner_mut()); self.async_writer - .flush() + .write(Bytes::from(buffer)) .await .map_err(|e| ParquetError::External(Box::new(e)))?; - // reuse the buffer. - buffer.clear(); - Ok(()) } }