Skip to content

Commit

Permalink
Added basic support for arrow -> avro codec along with beginnings of …
Browse files Browse the repository at this point in the history
…avro writer.
  • Loading branch information
jecsand838 committed Dec 28, 2024
1 parent 63899b7 commit 7d4e6fd
Show file tree
Hide file tree
Showing 6 changed files with 640 additions and 12 deletions.
228 changes: 227 additions & 1 deletion arrow-avro/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, TypeName};
use crate::schema::{
Attributes, ComplexType, PrimitiveType, Schema, TypeName, Array, Fixed, Map, Record,
Field as AvroFieldDef
};
use arrow_schema::{
ArrowError, DataType, Field, FieldRef, IntervalUnit, SchemaBuilder, SchemaRef, TimeUnit,
};
use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray, RecordBatch};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -45,6 +49,25 @@ pub struct AvroDataType {
}

impl AvroDataType {

/// Create a new AvroDataType with the given parts.
/// This helps you construct it from outside `codec.rs` without exposing internals.
pub fn new(
codec: Codec,
nullability: Option<Nullability>,
metadata: HashMap<String, String>,
) -> Self {
AvroDataType {
codec,
nullability,
metadata,
}
}

pub fn from_codec(codec: Codec) -> Self {
Self::new(codec, None, Default::default())
}

/// Returns an arrow [`Field`] with the given name
pub fn field_with_name(&self, name: &str) -> Field {
let d = self.codec.data_type();
Expand All @@ -58,6 +81,23 @@ impl AvroDataType {
pub fn nullability(&self) -> Option<Nullability> {
// Hands back a copy of the stored setting; `None` is what `from_codec`
// stores for a type built without any nullability information.
self.nullability
}

/// Convert this `AvroDataType`, which encapsulates an Arrow data type (`codec`)
/// plus nullability, back into an Avro `Schema<'a>`.
///
/// `name` is forwarded to the codec, which uses it to name `fixed` and
/// `record` schemas.
///
/// A type with nullability set is wrapped in the union `["null", <type>]`;
/// otherwise the codec's schema is returned unchanged. If the codec's schema
/// is already the `null` primitive it is returned as-is, because an Avro
/// union may not contain the same unnamed type twice.
///
/// NOTE(review): the union is always emitted null-first regardless of which
/// `Nullability` value is stored — confirm whether other orderings need to
/// be preserved.
pub fn to_avro_schema<'a>(&'a self, name: &'a str) -> Schema<'a> {
    let inner_schema = self.codec.to_avro_schema(name);

    // `["null", "null"]` is not a valid Avro union, so a nullable null type
    // collapses to plain `null`.
    let already_null = matches!(
        inner_schema,
        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))
    );

    if self.nullability.is_some() && !already_null {
        Schema::Union(vec![
            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            inner_schema,
        ])
    } else {
        inner_schema
    }
}
}

/// A named [`AvroDataType`]
Expand Down Expand Up @@ -157,6 +197,128 @@ impl Codec {
Self::Struct(f) => DataType::Struct(f.iter().map(|x| x.field()).collect()),
}
}

/// Convert this `Codec` variant to an Avro `Schema<'a>`.
/// More work needed to handle `decimal`, `enum`, `map`, etc.
pub fn to_avro_schema<'a>(&'a self, name: &'a str) -> Schema<'a> {
match self {
Codec::Null => Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
Codec::Boolean => Schema::TypeName(TypeName::Primitive(PrimitiveType::Boolean)),
Codec::Int32 => Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
Codec::Int64 => Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
Codec::Float32 => Schema::TypeName(TypeName::Primitive(PrimitiveType::Float)),
Codec::Float64 => Schema::TypeName(TypeName::Primitive(PrimitiveType::Double)),
Codec::Binary => Schema::TypeName(TypeName::Primitive(PrimitiveType::Bytes)),
Codec::Utf8 => Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),

// date32 => Avro int + logicalType=date
Codec::Date32 => Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Int),
attributes: Attributes {
logical_type: Some("date"),
additional: Default::default(),
},
}),

// time-millis => Avro int with logicalType=time-millis
Codec::TimeMillis => Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Int),
attributes: Attributes {
logical_type: Some("time-millis"),
additional: Default::default(),
},
}),

// time-micros => Avro long with logicalType=time-micros
Codec::TimeMicros => Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Long),
attributes: Attributes {
logical_type: Some("time-micros"),
additional: Default::default(),
},
}),

// timestamp-millis => Avro long with logicalType=timestamp-millis
Codec::TimestampMillis(is_utc) => {
// TODO `is_utc` or store it in metadata
Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Long),
attributes: Attributes {
logical_type: Some("timestamp-millis"),
additional: Default::default(),
},
})
}

// timestamp-micros => Avro long with logicalType=timestamp-micros
Codec::TimestampMicros(is_utc) => {
Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Long),
attributes: Attributes {
logical_type: Some("timestamp-micros"),
additional: Default::default(),
},
})
}

Codec::Interval => {
Schema::Type(crate::schema::Type {
r#type: TypeName::Primitive(PrimitiveType::Bytes),
attributes: Attributes {
logical_type: Some("duration"),
additional: Default::default(),
},
})
}

Codec::Fixed(size) => {
// Convert Arrow FixedSizeBinary => Avro fixed with a known name & size
// TODO namespace/aliases.
Schema::Complex(ComplexType::Fixed(Fixed {
name,
namespace: None, // TODO namespace implementation
aliases: vec![], // TODO alias implementation
size: *size as usize,
attributes: Attributes::default(),
}))
}

Codec::List(item_type) => {
// Avro array with "items" recursively derived
let items_schema = item_type.to_avro_schema("items");
Schema::Complex(ComplexType::Array(Array {
items: Box::new(items_schema),
attributes: Attributes::default(),
}))
}

Codec::Struct(fields) => {
// Avro record with nested fields
let record_fields = fields
.iter()
.map(|f| {
// For each `AvroField`, get its Avro schema
let child_schema = f.data_type().to_avro_schema(f.name());
AvroFieldDef {
name: f.name(), // Avro field name
doc: None,
r#type: child_schema,
default: None,
}
})
.collect();

Schema::Complex(ComplexType::Record(Record {
name,
namespace: None, // TODO follow up for namespace implementation
doc: None,
aliases: vec![], // TODO follow up for alias implementation
fields: record_fields,
attributes: Attributes::default(),
}))
}
}
}
}

impl From<PrimitiveType> for Codec {
Expand Down Expand Up @@ -327,3 +489,67 @@ fn make_data_type<'a>(
}
}
}


/// Builds the [`AvroField`] that corresponds to an Arrow [`Field`].
///
/// The field's name and metadata are cloned across, its data type is mapped
/// with `arrow_type_to_codec`, and a nullable Arrow field is recorded as
/// `Nullability::NullFirst`.
pub(crate) fn arrow_field_to_avro_field(arrow_field: &Field) -> AvroField {
    // TODO advanced metadata logic here
    AvroField {
        name: arrow_field.name().clone(),
        data_type: AvroDataType {
            codec: arrow_type_to_codec(arrow_field.data_type()),
            // `None` when the Arrow field is declared non-nullable
            nullability: arrow_field
                .is_nullable()
                .then_some(Nullability::NullFirst),
            metadata: arrow_field.metadata().clone(),
        },
    }
}

/// Maps an Arrow `DataType` to a `Codec`:
fn arrow_type_to_codec(dt: &DataType) -> Codec {
use arrow_schema::DataType::*;
match dt {
Null => Codec::Null,
Boolean => Codec::Boolean,
Int8 | Int16 | Int32 => Codec::Int32,
Int64 => Codec::Int64,
Float32 => Codec::Float32,
Float64 => Codec::Float64,
Utf8 => Codec::Utf8,
Binary | LargeBinary => Codec::Binary,
Date32 => Codec::Date32,
Time32(TimeUnit::Millisecond) => Codec::TimeMillis,
Time64(TimeUnit::Microsecond) => Codec::TimeMicros,
Timestamp(TimeUnit::Millisecond, _) => Codec::TimestampMillis(true),
Timestamp(TimeUnit::Microsecond, _) => Codec::TimestampMicros(true),
FixedSizeBinary(n) => Codec::Fixed(*n as i32),

List(field) => {
// Recursively create Codec for the child item
let child_codec = arrow_type_to_codec(field.data_type());
Codec::List(Arc::new(AvroDataType {
nullability: None,
metadata: Default::default(),
codec: child_codec,
}))
}
Struct(child_fields) => {
let avro_fields: Vec<AvroField> = child_fields
.iter()
.map(|fref| arrow_field_to_avro_field(fref.as_ref()))
.collect();
Codec::Struct(Arc::from(avro_fields))
}
_ => {
// TODO handle more arrow types (e.g. decimal, map, union, etc.)
Codec::Utf8
}
}
}
1 change: 1 addition & 0 deletions arrow-avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod schema;
mod compression;

mod codec;
mod writer;

#[cfg(test)]
mod test_util {
Expand Down
22 changes: 11 additions & 11 deletions arrow-avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub enum ComplexType<'a> {
pub struct Record<'a> {
#[serde(borrow)]
pub name: &'a str,
#[serde(borrow, default)]
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
pub namespace: Option<&'a str>,
#[serde(borrow, default)]
pub doc: Option<&'a str>,
Expand All @@ -144,7 +144,7 @@ pub struct Field<'a> {
pub doc: Option<&'a str>,
#[serde(borrow)]
pub r#type: Schema<'a>,
#[serde(borrow, default)]
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
pub default: Option<&'a str>,
}

Expand All @@ -155,15 +155,15 @@ pub struct Field<'a> {
pub struct Enum<'a> {
#[serde(borrow)]
pub name: &'a str,
#[serde(borrow, default)]
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
pub namespace: Option<&'a str>,
#[serde(borrow, default)]
pub doc: Option<&'a str>,
#[serde(borrow, default)]
pub aliases: Vec<&'a str>,
#[serde(borrow)]
pub symbols: Vec<&'a str>,
#[serde(borrow, default)]
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
pub default: Option<&'a str>,
#[serde(flatten)]
pub attributes: Attributes<'a>,
Expand Down Expand Up @@ -198,7 +198,7 @@ pub struct Map<'a> {
pub struct Fixed<'a> {
#[serde(borrow)]
pub name: &'a str,
#[serde(borrow, default)]
#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
pub namespace: Option<&'a str>,
#[serde(borrow, default)]
pub aliases: Vec<&'a str>,
Expand Down Expand Up @@ -237,7 +237,7 @@ mod tests {
"logicalType":"timestamp-micros"
}"#,
)
.unwrap();
.unwrap();

let timestamp = Type {
r#type: TypeName::Primitive(PrimitiveType::Long),
Expand All @@ -260,7 +260,7 @@ mod tests {
"scale":2
}"#,
)
.unwrap();
.unwrap();

let decimal = ComplexType::Fixed(Fixed {
name: "fixed",
Expand Down Expand Up @@ -300,7 +300,7 @@ mod tests {
]
}"#,
)
.unwrap();
.unwrap();

assert_eq!(
schema,
Expand Down Expand Up @@ -333,7 +333,7 @@ mod tests {
]
}"#,
)
.unwrap();
.unwrap();

assert_eq!(
schema,
Expand Down Expand Up @@ -392,7 +392,7 @@ mod tests {
]
}"#,
)
.unwrap();
.unwrap();

assert_eq!(
schema,
Expand Down Expand Up @@ -453,7 +453,7 @@ mod tests {
]
}"#,
)
.unwrap();
.unwrap();

assert_eq!(
schema,
Expand Down
15 changes: 15 additions & 0 deletions arrow-avro/src/writer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
mod schema;
mod vlq;

#[cfg(test)]
mod test {
    use arrow_array::RecordBatch;
    use std::fs::File;
    use std::io::BufWriter;

    /// Test helper (work in progress): opens `file` for writing behind a
    /// buffered writer. Nothing is written from `batch` yet.
    ///
    /// Panics if the file cannot be created.
    fn write_file(file: &str, batch: &RecordBatch) {
        // `File::create` yields a writable handle (creating or truncating the
        // file); `File::open` would return a read-only handle and every write
        // through the `BufWriter` would fail.
        let file = File::create(file).unwrap();
        let _writer = BufWriter::new(file);
    }
}
Loading

0 comments on commit 7d4e6fd

Please sign in to comment.