Skip to content

Commit

Permalink
add boolean, date, timestamp & binary partition types (delta-io#1180)
Browse files Browse the repository at this point in the history
# Description
Adds support for boolean, date, timestamp, and binary partition value types when stringifying partition values in the writer.

# Related Issue(s)
closes delta-io#1170

---------

Signed-off-by: Marijn Valk <[email protected]>
Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
2 people authored and chitralverma committed Mar 17, 2023
1 parent 938647d commit d543798
Showing 1 changed file with 121 additions and 7 deletions.
128 changes: 121 additions & 7 deletions rust/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ use std::sync::Arc;
use crate::writer::DeltaWriterError;
use crate::DeltaTableError;

use arrow::array::{as_primitive_array, Array};
use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
};
use arrow::datatypes::{
DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::record_batch::*;
Expand All @@ -20,6 +24,8 @@ use serde_json::Value;
use uuid::Uuid;

/// Placeholder string for a partition value. Judging by the name it stands in for null
/// partition values when building data paths; its use is outside this excerpt (TODO confirm).
const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
/// Format pattern applied to `Date32`/`Date64` partition values, e.g. `1970-01-02`.
const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d";
/// Format pattern applied to timestamp partition values of every time unit,
/// e.g. `1970-01-01 00:00:01`. The pattern carries no sub-second component.
const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) struct PartitionPath {
Expand Down Expand Up @@ -137,11 +143,52 @@ pub(crate) fn stringified_partition_value(
DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Utf8 => {
let data = arrow::array::as_string_array(arr);

data.value(0).to_string()
DataType::Utf8 => as_string_array(arr).value(0).to_string(),
DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
DataType::Date32 => as_primitive_array::<Date32Type>(arr)
.value_as_date(0)
.unwrap()
.format(PARTITION_DATE_FORMAT)
.to_string(),
DataType::Date64 => as_primitive_array::<Date64Type>(arr)
.value_as_date(0)
.unwrap()
.format(PARTITION_DATE_FORMAT)
.to_string(),
DataType::Timestamp(TimeUnit::Second, _) => as_primitive_array::<TimestampSecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string(),
DataType::Timestamp(TimeUnit::Millisecond, _) => {
as_primitive_array::<TimestampMillisecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
as_primitive_array::<TimestampMicrosecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
as_primitive_array::<TimestampNanosecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Binary => as_generic_binary_array::<i32>(arr)
.value(0)
.escape_ascii()
.to_string(),
DataType::LargeBinary => as_generic_binary_array::<i64>(arr)
.value(0)
.escape_ascii()
.to_string(),
// TODO: handle more types
_ => {
unimplemented!("Unimplemented data type: {:?}", data_type);
Expand Down Expand Up @@ -235,3 +282,70 @@ impl Write for ShareableBuffer {
(*inner).flush()
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        BinaryArray, BooleanArray, Date32Array, Date64Array, Int16Array, Int32Array, Int64Array,
        Int8Array, LargeBinaryArray, StringArray, TimestampMicrosecondArray,
        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
        UInt32Array, UInt64Array, UInt8Array,
    };

    /// Table-driven check of `stringified_partition_value`.
    ///
    /// Each entry pairs a single-element array with the string expected for its first
    /// value; `None` is the expectation for a null value.
    #[test]
    fn test_stringified_partition_value() {
        let reference_pairs: Vec<(Arc<dyn Array>, Option<&str>)> = vec![
            // A null value yields no string at all.
            (Arc::new(Int8Array::from(vec![None])), None),
            // Signed and unsigned integers use their plain decimal rendering.
            (Arc::new(Int8Array::from(vec![1])), Some("1")),
            (Arc::new(Int16Array::from(vec![1])), Some("1")),
            (Arc::new(Int32Array::from(vec![1])), Some("1")),
            (Arc::new(Int64Array::from(vec![1])), Some("1")),
            (Arc::new(UInt8Array::from(vec![1])), Some("1")),
            (Arc::new(UInt16Array::from(vec![1])), Some("1")),
            (Arc::new(UInt32Array::from(vec![1])), Some("1")),
            (Arc::new(UInt64Array::from(vec![1])), Some("1")),
            (Arc::new(StringArray::from(vec!["1"])), Some("1")),
            (Arc::new(BooleanArray::from(vec![true])), Some("true")),
            (Arc::new(BooleanArray::from(vec![false])), Some("false")),
            // Date32 counts days and Date64 counts milliseconds; both inputs are one day
            // past the epoch.
            (Arc::new(Date32Array::from(vec![1])), Some("1970-01-02")),
            (
                Arc::new(Date64Array::from(vec![86400000])),
                Some("1970-01-02"),
            ),
            // Every timestamp unit below encodes one second past the epoch.
            (
                Arc::new(TimestampSecondArray::from(vec![1])),
                Some("1970-01-01 00:00:01"),
            ),
            (
                Arc::new(TimestampMillisecondArray::from(vec![1000])),
                Some("1970-01-01 00:00:01"),
            ),
            (
                Arc::new(TimestampMicrosecondArray::from(vec![1000000])),
                Some("1970-01-01 00:00:01"),
            ),
            (
                Arc::new(TimestampNanosecondArray::from(vec![1000000000])),
                Some("1970-01-01 00:00:01"),
            ),
            // Binary values are ASCII-escaped: the NUL byte becomes `\x00` and a
            // backslash is doubled.
            (Arc::new(BinaryArray::from_vec(vec![b"1"])), Some("1")),
            (
                Arc::new(BinaryArray::from_vec(vec![b"\x00\\"])),
                Some("\\x00\\\\"),
            ),
            (Arc::new(LargeBinaryArray::from_vec(vec![b"1"])), Some("1")),
            (
                Arc::new(LargeBinaryArray::from_vec(vec![b"\x00\\"])),
                Some("\\x00\\\\"),
            ),
        ];
        for (vals, result) in reference_pairs {
            // Name the array type on failure: several cases share the same expected
            // string, so the bare values would not identify the failing entry.
            assert_eq!(
                stringified_partition_value(&vals).unwrap().as_deref(),
                result,
                "unexpected partition value for array of type {:?}",
                vals.data_type()
            );
        }
    }
}

0 comments on commit d543798

Please sign in to comment.