From bfa2d432282d4bc381a5086ca163a2a5d232afc1 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 13 Dec 2021 21:38:47 +0100 Subject: [PATCH] Added async read of avro files. (#620) --- Cargo.toml | 4 +- examples/avro_read_async.rs | 42 +++++++++++++++++++ guide/src/SUMMARY.md | 1 + guide/src/io/README.md | 1 + guide/src/io/avro_read.md | 11 +++++ src/io/avro/read/block.rs | 1 + src/io/avro/read/decompress.rs | 18 ++++---- src/io/avro/read/deserialize.rs | 1 + src/io/avro/read/mod.rs | 6 ++- src/io/avro/read/schema.rs | 2 +- src/io/avro/read_async/block.rs | 67 ++++++++++++++++++++++++++++++ src/io/avro/read_async/header.rs | 0 src/io/avro/read_async/metadata.rs | 60 ++++++++++++++++++++++++++ src/io/avro/read_async/mod.rs | 55 +++--------------------- src/io/avro/read_async/utils.rs | 19 +++++++++ tests/it/io/avro/read_async.rs | 36 ++++++++++++---- 16 files changed, 254 insertions(+), 70 deletions(-) create mode 100644 examples/avro_read_async.rs create mode 100644 guide/src/io/avro_read.md create mode 100644 src/io/avro/read_async/block.rs delete mode 100644 src/io/avro/read_async/header.rs create mode 100644 src/io/avro/read_async/metadata.rs create mode 100644 src/io/avro/read_async/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 8c506cf135b..49ad379efbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,8 @@ avro-rs = { version = "0.13", optional = true, default_features = false } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } +# async avro +async-stream = { version = "0.3.2", optional = true } # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } @@ -141,7 +143,7 @@ io_avro_compression = [ "libflate", "snap", ] -io_avro_async = ["io_avro", "futures"] +io_avro_async = ["io_avro", "futures", "async-stream"] # io_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["io_json", "serde_derive", "hex"] diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs new file mode 100644 index 00000000000..7d0dd8d808e --- /dev/null +++ b/examples/avro_read_async.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use futures::pin_mut; +use futures::StreamExt; +use tokio::fs::File; +use tokio_util::compat::*; + +use arrow2::error::Result; +use arrow2::io::avro::read::{decompress_block, deserialize}; +use arrow2::io::avro::read_async::*; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + use std::env; + let args: Vec = env::args().collect(); + + let file_path = &args[1]; + + let mut reader = File::open(file_path).await?.compat(); + + let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?; + let schema = Arc::new(schema); + let avro_schemas = Arc::new(avro_schemas); + + let blocks = block_stream(&mut reader, marker).await; + + pin_mut!(blocks); + while let Some((mut block, rows)) = blocks.next().await.transpose()? { + // the content here is blocking. In general this should run on spawn_blocking + let schema = schema.clone(); + let avro_schemas = avro_schemas.clone(); + let handle = tokio::task::spawn_blocking(move || { + let mut decompressed = vec![]; + decompress_block(&mut block, &mut decompressed, compression)?; + deserialize(&decompressed, rows, schema, &avro_schemas) + }); + let batch = handle.await.unwrap()?; + assert!(batch.num_rows() > 0); + } + + Ok(()) +} diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index 8158f21788f..a66c993096d 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -16,3 +16,4 @@ - [Read Arrow](./io/ipc_read.md) - [Read Arrow stream](./io/ipc_stream_read.md) - [Write Arrow](./io/ipc_write.md) + - [Read Avro](./io/avro_read.md) diff --git a/guide/src/io/README.md b/guide/src/io/README.md index 9c65d0d6f1a..cac3d6017fe 100644 --- a/guide/src/io/README.md +++ b/guide/src/io/README.md @@ -6,5 +6,6 @@ This crate offers optional features that enable interoperability with different * CSV (`io_csv`) * Parquet (`io_parquet`) * Json (`io_json`) +* Avro (`io_avro` and `io_avro_async`) In this section you can find a guide and examples for each one of them. diff --git a/guide/src/io/avro_read.md b/guide/src/io/avro_read.md new file mode 100644 index 00000000000..21dcf054c1c --- /dev/null +++ b/guide/src/io/avro_read.md @@ -0,0 +1,11 @@ +# Avro read + +When compiled with feature `io_avro_async`, you can use this crate to read Avro files +asynchronously. + +```rust +{{#include ../../../examples/avro_read_async.rs}} +``` + +Note how both decompression and deserialization is performed on a separate thread pool to not +block (see also [here](https://ryhl.io/blog/async-what-is-blocking/)). diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs index f061d1e8fe4..69bc0e3ffb6 100644 --- a/src/io/avro/read/block.rs +++ b/src/io/avro/read/block.rs @@ -33,6 +33,7 @@ fn read_block(reader: &mut R, buf: &mut Vec, file_marker: [u8; 16]) return Ok(0); }; + buf.clear(); buf.resize(bytes, 0); reader.read_exact(buf)?; diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs index 70fd2ab5d9a..9fd64ddbf3b 100644 --- a/src/io/avro/read/decompress.rs +++ b/src/io/avro/read/decompress.rs @@ -8,33 +8,33 @@ use crate::error::{ArrowError, Result}; use super::BlockStreamIterator; use super::Compression; -/// Decompresses an avro block. +/// Decompresses an Avro block. /// Returns whether the buffers where swapped. -fn decompress_block( +pub fn decompress_block( block: &mut Vec, - decompress: &mut Vec, + decompressed: &mut Vec, compression: Option, ) -> Result { match compression { None => { - std::mem::swap(block, decompress); + std::mem::swap(block, decompressed); Ok(true) } #[cfg(feature = "io_avro_compression")] Some(Compression::Deflate) => { - decompress.clear(); + decompressed.clear(); let mut decoder = libflate::deflate::Decoder::new(&block[..]); - decoder.read_to_end(decompress)?; + decoder.read_to_end(decompressed)?; Ok(false) } #[cfg(feature = "io_avro_compression")] Some(Compression::Snappy) => { let len = snap::raw::decompress_len(&block[..block.len() - 4]) .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?; - decompress.clear(); - decompress.resize(len, 0); + decompressed.clear(); + decompressed.resize(len, 0); snap::raw::Decoder::new() - .decompress(&block[..block.len() - 4], decompress) + .decompress(&block[..block.len() - 4], decompressed) .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?; Ok(false) } diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index a4a87620314..93fdf5537b4 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -241,6 +241,7 @@ fn deserialize_value<'a>( Ok(block) } +/// Deserializes an Avro block into a [`RecordBatch`]. pub fn deserialize( mut block: &[u8], rows: usize, diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index ca5f56a6e22..f702b913f70 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -9,14 +9,16 @@ use fallible_streaming_iterator::FallibleStreamingIterator; mod block; mod decompress; pub use block::BlockStreamIterator; -pub use decompress::Decompressor; +pub use decompress::{decompress_block, Decompressor}; mod deserialize; +pub use deserialize::deserialize; mod header; mod nested; mod schema; mod util; pub(super) use header::deserialize_header; +pub(super) use schema::convert_schema; use crate::datatypes::Schema; use crate::error::Result; @@ -80,7 +82,7 @@ impl Iterator for Reader { self.iter.next().transpose().map(|x| { let (data, rows) = x?; - deserialize::deserialize(data, *rows, schema, avro_schemas) + deserialize(data, *rows, schema, avro_schemas) }) } } diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index fdb9dba57a2..6304eb0892a 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -8,7 +8,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; /// Returns the fully qualified name for a field -pub fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String { +fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String { if name.contains('.') { name.to_string() } else { diff --git a/src/io/avro/read_async/block.rs b/src/io/avro/read_async/block.rs new file mode 100644 index 00000000000..47cf71aea93 --- /dev/null +++ b/src/io/avro/read_async/block.rs @@ -0,0 +1,67 @@ +//! APIs to read from Avro format to arrow. +use async_stream::try_stream; +use futures::AsyncRead; +use futures::AsyncReadExt; +use futures::Stream; + +use crate::error::{ArrowError, Result}; + +use super::utils::zigzag_i64; + +async fn read_size(reader: &mut R) -> Result<(usize, usize)> { + let rows = match zigzag_i64(reader).await { + Ok(a) => a, + Err(ArrowError::Io(io_err)) => { + if let std::io::ErrorKind::UnexpectedEof = io_err.kind() { + // end + return Ok((0, 0)); + } else { + return Err(ArrowError::Io(io_err)); + } + } + Err(other) => return Err(other), + }; + let bytes = zigzag_i64(reader).await?; + Ok((rows as usize, bytes as usize)) +} + +/// Reads a block from the file into `buf`. +/// # Panic +/// Panics iff the block marker does not equal to the file's marker +async fn read_block( + reader: &mut R, + buf: &mut Vec, + file_marker: [u8; 16], +) -> Result { + let (rows, bytes) = read_size(reader).await?; + if rows == 0 { + return Ok(0); + }; + + buf.clear(); + buf.resize(bytes, 0); + reader.read_exact(buf).await?; + + let mut marker = [0u8; 16]; + reader.read_exact(&mut marker).await?; + + assert!(!(marker != file_marker)); + Ok(rows) +} + +/// Returns a fallible [`Stream`] of Avro blocks bound to `reader` +pub async fn block_stream( + reader: &mut R, + file_marker: [u8; 16], +) -> impl Stream, usize)>> + '_ { + try_stream! { + loop { + let mut buffer = vec![]; + let rows = read_block(reader, &mut buffer, file_marker).await?; + if rows == 0 { + break + } + yield (buffer, rows) + } + } +} diff --git a/src/io/avro/read_async/header.rs b/src/io/avro/read_async/header.rs deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs new file mode 100644 index 00000000000..d3931236793 --- /dev/null +++ b/src/io/avro/read_async/metadata.rs @@ -0,0 +1,60 @@ +//! Async Avro +use std::collections::HashMap; + +use avro_rs::Schema as AvroSchema; +use futures::AsyncRead; +use futures::AsyncReadExt; + +use crate::datatypes::Schema; +use crate::error::{ArrowError, Result}; + +use super::super::read::convert_schema; +use super::super::read::deserialize_header; +use super::super::read::Compression; +use super::super::{read_header, read_metadata}; +use super::utils::zigzag_i64; + +/// Reads Avro's metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker. +#[allow(clippy::type_complexity)] +async fn read_metadata_async( + reader: &mut R, +) -> Result<(AvroSchema, Option, [u8; 16])> { + read_metadata!(reader.await) +} + +/// Reads the avro metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker. +#[allow(clippy::type_complexity)] +pub async fn read_metadata( + reader: &mut R, +) -> Result<(Vec, Schema, Option, [u8; 16])> { + let (avro_schema, codec, marker) = read_metadata_async(reader).await?; + let schema = convert_schema(&avro_schema)?; + + let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema { + fields.into_iter().map(|x| x.schema).collect() + } else { + panic!() + }; + + Ok((avro_schema, schema, codec, marker)) +} + +/// Reads the file marker asynchronously +async fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { + let mut marker = [0u8; 16]; + reader.read_exact(&mut marker).await?; + Ok(marker) +} + +async fn _read_binary(reader: &mut R) -> Result> { + let len: usize = zigzag_i64(reader).await? as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf).await?; + Ok(buf) +} + +async fn read_header( + reader: &mut R, +) -> Result>> { + read_header!(reader.await) +} diff --git a/src/io/avro/read_async/mod.rs b/src/io/avro/read_async/mod.rs index c58fff2f617..d5d18500856 100644 --- a/src/io/avro/read_async/mod.rs +++ b/src/io/avro/read_async/mod.rs @@ -1,53 +1,8 @@ //! Async Avro -use std::collections::HashMap; -use avro_rs::Schema; -use futures::AsyncRead; -use futures::AsyncReadExt; +mod block; +mod metadata; +pub(self) mod utils; -use crate::error::{ArrowError, Result}; - -use super::read::deserialize_header; -use super::read::Compression; -use super::{avro_decode, read_header, read_metadata}; - -/// Reads Avro's metadata from `reader` into a [`Schema`], [`Compression`] and magic marker. -#[allow(clippy::type_complexity)] -pub async fn read_metadata_async( - reader: &mut R, -) -> Result<(Schema, Option, [u8; 16])> { - read_metadata!(reader.await) -} - -/// Reads the file marker asynchronously -async fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker).await?; - Ok(marker) -} - -async fn zigzag_i64(reader: &mut R) -> Result { - let z = decode_variable(reader).await?; - Ok(if z & 0x1 == 0 { - (z >> 1) as i64 - } else { - !(z >> 1) as i64 - }) -} - -async fn decode_variable(reader: &mut R) -> Result { - avro_decode!(reader.await) -} - -async fn _read_binary(reader: &mut R) -> Result> { - let len: usize = zigzag_i64(reader).await? as usize; - let mut buf = vec![0u8; len]; - reader.read_exact(&mut buf).await?; - Ok(buf) -} - -async fn read_header( - reader: &mut R, -) -> Result>> { - read_header!(reader.await) -} +pub use block::block_stream; +pub use metadata::read_metadata; diff --git a/src/io/avro/read_async/utils.rs b/src/io/avro/read_async/utils.rs new file mode 100644 index 00000000000..8c6b7591883 --- /dev/null +++ b/src/io/avro/read_async/utils.rs @@ -0,0 +1,19 @@ +use futures::AsyncRead; +use futures::AsyncReadExt; + +use crate::error::{ArrowError, Result}; + +use super::super::avro_decode; + +pub async fn zigzag_i64(reader: &mut R) -> Result { + let z = decode_variable(reader).await?; + Ok(if z & 0x1 == 0 { + (z >> 1) as i64 + } else { + !(z >> 1) as i64 + }) +} + +async fn decode_variable(reader: &mut R) -> Result { + avro_decode!(reader.await) +} diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs index dc4f873a527..3ace4f05ae5 100644 --- a/tests/it/io/avro/read_async.rs +++ b/tests/it/io/avro/read_async.rs @@ -1,23 +1,45 @@ +use std::sync::Arc; + use avro_rs::Codec; +use futures::pin_mut; +use futures::StreamExt; + use arrow2::error::Result; -use arrow2::io::avro::read; +use arrow2::io::avro::read_async::*; use super::read::write; -async fn _test_metadata(codec: Codec) -> Result<()> { +async fn test(codec: Codec) -> Result<()> { let (data, expected) = write(codec).unwrap(); - let file = &mut &data[..]; + let mut reader = &mut &data[..]; + + let (_, schema, _, marker) = read_metadata(&mut reader).await?; + let schema = Arc::new(schema); - let (_, schema, _, _) = read::read_metadata(file)?; + assert_eq!(schema.as_ref(), expected.schema().as_ref()); - assert_eq!(&schema, expected.schema().as_ref()); + let blocks = block_stream(&mut reader, marker).await; + pin_mut!(blocks); + while let Some((block, rows)) = blocks.next().await.transpose()? { + assert!(rows > 0 || block.is_empty()) + } Ok(()) } #[tokio::test] -async fn read_metadata() -> Result<()> { - _test_metadata(Codec::Null).await +async fn read_without_codec() -> Result<()> { + test(Codec::Null).await +} + +#[tokio::test] +async fn read_deflate() -> Result<()> { + test(Codec::Deflate).await +} + +#[tokio::test] +async fn read_snappy() -> Result<()> { + test(Codec::Snappy).await }