Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added async read of avro files. (#620)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 13, 2021
1 parent a8c55f9 commit bfa2d43
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 70 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ avro-rs = { version = "0.13", optional = true, default_features = false }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }
Expand Down Expand Up @@ -141,7 +143,7 @@ io_avro_compression = [
"libflate",
"snap",
]
io_avro_async = ["io_avro", "futures"]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down
42 changes: 42 additions & 0 deletions examples/avro_read_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::Arc;

use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::avro::read::{decompress_block, deserialize};
use arrow2::io::avro::read_async::*;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];

let mut reader = File::open(file_path).await?.compat();

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let schema = Arc::new(schema);
let avro_schemas = Arc::new(avro_schemas);

let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some((mut block, rows)) = blocks.next().await.transpose()? {
// the content here is blocking. In general this should run on spawn_blocking
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = vec![];
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, rows, schema, &avro_schemas)
});
let batch = handle.await.unwrap()?;
assert!(batch.num_rows() > 0);
}

Ok(())
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
- [Read Arrow](./io/ipc_read.md)
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
- [Read Avro](./io/avro_read.md)
1 change: 1 addition & 0 deletions guide/src/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ This crate offers optional features that enable interoperability with different
* CSV (`io_csv`)
* Parquet (`io_parquet`)
* Json (`io_json`)
* Avro (`io_avro` and `io_avro_async`)

In this section you can find a guide and examples for each one of them.
11 changes: 11 additions & 0 deletions guide/src/io/avro_read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Avro read

When compiled with feature `io_avro_async`, you can use this crate to read Avro files
asynchronously.

```rust
{{#include ../../../examples/avro_read_async.rs}}
```

Note how both decompression and deserialization is performed on a separate thread pool to not
block (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).
1 change: 1 addition & 0 deletions src/io/avro/read/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
return Ok(0);
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf)?;

Expand Down
18 changes: 9 additions & 9 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,33 @@ use crate::error::{ArrowError, Result};
use super::BlockStreamIterator;
use super::Compression;

/// Decompresses an avro block.
/// Decompresses an Avro block.
/// Returns whether the buffers where swapped.
fn decompress_block(
pub fn decompress_block(
block: &mut Vec<u8>,
decompress: &mut Vec<u8>,
decompressed: &mut Vec<u8>,
compression: Option<Compression>,
) -> Result<bool> {
match compression {
None => {
std::mem::swap(block, decompress);
std::mem::swap(block, decompressed);
Ok(true)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Deflate) => {
decompress.clear();
decompressed.clear();
let mut decoder = libflate::deflate::Decoder::new(&block[..]);
decoder.read_to_end(decompress)?;
decoder.read_to_end(decompressed)?;
Ok(false)
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
let len = snap::raw::decompress_len(&block[..block.len() - 4])
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
decompress.clear();
decompress.resize(len, 0);
decompressed.clear();
decompressed.resize(len, 0);
snap::raw::Decoder::new()
.decompress(&block[..block.len() - 4], decompress)
.decompress(&block[..block.len() - 4], decompressed)
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
Ok(false)
}
Expand Down
1 change: 1 addition & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Deserializes an Avro block into a [`RecordBatch`].
pub fn deserialize(
mut block: &[u8],
rows: usize,
Expand Down
6 changes: 4 additions & 2 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ use fallible_streaming_iterator::FallibleStreamingIterator;
mod block;
mod decompress;
pub use block::BlockStreamIterator;
pub use decompress::Decompressor;
pub use decompress::{decompress_block, Decompressor};
mod deserialize;
pub use deserialize::deserialize;
mod header;
mod nested;
mod schema;
mod util;

pub(super) use header::deserialize_header;
pub(super) use schema::convert_schema;

use crate::datatypes::Schema;
use crate::error::Result;
Expand Down Expand Up @@ -80,7 +82,7 @@ impl<R: Read> Iterator for Reader<R> {

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize::deserialize(data, *rows, schema, avro_schemas)
deserialize(data, *rows, schema, avro_schemas)
})
}
}
2 changes: 1 addition & 1 deletion src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};

/// Returns the fully qualified name for a field
pub fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String {
fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String {
if name.contains('.') {
name.to_string()
} else {
Expand Down
67 changes: 67 additions & 0 deletions src/io/avro/read_async/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! APIs to read from Avro format to arrow.
use async_stream::try_stream;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::Stream;

use crate::error::{ArrowError, Result};

use super::utils::zigzag_i64;

async fn read_size<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<(usize, usize)> {
let rows = match zigzag_i64(reader).await {
Ok(a) => a,
Err(ArrowError::Io(io_err)) => {
if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
// end
return Ok((0, 0));
} else {
return Err(ArrowError::Io(io_err));
}
}
Err(other) => return Err(other),
};
let bytes = zigzag_i64(reader).await?;
Ok((rows as usize, bytes as usize))
}

/// Reads a block from the file into `buf`.
/// # Panic
/// Panics iff the block marker does not equal to the file's marker
async fn read_block<R: AsyncRead + Unpin + Send>(
reader: &mut R,
buf: &mut Vec<u8>,
file_marker: [u8; 16],
) -> Result<usize> {
let (rows, bytes) = read_size(reader).await?;
if rows == 0 {
return Ok(0);
};

buf.clear();
buf.resize(bytes, 0);
reader.read_exact(buf).await?;

let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;

assert!(!(marker != file_marker));
Ok(rows)
}

/// Returns a fallible [`Stream`] of Avro blocks bound to `reader`
pub async fn block_stream<R: AsyncRead + Unpin + Send>(
reader: &mut R,
file_marker: [u8; 16],
) -> impl Stream<Item = Result<(Vec<u8>, usize)>> + '_ {
try_stream! {
loop {
let mut buffer = vec![];
let rows = read_block(reader, &mut buffer, file_marker).await?;
if rows == 0 {
break
}
yield (buffer, rows)
}
}
}
Empty file removed src/io/avro/read_async/header.rs
Empty file.
60 changes: 60 additions & 0 deletions src/io/avro/read_async/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Async Avro
use std::collections::HashMap;

use avro_rs::Schema as AvroSchema;
use futures::AsyncRead;
use futures::AsyncReadExt;

use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};

use super::super::read::convert_schema;
use super::super::read::deserialize_header;
use super::super::read::Compression;
use super::super::{read_header, read_metadata};
use super::utils::zigzag_i64;

/// Reads Avro's metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
async fn read_metadata_async<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(AvroSchema, Option<Compression>, [u8; 16])> {
read_metadata!(reader.await)
}

/// Reads the avro metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
};

Ok((avro_schema, schema, codec, marker))
}

/// Reads the file marker asynchronously
async fn read_file_marker<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<[u8; 16]> {
let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;
Ok(marker)
}

async fn _read_binary<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<Vec<u8>> {
let len: usize = zigzag_i64(reader).await? as usize;
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf).await?;
Ok(buf)
}

async fn read_header<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<HashMap<String, Vec<u8>>> {
read_header!(reader.await)
}
55 changes: 5 additions & 50 deletions src/io/avro/read_async/mod.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,8 @@
//! Async Avro
use std::collections::HashMap;
use avro_rs::Schema;
use futures::AsyncRead;
use futures::AsyncReadExt;
mod block;
mod metadata;
pub(self) mod utils;

use crate::error::{ArrowError, Result};

use super::read::deserialize_header;
use super::read::Compression;
use super::{avro_decode, read_header, read_metadata};

/// Reads Avro's metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub async fn read_metadata_async<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(Schema, Option<Compression>, [u8; 16])> {
read_metadata!(reader.await)
}

/// Reads the file marker asynchronously
async fn read_file_marker<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<[u8; 16]> {
let mut marker = [0u8; 16];
reader.read_exact(&mut marker).await?;
Ok(marker)
}

async fn zigzag_i64<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<i64> {
let z = decode_variable(reader).await?;
Ok(if z & 0x1 == 0 {
(z >> 1) as i64
} else {
!(z >> 1) as i64
})
}

async fn decode_variable<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<u64> {
avro_decode!(reader.await)
}

async fn _read_binary<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<Vec<u8>> {
let len: usize = zigzag_i64(reader).await? as usize;
let mut buf = vec![0u8; len];
reader.read_exact(&mut buf).await?;
Ok(buf)
}

async fn read_header<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<HashMap<String, Vec<u8>>> {
read_header!(reader.await)
}
pub use block::block_stream;
pub use metadata::read_metadata;
19 changes: 19 additions & 0 deletions src/io/avro/read_async/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use futures::AsyncRead;
use futures::AsyncReadExt;

use crate::error::{ArrowError, Result};

use super::super::avro_decode;

pub async fn zigzag_i64<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<i64> {
let z = decode_variable(reader).await?;
Ok(if z & 0x1 == 0 {
(z >> 1) as i64
} else {
!(z >> 1) as i64
})
}

async fn decode_variable<R: AsyncRead + Unpin + Send>(reader: &mut R) -> Result<u64> {
avro_decode!(reader.await)
}
Loading

0 comments on commit bfa2d43

Please sign in to comment.