Migrated to avro-schema
jorgecarleitao committed Dec 19, 2021
1 parent f64339c commit f6e2bb0
Showing 9 changed files with 88 additions and 106 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
@@ -69,9 +69,8 @@ ahash = { version = "0.7", optional = true }
# parquet support
parquet2 = { version = "0.8", optional = true, default_features = false, features = ["stream"] }

# avro
avro-rs = { version = "0.13", optional = true, default_features = false }
# compression of avro
avro-schema = { version = "0.2", optional = true }
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
# async avro
@@ -138,7 +137,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
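
For downstream users, the practical effect of this hunk is that the `io_avro` feature is now backed by the `avro-schema` crate (plus `streaming-iterator`) rather than `avro-rs`, so code that referred to `avro_rs` types has to move to their `avro_schema` counterparts. A minimal sketch, assuming `avro-schema` 0.2, where `Schema` is a plain data enum whose primitive variants carry an optional logical type (as the hunks further down suggest):

    use avro_schema::Schema;

    fn main() {
        // `None` means the plain physical type (a 64-bit integer here); a
        // `Some(LongLogical::...)` value would select a logical type instead.
        let schema = Schema::Long(None);
        assert!(matches!(schema, Schema::Long(None)));
    }
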
8 changes: 0 additions & 8 deletions src/io/avro/mod.rs
@@ -6,14 +6,6 @@ pub mod read;
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;

use crate::error::ArrowError;

impl From<avro_rs::Error> for ArrowError {
fn from(error: avro_rs::Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}

// macros that can operate in sync and async code.
macro_rules! avro_decode {
($reader:ident $($_await:tt)*) => {
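
With the blanket `From<avro_rs::Error> for ArrowError` conversion removed, failures on the Avro path are now wrapped explicitly at each call site (the header.rs hunk below shows the pattern). A minimal sketch of that style, assuming a caller with `arrow2` and `serde_json` as dependencies; `parse_json` is a hypothetical helper, not part of the crate:

    use arrow2::error::ArrowError;

    // Instead of relying on `?` plus a `From` impl, external errors are mapped
    // into `ArrowError::ExternalFormat` (or `ArrowError::External`) by hand.
    fn parse_json(bytes: &[u8]) -> Result<serde_json::Value, ArrowError> {
        serde_json::from_slice(bytes).map_err(|e| ArrowError::ExternalFormat(e.to_string()))
    }
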
6 changes: 3 additions & 3 deletions src/io/avro/read/deserialize.rs
@@ -1,7 +1,7 @@
use std::convert::TryInto;
use std::sync::Arc;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Enum, Schema as AvroSchema};

use crate::array::*;
use crate::datatypes::*;
@@ -33,7 +33,7 @@ fn make_mutable(
Box::new(MutableUtf8Array::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
}
PhysicalType::Dictionary(_) => {
if let Some(AvroSchema::Enum { symbols, .. }) = avro_schema {
if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_schema {
let values = Utf8Array::<i32>::from_slice(symbols);
Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity))
as Box<dyn MutableArray>
@@ -64,7 +64,7 @@ fn make_mutable(

fn is_union_null_first(avro_field: &AvroSchema) -> bool {
if let AvroSchema::Union(schemas) = avro_field {
schemas.variants()[0] == AvroSchema::Null
schemas[0] == AvroSchema::Null
} else {
unreachable!()
}
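
The mechanical change in this file is that `avro-schema` models composite schemas as tuple variants wrapping dedicated structs (`Enum`, `Record`, `Fixed`, ...), so the struct-variant patterns used with `avro-rs` become tuple-variant patterns, and unions are indexed directly rather than via `variants()`. A small sketch of the new pattern, assuming `Enum::symbols` is a `Vec<String>` (which the `Utf8Array::from_slice` call above implies); `enum_symbols` is a hypothetical helper:

    use avro_schema::{Enum, Schema};

    // Return the symbol list of an enum schema; any other variant yields None.
    fn enum_symbols(schema: &Schema) -> Option<&[String]> {
        if let Schema::Enum(Enum { symbols, .. }) = schema {
            Some(symbols.as_slice())
        } else {
            None
        }
    }
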
15 changes: 9 additions & 6 deletions src/io/avro/read/header.rs
@@ -1,21 +1,24 @@
use std::collections::HashMap;

use avro_rs::{Error, Schema};
use avro_schema::Schema;
use serde_json;

use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::Compression;

/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`].
pub(crate) fn deserialize_header(
header: HashMap<String, Vec<u8>>,
) -> Result<(Schema, Option<Compression>)> {
let json = header
let schema = header
.get("avro.schema")
.and_then(|bytes| serde_json::from_slice(bytes.as_ref()).ok())
.ok_or(Error::GetAvroSchemaFromMap)?;
let schema = Schema::parse(&json)?;
.ok_or_else(|| ArrowError::ExternalFormat("Avro schema must be present".to_string()))
.and_then(|bytes| {
println!("{}", std::str::from_utf8(bytes).unwrap());
serde_json::from_slice(bytes.as_ref())
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))
})?;

let compression = header.get("avro.codec").and_then(|bytes| {
let bytes: &[u8] = bytes.as_ref();
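
The new header handling no longer goes through `avro_rs::Schema::parse`: the bytes stored under the `avro.schema` key are JSON that deserializes straight into `avro_schema::Schema` via serde, and a missing or malformed schema surfaces as `ArrowError::ExternalFormat`. A simplified sketch of that flow (error handling collapsed to `Option`), assuming `avro_schema::Schema` implements serde's `Deserialize`, as the hunk above relies on; `schema_from_header` is a hypothetical helper:

    use std::collections::HashMap;

    use avro_schema::Schema;

    // Look up the "avro.schema" header entry and parse it directly as JSON.
    fn schema_from_header(header: &HashMap<String, Vec<u8>>) -> Option<Schema> {
        header
            .get("avro.schema")
            .and_then(|bytes| serde_json::from_slice(bytes).ok())
    }
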
4 changes: 2 additions & 2 deletions src/io/avro/read/mod.rs
@@ -3,7 +3,7 @@
use std::io::Read;
use std::sync::Arc;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Record, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;

mod block;
@@ -41,7 +41,7 @@ pub fn read_metadata<R: std::io::Read>(
let (avro_schema, codec, marker) = util::read_schema(reader)?;
let schema = schema::convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
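
As above, the top-level schema of an Avro file is expected to be a record, and its per-column schemas are now pulled out by destructuring the `Record` struct inside the tuple variant. A condensed sketch of that step, assuming each entry in `Record::fields` carries a `schema` (as the hunk shows); `field_schemas` is a hypothetical helper:

    use avro_schema::{Record, Schema};

    // Split a top-level record schema into the schema of each of its fields.
    fn field_schemas(schema: Schema) -> Vec<Schema> {
        if let Schema::Record(Record { fields, .. }) = schema {
            fields.into_iter().map(|f| f.schema).collect()
        } else {
            // Valid Avro files always have a record as the top-level schema.
            unreachable!()
        }
    }
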
127 changes: 68 additions & 59 deletions src/io/avro/read/schema.rs
@@ -1,8 +1,6 @@
use std::collections::BTreeMap;

use avro_rs::schema::Name;
use avro_rs::types::Value;
use avro_rs::Schema as AvroSchema;
use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema};

use crate::datatypes::*;
use crate::error::{ArrowError, Result};
@@ -24,44 +22,26 @@ fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>)
fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
let mut props = BTreeMap::new();
match &schema {
AvroSchema::Record {
AvroSchema::Record(Record {
doc: Some(ref doc), ..
}
| AvroSchema::Enum {
})
| AvroSchema::Enum(Enum {
doc: Some(ref doc), ..
} => {
}) => {
props.insert("avro::doc".to_string(), doc.clone());
}
_ => {}
}
match &schema {
AvroSchema::Record {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
}
| AvroSchema::Enum {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
}
| AvroSchema::Fixed {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
} => {
AvroSchema::Record(Record {
aliases, namespace, ..
})
| AvroSchema::Enum(Enum {
aliases, namespace, ..
})
| AvroSchema::Fixed(Fixed {
aliases, namespace, ..
}) => {
let aliases: Vec<String> = aliases
.iter()
.map(|alias| aliased(alias, namespace.as_deref(), None))
@@ -79,7 +59,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
let mut schema_fields = vec![];
match schema {
AvroSchema::Record { fields, .. } => {
AvroSchema::Record(Record { fields, .. }) => {
for field in fields {
schema_fields.push(schema_to_field(
&field.schema,
@@ -105,26 +85,55 @@ fn schema_to_field(
let data_type = match schema {
AvroSchema::Null => DataType::Null,
AvroSchema::Boolean => DataType::Boolean,
AvroSchema::Int => DataType::Int32,
AvroSchema::Long => DataType::Int64,
AvroSchema::Int(logical) => match logical {
Some(logical) => match logical {
avro_schema::IntLogical::Date => DataType::Date32,
avro_schema::IntLogical::Time => DataType::Time32(TimeUnit::Millisecond),
},
None => DataType::Int32,
},
AvroSchema::Long(logical) => match logical {
Some(logical) => match logical {
avro_schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond),
avro_schema::LongLogical::TimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string()))
}
avro_schema::LongLogical::TimestampMicros => {
DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string()))
}
avro_schema::LongLogical::LocalTimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, None)
}
avro_schema::LongLogical::LocalTimestampMicros => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
},
None => DataType::Int64,
},
AvroSchema::Float => DataType::Float32,
AvroSchema::Double => DataType::Float64,
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Bytes(logical) => match logical {
Some(logical) => match logical {
avro_schema::BytesLogical::Decimal(precision, scale) => {
DataType::Decimal(*precision, *scale)
}
},
None => DataType::Binary,
},
AvroSchema::String(_) => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field(
item_schema,
Some("item"), // default name for list items
false,
None,
)?)),
AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
AvroSchema::Union(us) => {
AvroSchema::Union(schemas) => {
// If there are only two variants and one of them is null, set the other type as the field data type
let has_nullable = us.find_schema(&Value::Null).is_some();
let sub_schemas = us.variants();
if has_nullable && sub_schemas.len() == 2 {
let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);
if has_nullable && schemas.len() == 2 {
nullable = true;
if let Some(schema) = sub_schemas
if let Some(schema) = schemas
.iter()
.find(|&schema| !matches!(schema, AvroSchema::Null))
{
@@ -134,18 +143,18 @@
} else {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read avro union {:?}",
us
schema
)));
}
} else {
let fields = sub_schemas
let fields = schemas
.iter()
.map(|s| schema_to_field(s, None, has_nullable, None))
.collect::<Result<Vec<Field>>>()?;
DataType::Union(fields, None, UnionMode::Dense)
}
}
AvroSchema::Record { name, fields, .. } => {
AvroSchema::Record(Record { name, fields, .. }) => {
let fields: Result<Vec<Field>> = fields
.iter()
.map(|field| {
@@ -158,7 +167,7 @@
}*/
schema_to_field(
&field.schema,
Some(&format!("{}.{}", name.fullname(None), field.name)),
Some(&format!("{}.{}", name, field.name)),
false,
Some(&props),
)
@@ -173,17 +182,17 @@
false,
))
}
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal(*precision, *scale),
AvroSchema::Uuid => DataType::Utf8,
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
AvroSchema::Duration => DataType::Interval(IntervalUnit::MonthDayNano),
AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {
Some(logical) => match logical {
avro_schema::FixedLogical::Decimal(precision, scale) => {
DataType::Decimal(*precision, *scale)
}
avro_schema::FixedLogical::Duration => {
DataType::Interval(IntervalUnit::MonthDayNano)
}
},
None => DataType::FixedSizeBinary(*size),
},
};

let name = name.unwrap_or_default();
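
The bulk of this file is the new mapping from `avro-schema` logical types to Arrow data types: dates and times hang off `Int`/`Long`, decimals off `Bytes`/`Fixed`, and durations off `Fixed`, replacing the dedicated `Date`/`TimeMillis`/`Decimal`/... variants of `avro-rs`. Note that `timestamp-millis`/`timestamp-micros` map to timestamps with a fixed "00:00" (UTC) offset, while the `local-timestamp` variants map to timestamps without a timezone. A condensed sketch of the 64-bit-integer branch, mirroring the hunk above (a standalone function for illustration, not the crate's API):

    use arrow2::datatypes::{DataType, TimeUnit};
    use avro_schema::LongLogical;

    // Map an Avro long (with optional logical type) to an Arrow DataType.
    fn long_to_data_type(logical: Option<LongLogical>) -> DataType {
        match logical {
            Some(LongLogical::Time) => DataType::Time64(TimeUnit::Microsecond),
            Some(LongLogical::TimestampMillis) => {
                DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string()))
            }
            Some(LongLogical::TimestampMicros) => {
                DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string()))
            }
            Some(LongLogical::LocalTimestampMillis) => {
                DataType::Timestamp(TimeUnit::Millisecond, None)
            }
            Some(LongLogical::LocalTimestampMicros) => {
                DataType::Timestamp(TimeUnit::Microsecond, None)
            }
            None => DataType::Int64,
        }
    }
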
2 changes: 1 addition & 1 deletion src/io/avro/read/util.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::Read;

use avro_rs::Schema;
use avro_schema::Schema;

use crate::error::{ArrowError, Result};

4 changes: 2 additions & 2 deletions src/io/avro/read_async/metadata.rs
@@ -1,7 +1,7 @@
//! Async Avro
use std::collections::HashMap;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Record, Schema as AvroSchema};
use futures::AsyncRead;
use futures::AsyncReadExt;

@@ -30,7 +30,7 @@ pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
(1 more changed file not shown)
