Support timestamp/time and date json decoding (#3835)
* Support timestamp/time and date json decoding

* Don't support timezones for now

Co-authored-by: Raphael Taylor-Davies <[email protected]>

spebern and tustvold authored Mar 10, 2023
1 parent 495682a commit 61c4f12
Showing 1 changed file with 187 additions and 4 deletions.
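
For orientation, a minimal usage sketch of what this commit enables: reading JSON into timezone-naive timestamp, date and time columns. It is not part of the diff; the `RawReaderBuilder` name and its `build`/iterator API are assumptions based on the `raw` module this file belongs to.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_json::RawReaderBuilder; // assumed public re-export of the raw reader builder
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};

fn read_events(json: &str) -> Result<Vec<RecordBatch>, ArrowError> {
    // Timestamps without a timezone, dates and times are now decodable column types.
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("day", DataType::Date32, true),
        Field::new("elapsed", DataType::Time64(TimeUnit::Microsecond), true),
    ]));
    // &[u8] implements BufRead, so the string's bytes can be fed to the reader directly.
    let reader = RawReaderBuilder::new(schema).build(json.as_bytes())?;
    reader.collect()
}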
191 changes: 187 additions & 4 deletions arrow-json/src/raw/mod.rs
@@ -30,7 +30,7 @@ use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef};
use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
use std::io::BufRead;

mod boolean_array;
@@ -293,6 +293,16 @@ fn make_decoder(
data_type => (primitive_decoder, data_type),
DataType::Float32 => primitive_decoder!(Float32Type, data_type),
DataType::Float64 => primitive_decoder!(Float64Type, data_type),
DataType::Timestamp(TimeUnit::Second, None) => primitive_decoder!(TimestampSecondType, data_type),
DataType::Timestamp(TimeUnit::Millisecond, None) => primitive_decoder!(TimestampMillisecondType, data_type),
DataType::Timestamp(TimeUnit::Microsecond, None) => primitive_decoder!(TimestampMicrosecondType, data_type),
DataType::Timestamp(TimeUnit::Nanosecond, None) => primitive_decoder!(TimestampNanosecondType, data_type),
DataType::Date32 => primitive_decoder!(Date32Type, data_type),
DataType::Date64 => primitive_decoder!(Date64Type, data_type),
DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
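
Per the commit message, only timezone-naive timestamps are wired up here: every new arm matches `Timestamp(_, None)`. A sketch of a field that is deliberately not covered (illustration only, names are hypothetical):

use arrow_schema::{DataType, Field, TimeUnit};

// Not matched by any of the new arms above, so it still falls through to
// make_decoder's existing handling for unsupported data types.
let with_tz = Field::new(
    "ts_utc",
    DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
    true,
);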
@@ -373,10 +383,10 @@ mod tests {
#[test]
fn test_basic() {
let buf = r#"
{"a": 1, "b": 2, "c": true}
{"a": 2E0, "b": 4, "c": false}
{"a": 1, "b": 2, "c": true, "d": 1}
{"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}
{"b": 6, "a": 2.0}
{"b": 6, "a": 2.0, "d": 45}
{"b": "5", "a": 2}
{"b": 4e0}
{"b": 7, "a": null}
@@ -386,6 +396,8 @@ mod tests {
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Boolean, true),
Field::new("d", DataType::Date32, true),
Field::new("e", DataType::Date64, true),
]));

let batches = do_read(buf, 1024, false, schema);
@@ -407,6 +419,18 @@ mod tests {
assert!(!col3.is_null(0));
assert!(!col3.value(1));
assert!(!col3.is_null(1));

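// "d" and "e" arrive as plain JSON numbers, so the decoder stores them as the
// raw native values: days since the epoch for Date32, milliseconds for Date64.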
let col4 = as_primitive_array::<Date32Type>(batches[0].column(3));
assert_eq!(col4.null_count(), 3);
assert!(col4.is_null(3));
assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

let col5 = as_primitive_array::<Date64Type>(batches[0].column(4));
assert_eq!(col5.null_count(), 5);
assert!(col5.is_null(0));
assert!(col5.is_null(2));
assert!(col5.is_null(3));
assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
}

#[test]
@@ -782,4 +806,163 @@ mod tests {
test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
}

fn test_timestamp<T: ArrowTimestampType>() {
let buf = r#"
{"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30}
{"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456}
{"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123"}
{"b": 40, "c": "2020-09-08T13:42:29.190855+00:00"}
{"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z"}
{"c": "1997-01-31T14:26:56.123-05:00"}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", T::DATA_TYPE, true),
Field::new("b", T::DATA_TYPE, true),
Field::new("c", T::DATA_TYPE, true),
]));

let batches = do_read(buf, 1024, true, schema);
assert_eq!(batches.len(), 1);

let unit = match T::DATA_TYPE {
DataType::Timestamp(unit, _) => unit,
_ => unreachable!(),
};
let unit_in_nanos = match unit {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
TimeUnit::Nanosecond => 1,
};

let col1 = as_primitive_array::<T>(batches[0].column(0));
assert_eq!(col1.null_count(), 4);
assert!(col1.is_null(2));
assert!(col1.is_null(3));
assert!(col1.is_null(4));
assert!(col1.is_null(5));
assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

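// "2020-09-08T13:42:29.190855Z" is 1_599_572_549.190855 s after the Unix epoch,
// i.e. 1_599_572_549_190_855_000 ns; dividing by `unit_in_nanos` scales the
// expected value to T's resolution. Plain JSON numbers (40, 1234) are taken as-is.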
let col2 = as_primitive_array::<T>(batches[0].column(1));
assert_eq!(col2.null_count(), 1);
assert!(col2.is_null(5));
assert_eq!(
col2.values(),
&[
1599572549190855000 / unit_in_nanos,
1599572549190855000 / unit_in_nanos,
1599572549000000000 / unit_in_nanos,
40,
1234,
0
]
.map(T::Native::usize_as)
);

let col3 = as_primitive_array::<T>(batches[0].column(2));
assert_eq!(col3.null_count(), 0);
assert_eq!(
col3.values(),
&[
38,
123,
854702816123000000 / unit_in_nanos,
1599572549190855000 / unit_in_nanos,
854702816123000000 / unit_in_nanos,
854738816123000000 / unit_in_nanos
]
.map(T::Native::usize_as)
);
}

#[test]
fn test_timestamps() {
test_timestamp::<TimestampSecondType>();
test_timestamp::<TimestampMillisecondType>();
test_timestamp::<TimestampMicrosecondType>();
test_timestamp::<TimestampNanosecondType>();
}

fn test_time<T: ArrowTemporalType>() {
let buf = r#"
{"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
{"a": 2, "b": "23:59:59", "c": 123.456}
{"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
{"b": 40, "c": "13:42:29.190855"}
{"b": 1234, "a": null, "c": "09:26:56.123"}
{"c": "14:26:56.123"}
"#;

let unit = match T::DATA_TYPE {
DataType::Time32(unit) | DataType::Time64(unit) => unit,
_ => unreachable!(),
};

let unit_in_nanos = match unit {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
TimeUnit::Nanosecond => 1,
};

let schema = Arc::new(Schema::new(vec![
Field::new("a", T::DATA_TYPE, true),
Field::new("b", T::DATA_TYPE, true),
Field::new("c", T::DATA_TYPE, true),
]));

let batches = do_read(buf, 1024, true, schema);
assert_eq!(batches.len(), 1);

let col1 = as_primitive_array::<T>(batches[0].column(0));
assert_eq!(col1.null_count(), 4);
assert!(col1.is_null(2));
assert!(col1.is_null(3));
assert!(col1.is_null(4));
assert!(col1.is_null(5));
assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

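// "09:26:56.123 AM" is 34_016.123 s after midnight (34_016_123_000_000 ns),
// "23:59:59" is 86_399 s and "6:00 pm" is 64_800 s; plain JSON numbers
// (40, 1234) are taken as-is in T's native unit.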
let col2 = as_primitive_array::<T>(batches[0].column(1));
assert_eq!(col2.null_count(), 1);
assert!(col2.is_null(5));
assert_eq!(
col2.values(),
&[
34016123000000 / unit_in_nanos,
86399000000000 / unit_in_nanos,
64800000000000 / unit_in_nanos,
40,
1234,
0
]
.map(T::Native::usize_as)
);

let col3 = as_primitive_array::<T>(batches[0].column(2));
assert_eq!(col3.null_count(), 0);
assert_eq!(
col3.values(),
&[
38,
123,
34016123000000 / unit_in_nanos,
49349190855000 / unit_in_nanos,
34016123000000 / unit_in_nanos,
52016123000000 / unit_in_nanos
]
.map(T::Native::usize_as)
);
}

#[test]
fn test_times() {
test_time::<Time32MillisecondType>();
test_time::<Time32SecondType>();
test_time::<Time64MicrosecondType>();
test_time::<Time64NanosecondType>();
}
}
