From 61c4f12e84330db243789fc98375512d67628e57 Mon Sep 17 00:00:00 2001 From: bold Date: Fri, 10 Mar 2023 12:30:54 +0100 Subject: [PATCH] Support timestamp/time and date json decoding (#3835) * Support timestamp/time and date json decoding * Don't support timezones for now Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --------- Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-json/src/raw/mod.rs | 191 +++++++++++++++++++++++++++++++++++++- 1 file changed, 187 insertions(+), 4 deletions(-) diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs index 5b699b1d51fb..1ab879d203fb 100644 --- a/arrow-json/src/raw/mod.rs +++ b/arrow-json/src/raw/mod.rs @@ -30,7 +30,7 @@ use crate::raw::tape::{Tape, TapeDecoder, TapeElement}; use arrow_array::types::*; use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader}; use arrow_data::ArrayData; -use arrow_schema::{ArrowError, DataType, SchemaRef}; +use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit}; use std::io::BufRead; mod boolean_array; @@ -293,6 +293,16 @@ fn make_decoder( data_type => (primitive_decoder, data_type), DataType::Float32 => primitive_decoder!(Float32Type, data_type), DataType::Float64 => primitive_decoder!(Float64Type, data_type), + DataType::Timestamp(TimeUnit::Second, None) => primitive_decoder!(TimestampSecondType, data_type), + DataType::Timestamp(TimeUnit::Millisecond, None) => primitive_decoder!(TimestampMillisecondType, data_type), + DataType::Timestamp(TimeUnit::Microsecond, None) => primitive_decoder!(TimestampMicrosecondType, data_type), + DataType::Timestamp(TimeUnit::Nanosecond, None) => primitive_decoder!(TimestampNanosecondType, data_type), + DataType::Date32 => primitive_decoder!(Date32Type, data_type), + DataType::Date64 => primitive_decoder!(Date64Type, data_type), + DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type), + 
DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type), + DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type), + DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type), DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))), DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))), DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()), @@ -373,10 +383,10 @@ mod tests { #[test] fn test_basic() { let buf = r#" - {"a": 1, "b": 2, "c": true} - {"a": 2E0, "b": 4, "c": false} + {"a": 1, "b": 2, "c": true, "d": 1} + {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254} - {"b": 6, "a": 2.0} + {"b": 6, "a": 2.0, "d": 45} {"b": "5", "a": 2} {"b": 4e0} {"b": 7, "a": null} @@ -386,6 +396,8 @@ mod tests { Field::new("a", DataType::Int64, true), Field::new("b", DataType::Int32, true), Field::new("c", DataType::Boolean, true), + Field::new("d", DataType::Date32, true), + Field::new("e", DataType::Date64, true), ])); let batches = do_read(buf, 1024, false, schema); @@ -407,6 +419,18 @@ mod tests { assert!(!col3.is_null(0)); assert!(!col3.value(1)); assert!(!col3.is_null(1)); + + let col4 = as_primitive_array::<Date32Type>(batches[0].column(3)); + assert_eq!(col4.null_count(), 3); + assert!(col4.is_null(3)); + assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]); + + let col5 = as_primitive_array::<Date64Type>(batches[0].column(4)); + assert_eq!(col5.null_count(), 5); + assert!(col5.is_null(0)); + assert!(col5.is_null(2)); + assert!(col5.is_null(3)); + assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]); } #[test] @@ -782,4 +806,163 @@ mod tests { test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2)); test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2)); } + + fn test_timestamp<T: ArrowTimestampType>() { + let buf = r#" + {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30} + {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456} + + {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": 
"1997-01-31T09:26:56.123"} + {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00"} + {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z"} + {"c": "1997-01-31T14:26:56.123-05:00"} + "#; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", T::DATA_TYPE, true), + Field::new("b", T::DATA_TYPE, true), + Field::new("c", T::DATA_TYPE, true), + ])); + + let batches = do_read(buf, 1024, true, schema); + assert_eq!(batches.len(), 1); + + let unit = match T::DATA_TYPE { + DataType::Timestamp(unit, _) => unit, + _ => unreachable!(), + }; + let unit_in_nanos = match unit { + TimeUnit::Second => 1_000_000_000, + TimeUnit::Millisecond => 1_000_000, + TimeUnit::Microsecond => 1_000, + TimeUnit::Nanosecond => 1, + }; + + let col1 = as_primitive_array::<T>(batches[0].column(0)); + assert_eq!(col1.null_count(), 4); + assert!(col1.is_null(2)); + assert!(col1.is_null(3)); + assert!(col1.is_null(4)); + assert!(col1.is_null(5)); + assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as)); + + let col2 = as_primitive_array::<T>(batches[0].column(1)); + assert_eq!(col2.null_count(), 1); + assert!(col2.is_null(5)); + assert_eq!( + col2.values(), + &[ + 1599572549190855000 / unit_in_nanos, + 1599572549190855000 / unit_in_nanos, + 1599572549000000000 / unit_in_nanos, + 40, + 1234, + 0 + ] + .map(T::Native::usize_as) + ); + + let col3 = as_primitive_array::<T>(batches[0].column(2)); + assert_eq!(col3.null_count(), 0); + assert_eq!( + col3.values(), + &[ + 38, + 123, + 854702816123000000 / unit_in_nanos, + 1599572549190855000 / unit_in_nanos, + 854702816123000000 / unit_in_nanos, + 854738816123000000 / unit_in_nanos + ] + .map(T::Native::usize_as) + ); + } + + #[test] + fn test_timestamps() { + test_timestamp::<TimestampSecondType>(); + test_timestamp::<TimestampMillisecondType>(); + test_timestamp::<TimestampMicrosecondType>(); + test_timestamp::<TimestampNanosecondType>(); + } + + fn test_time<T: ArrowTemporalType>() { + let buf = r#" + {"a": 1, "b": "09:26:56.123 AM", "c": 38.30} + {"a": 2, "b": "23:59:59", "c": 123.456} + + {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"} + {"b": 40, "c": 
"13:42:29.190855"} + {"b": 1234, "a": null, "c": "09:26:56.123"} + {"c": "14:26:56.123"} + "#; + + let unit = match T::DATA_TYPE { + DataType::Time32(unit) | DataType::Time64(unit) => unit, + _ => unreachable!(), + }; + + let unit_in_nanos = match unit { + TimeUnit::Second => 1_000_000_000, + TimeUnit::Millisecond => 1_000_000, + TimeUnit::Microsecond => 1_000, + TimeUnit::Nanosecond => 1, + }; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", T::DATA_TYPE, true), + Field::new("b", T::DATA_TYPE, true), + Field::new("c", T::DATA_TYPE, true), + ])); + + let batches = do_read(buf, 1024, true, schema); + assert_eq!(batches.len(), 1); + + let col1 = as_primitive_array::<T>(batches[0].column(0)); + assert_eq!(col1.null_count(), 4); + assert!(col1.is_null(2)); + assert!(col1.is_null(3)); + assert!(col1.is_null(4)); + assert!(col1.is_null(5)); + assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as)); + + let col2 = as_primitive_array::<T>(batches[0].column(1)); + assert_eq!(col2.null_count(), 1); + assert!(col2.is_null(5)); + assert_eq!( + col2.values(), + &[ + 34016123000000 / unit_in_nanos, + 86399000000000 / unit_in_nanos, + 64800000000000 / unit_in_nanos, + 40, + 1234, + 0 + ] + .map(T::Native::usize_as) + ); + + let col3 = as_primitive_array::<T>(batches[0].column(2)); + assert_eq!(col3.null_count(), 0); + assert_eq!( + col3.values(), + &[ + 38, + 123, + 34016123000000 / unit_in_nanos, + 49349190855000 / unit_in_nanos, + 34016123000000 / unit_in_nanos, + 52016123000000 / unit_in_nanos + ] + .map(T::Native::usize_as) + ); + } + + #[test] + fn test_times() { + test_time::<Time32MillisecondType>(); + test_time::<Time32SecondType>(); + test_time::<Time64MicrosecondType>(); + test_time::<Time64NanosecondType>(); + } }