This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write to Avro #690

Merged: 5 commits, Dec 22, 2021

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -138,7 +138,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
65 changes: 65 additions & 0 deletions examples/avro_write.rs
@@ -0,0 +1,65 @@
use std::fs::File;

use arrow2::{
    array::{Array, Int32Array},
    datatypes::{Field, Schema},
    error::Result,
    io::avro::write,
};

fn write_avro<W: std::io::Write>(
    file: &mut W,
    arrays: &[&dyn Array],
    schema: &Schema,
    compression: Option<write::Compression>,
) -> Result<()> {
    let avro_fields = write::to_avro_schema(schema)?;

    let mut serializers = arrays
        .iter()
        .zip(avro_fields.iter())
        .map(|(array, field)| write::new_serializer(*array, &field.schema))
        .collect::<Vec<_>>();
    let mut block = write::Block::new(arrays[0].len(), vec![]);

    write::serialize(&mut serializers, &mut block)?;

    let mut compressed_block = write::CompressedBlock::default();

    if let Some(compression) = compression {
        write::compress(&block, &mut compressed_block, compression)?;
    } else {
        compressed_block.number_of_rows = block.number_of_rows;
        std::mem::swap(&mut compressed_block.data, &mut block.data);
    }

    write::write_metadata(file, avro_fields.clone(), compression)?;

    write::write_block(file, &compressed_block)?;

    Ok(())
}

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let path = &args[1];

    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);

    let mut file = File::create(path)?;
    write_avro(&mut file, &[(&array) as &dyn Array], &schema, None)?;

    Ok(())
}
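
To run this example, pass the destination path as the first argument, e.g. something like `cargo run --example avro_write --features io_avro /tmp/c1.avro` (the exact invocation is an assumption; the guide below only states that the `io_avro` feature must be enabled).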
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -17,3 +17,4 @@
- [Read Arrow stream](./io/ipc_stream_read.md)
- [Write Arrow](./io/ipc_write.md)
- [Read Avro](./io/avro_read.md)
+- [Write Avro](./io/avro_write.md)
8 changes: 8 additions & 0 deletions guide/src/io/avro_write.md
@@ -0,0 +1,8 @@
# Avro write

You can use this crate to write to Apache Avro.
Below is an example, which you can run when this crate is compiled with the `io_avro` feature.

```rust
{{#include ../../../examples/avro_write.rs}}
```
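
Since `write_metadata` writes only the file header and `write_block` appends one block at a time, a long array can also be written as several blocks. Below is a minimal sketch under the same API; the helper `write_in_blocks` and its chunking loop are illustrative, not part of this PR, and assume that the serializers are stateful iterators, so successive `serialize` calls consume successive rows:

```rust
use arrow2::{
    array::Array,
    datatypes::Schema,
    error::Result,
    io::avro::write,
};

/// Hypothetical helper: writes `arrays` as multiple Avro blocks of at most
/// `rows_per_block` rows each, without compression.
fn write_in_blocks<W: std::io::Write>(
    file: &mut W,
    arrays: &[&dyn Array],
    schema: &Schema,
    rows_per_block: usize,
) -> Result<()> {
    let avro_fields = write::to_avro_schema(schema)?;
    write::write_metadata(file, avro_fields.clone(), None)?;

    let mut serializers = arrays
        .iter()
        .zip(avro_fields.iter())
        .map(|(array, field)| write::new_serializer(*array, &field.schema))
        .collect::<Vec<_>>();

    let mut block = write::Block::new(0, vec![]);
    let mut compressed = write::CompressedBlock::default();
    let mut remaining = arrays[0].len();
    while remaining > 0 {
        let rows = remaining.min(rows_per_block);
        // `serialize` clears `block.data` and consumes the next `rows`
        // items from every serializer.
        block.number_of_rows = rows;
        write::serialize(&mut serializers, &mut block)?;

        // no compression: move the serialized bytes over as-is
        compressed.number_of_rows = rows;
        compressed.data.clear();
        std::mem::swap(&mut compressed.data, &mut block.data);
        write::write_block(file, &compressed)?;

        remaining -= rows;
    }
    Ok(())
}
```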
11 changes: 10 additions & 1 deletion src/io/avro/mod.rs
@@ -6,14 +6,23 @@ pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
+pub mod write;

+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}

// macros that can operate in sync and async code.
macro_rules! avro_decode {
    ($reader:ident $($_await:tt)*) => {
        {
            let mut i = 0u64;
            let mut buf = [0u8; 1];

            let mut j = 0;
            loop {
                if j > 9 {
2 changes: 1 addition & 1 deletion src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
    Ok((rows as usize, bytes as usize))
}

-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
/// # Panic
/// Panics iff the block marker does not equal the file's marker
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
11 changes: 2 additions & 9 deletions src/io/avro/read/mod.rs
@@ -24,22 +24,15 @@ use crate::datatypes::Schema;
use crate::error::Result;
use crate::record_batch::RecordBatch;

-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;

/// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
#[allow(clippy::type_complexity)]
pub fn read_metadata<R: std::io::Read>(
    reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
    let (avro_schema, codec, marker) = util::read_schema(reader)?;
-    let schema = schema::convert_schema(&avro_schema)?;
+    let schema = convert_schema(&avro_schema)?;

    let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
        fields.into_iter().map(|x| x.schema).collect()
22 changes: 13 additions & 9 deletions src/io/avro/read/schema.rs
@@ -56,6 +56,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
    props
}

+/// Maps an Avro Schema into a [`Schema`].
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
    let mut schema_fields = vec![];
    match schema {
@@ -65,22 +66,25 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
                    &field.schema,
                    Some(&field.name),
                    false,
-                    Some(&external_props(&field.schema)),
+                    Some(external_props(&field.schema)),
                )?)
            }
        }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
-
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
+        }
+    };
+    Ok(Schema::new(schema_fields))
}

fn schema_to_field(
    schema: &AvroSchema,
    name: Option<&str>,
    mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
) -> Result<Field> {
    let data_type = match schema {
        AvroSchema::Null => DataType::Null,
@@ -169,7 +173,7 @@ fn schema_to_field(
                        &field.schema,
                        Some(&format!("{}.{}", name, field.name)),
                        false,
-                        Some(&props),
+                        Some(props),
                    )
                })
                .collect();
@@ -198,6 +202,6 @@ fn schema_to_field(
    let name = name.unwrap_or_default();

    let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
    Ok(field)
}
2 changes: 1 addition & 1 deletion src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};

use super::super::read::convert_schema;
use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
use super::super::{read_header, read_metadata};
use super::utils::zigzag_i64;

69 changes: 69 additions & 0 deletions src/io/avro/write/block.rs
@@ -0,0 +1,69 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};

use super::{util::zigzag_encode, SYNC_NUMBER};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
    /// The number of rows
    pub number_of_rows: usize,
    /// The compressed data
    pub data: Vec<u8>,
}

impl CompressedBlock {
    /// Creates a new CompressedBlock
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
    /// The number of rows
    pub number_of_rows: usize,
    /// The uncompressed data
    pub data: Vec<u8>,
}

impl Block {
    /// Creates a new Block
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// Writes a [`CompressedBlock`] to `writer`
pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
    // write the number of rows and the size in bytes
    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
    zigzag_encode(compressed_block.data.len() as i64, writer)?;

    writer.write_all(&compressed_block.data)?;

    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Compresses a [`Block`] into a [`CompressedBlock`].
pub fn compress(
    block: &Block,
    compressed_block: &mut CompressedBlock,
    compression: Compression,
) -> Result<()> {
    compressed_block.number_of_rows = block.number_of_rows;
    match compression {
        Compression::Deflate => todo!(),
        Compression::Snappy => todo!(),
    }
}
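
`write_block` above delegates the row count and the byte length to `util::zigzag_encode`, whose implementation is not part of this diff. Per the Avro specification, longs are zigzag-mapped and then written as a base-128 varint; the following is a sketch of what such an encoder looks like (an assumption about `util`, not code from this PR):

```rust
use std::io::Write;

// Sketch of Avro's long encoding (zigzag + varint). This is an assumption of
// what `util::zigzag_encode` does; it is not taken from this PR.
fn zigzag_encode<W: Write>(n: i64, writer: &mut W) -> std::io::Result<()> {
    // zigzag maps -1, 1, -2, 2, ... to 1, 2, 3, 4, ... so that small
    // magnitudes (of either sign) use few bytes
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    // varint: 7 bits per byte, high bit set while more bytes follow
    while z > 0x7F {
        writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
        z >>= 7;
    }
    writer.write_all(&[z as u8])
}
```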
30 changes: 30 additions & 0 deletions src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
use std::collections::HashMap;

use avro_schema::Schema;
use serde_json;

use crate::error::{ArrowError, Result};

use super::Compression;

/// Serializes a [`Schema`] and optional [`Compression`] into an avro header.
pub(crate) fn serialize_header(
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>> {
    let schema =
        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

    let mut header = HashMap::<String, Vec<u8>>::default();

    header.insert("avro.schema".to_string(), schema.into_bytes());
    if let Some(compression) = compression {
        let value = match compression {
            Compression::Snappy => b"snappy".to_vec(),
            Compression::Deflate => b"deflate".to_vec(),
        };
        header.insert("avro.codec".to_string(), value);
    };

    Ok(header)
}
69 changes: 69 additions & 0 deletions src/io/avro/write/mod.rs
@@ -0,0 +1,69 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};

use crate::error::Result;

pub use super::Compression;

mod header;
use header::serialize_header;
mod schema;
pub use schema::to_avro_schema;
mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod util;

const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
    writer: &mut W,
    fields: Vec<AvroField>,
    compression: Option<Compression>,
) -> Result<()> {
    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
    let avro_magic = [b'O', b'b', b'j', 1u8];
    writer.write_all(&avro_magic)?;

    // * file metadata, including the schema.
    let schema = AvroSchema::Record(Record::new("", fields));
    let header = serialize_header(&schema, compression)?;

    util::zigzag_encode(header.len() as i64, writer)?;
    for (name, item) in header {
        util::write_binary(name.as_bytes(), writer)?;
        util::write_binary(&item, writer)?;
    }
    writer.write_all(&[0])?;

    // The 16-byte, randomly-generated sync marker for this file.
    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Consumes a set of [`BoxSerializer`]s into a [`Block`].
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
    let Block {
        data,
        number_of_rows,
    } = block;

    data.clear(); // restart it

    // _the_ transpose (columns -> rows)
    for _ in 0..*number_of_rows {
        for serializer in &mut *serializers {
            let item_data = serializer.next().unwrap();
            data.write_all(item_data)?;
        }
    }
    Ok(())
}