Skip to content

Commit

Permalink
Merge pull request #3 from ArroyoSystems/timestamp_formats
Browse files Browse the repository at this point in the history
Add configurable timestamp serialization to arrow-json writer
mwylde authored Apr 5, 2024
2 parents b6d7669 + 6f9ca64 commit 8b568ec
Showing 1 changed file with 253 additions and 15 deletions.
268 changes: 253 additions & 15 deletions arrow-json/src/writer.rs
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ use std::iter;
use std::{fmt::Debug, io::Write};

use serde_json::map::Map as JsonMap;
use serde_json::Value;
use serde_json::{Number, Value};

use crate::JsonSerializable;
use arrow_array::cast::*;
@@ -130,6 +130,7 @@ where
fn struct_array_to_jsonmap_array(
array: &StructArray,
explicit_nulls: bool,
timestamp_format: TimestampFormat,
) -> Result<Vec<Option<JsonMap<String, Value>>>, ArrowError> {
let inner_col_names = array.column_names();

@@ -151,6 +152,7 @@ fn struct_array_to_jsonmap_array(
field.metadata(),
inner_col_names[j],
explicit_nulls,
timestamp_format,
)?
}
Ok(inner_objs)
@@ -159,12 +161,13 @@ fn struct_array_to_jsonmap_array(
/// Converts an arrow [`Array`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
///
/// This is the backwards-compatible entry point: it always passes
/// `explicit_nulls = false` and [`TimestampFormat::default()`] to the internal
/// conversion routine.
///
/// # Errors
/// Returns whatever [`ArrowError`] the internal conversion reports.
pub fn array_to_json_array(array: &dyn Array) -> Result<Vec<Value>, ArrowError> {
// For backwards compatibility, default to skip nulls
array_to_json_array_internal(array, false)
array_to_json_array_internal(array, false, TimestampFormat::default())
}

fn array_to_json_array_internal(
array: &dyn Array,
explicit_nulls: bool,
timestamp_format: TimestampFormat,
) -> Result<Vec<Value>, ArrowError> {
match array.data_type() {
DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()),
@@ -210,6 +213,7 @@ fn array_to_json_array_internal(
Some(v) => Ok(Value::Array(array_to_json_array_internal(
&v,
explicit_nulls,
timestamp_format,
)?)),
None => Ok(Value::Null),
})
@@ -220,6 +224,7 @@ fn array_to_json_array_internal(
Some(v) => Ok(Value::Array(array_to_json_array_internal(
&v,
explicit_nulls,
timestamp_format,
)?)),
None => Ok(Value::Null),
})
@@ -230,12 +235,14 @@ fn array_to_json_array_internal(
Some(v) => Ok(Value::Array(array_to_json_array_internal(
&v,
explicit_nulls,
timestamp_format,
)?)),
None => Ok(Value::Null),
})
.collect(),
DataType::Struct(_) => {
let jsonmaps = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
let jsonmaps =
struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls, timestamp_format)?;
let json_values = jsonmaps
.into_iter()
.map(|maybe_map| maybe_map.map(Value::Object).unwrap_or(Value::Null))
@@ -248,6 +255,7 @@ fn array_to_json_array_internal(
Some(v) => Ok(Value::Array(array_to_json_array_internal(
&v,
explicit_nulls,
timestamp_format,
)?)),
None => Ok(Value::Null),
})
@@ -298,12 +306,92 @@ fn set_column_by_primitive_type<T>(
});
}

/// Per-column strategy for turning a temporal array value into a JSON value.
///
/// `Formatter` delegates to an [`ArrayFormatter`] and yields a JSON string.
/// Every `UnixMillis*` variant borrows the concrete typed array and yields a
/// JSON number of milliseconds; the variant records which unit conversion
/// `TimeFormatter::value` has to apply.
enum TimeFormatter<'a> {
// String rendering through the display formatter.
Formatter(ArrayFormatter<'a>),
// Stored value is multiplied by 86400 * 1000 to obtain milliseconds.
UnixMillisDate32(&'a Date32Array),
// Stored value is used as milliseconds unchanged.
UnixMillisDate64(&'a Date64Array),
// Stored value is divided by 1_000_000 to obtain milliseconds.
UnixMillisTimestampNanos(&'a TimestampNanosecondArray),
// Stored value is divided by 1_000 to obtain milliseconds.
UnixMillisTimestampMicros(&'a TimestampMicrosecondArray),
// Stored value is used as milliseconds unchanged.
UnixMillisTimestampMillis(&'a TimestampMillisecondArray),
// Stored value is multiplied by 1000 to obtain milliseconds.
UnixMillisTimestampSeconds(&'a TimestampSecondArray),
}

impl<'a> TimeFormatter<'a> {
fn for_timestamp_format(
timestamp_format: TimestampFormat,
array: &'a dyn Array,
) -> Result<Self, ArrowError> {
match timestamp_format {
TimestampFormat::RFC3339 => {
let options = FormatOptions::default();
let formatter = ArrayFormatter::try_new(array, &options)?;
Ok(Self::Formatter(formatter))
}
TimestampFormat::UnixMillis => Ok(match array.data_type() {
DataType::Timestamp(TimeUnit::Second, _) => Self::UnixMillisTimestampSeconds(
array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap(),
),
DataType::Timestamp(TimeUnit::Millisecond, _) => Self::UnixMillisTimestampMillis(
array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap(),
),
DataType::Timestamp(TimeUnit::Microsecond, _) => Self::UnixMillisTimestampMicros(
array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap(),
),
DataType::Timestamp(TimeUnit::Nanosecond, _) => Self::UnixMillisTimestampNanos(
array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap(),
),
DataType::Date32 => {
Self::UnixMillisDate32(array.as_any().downcast_ref::<Date32Array>().unwrap())
}
DataType::Date64 => {
Self::UnixMillisDate64(array.as_any().downcast_ref::<Date64Array>().unwrap())
}
_ => {
unreachable!(
"cannot have time format field with type {}",
array.data_type()
)
}
}),
}
}

fn value(&self, idx: usize) -> Value {
let millis = match self {
TimeFormatter::Formatter(f) => {
return f.value(idx).to_string().into();
}
TimeFormatter::UnixMillisDate32(a) => (a.value(idx) as f64) * 86400.0 * 1000.0,
TimeFormatter::UnixMillisDate64(a) => a.value(idx) as f64,
TimeFormatter::UnixMillisTimestampNanos(a) => a.value(idx) as f64 / 1_000_000.0,
TimeFormatter::UnixMillisTimestampMicros(a) => a.value(idx) as f64 / 1_000.0,
TimeFormatter::UnixMillisTimestampMillis(a) => a.value(idx) as f64,
TimeFormatter::UnixMillisTimestampSeconds(a) => a.value(idx) as f64 * 1000.0,
};

Value::Number(Number::from_f64(millis).unwrap_or(0.into()))
}
}

fn set_column_for_json_rows(
rows: &mut [Option<JsonMap<String, Value>>],
array: &ArrayRef,
metadata: &HashMap<String, String>,
col_name: &str,
explicit_nulls: bool,
timestamp_format: TimestampFormat,
) -> Result<(), ArrowError> {
match array.data_type() {
DataType::Int8 => {
@@ -378,17 +466,17 @@ fn set_column_for_json_rows(
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_) => {
let options = FormatOptions::default();
let formatter = ArrayFormatter::try_new(array.as_ref(), &options)?;
let nulls = array.nulls();
let formatter = TimeFormatter::for_timestamp_format(timestamp_format, array.as_ref())?;

rows.iter_mut()
.enumerate()
.filter_map(|(idx, maybe_row)| maybe_row.as_mut().map(|row| (idx, row)))
.for_each(|(idx, row)| {
let maybe_value = nulls
.map(|x| x.is_valid(idx))
.unwrap_or(true)
.then(|| formatter.value(idx).to_string().into());
.then(|| formatter.value(idx));
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), j);
} else if explicit_nulls {
@@ -397,7 +485,8 @@ fn set_column_for_json_rows(
});
}
DataType::Struct(_) => {
let inner_objs = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
let inner_objs =
struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls, timestamp_format)?;
rows.iter_mut()
.zip(inner_objs)
.filter_map(|(maybe_row, maybe_obj)| maybe_row.as_mut().map(|row| (row, maybe_obj)))
@@ -419,7 +508,10 @@ fn set_column_for_json_rows(
})
.try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
let maybe_value = maybe_value
.map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
.map(|v| {
array_to_json_array_internal(&v, explicit_nulls, timestamp_format)
.map(Value::Array)
})
.transpose()?;
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), j);
@@ -438,7 +530,10 @@ fn set_column_for_json_rows(
})
.try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
let maybe_value = maybe_value
.map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
.map(|v| {
array_to_json_array_internal(&v, explicit_nulls, timestamp_format)
.map(Value::Array)
})
.transpose()?;
if let Some(j) = maybe_value {
row.insert(col_name.to_string(), j);
@@ -451,7 +546,14 @@ fn set_column_for_json_rows(
DataType::Dictionary(_, value_type) => {
let hydrated = arrow_cast::cast::cast(&array, value_type)
.expect("cannot cast dictionary to underlying values");
set_column_for_json_rows(rows, &hydrated, &metadata, col_name, explicit_nulls)?;
set_column_for_json_rows(
rows,
&hydrated,
&metadata,
col_name,
explicit_nulls,
timestamp_format,
)?;
}
DataType::Map(_, _) => {
let maparr = as_map_array(array);
@@ -468,7 +570,7 @@ fn set_column_for_json_rows(
}

let keys = keys.as_string::<i32>();
let values = array_to_json_array_internal(values, explicit_nulls)?;
let values = array_to_json_array_internal(values, explicit_nulls, timestamp_format)?;

let mut kv = keys.iter().zip(values);

@@ -508,12 +610,15 @@ pub fn record_batches_to_json_rows(
batches: &[&RecordBatch],
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
// For backwards compatibility, default to skip nulls
record_batches_to_json_rows_internal(batches, false)
record_batches_to_json_rows_opts(batches, false, TimestampFormat::default())
}

fn record_batches_to_json_rows_internal(
/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects) with options to control how data is written
pub fn record_batches_to_json_rows_opts(
batches: &[&RecordBatch],
explicit_nulls: bool,
timestamp_format: TimestampFormat,
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
let mut rows: Vec<Option<JsonMap<String, Value>>> = iter::repeat(Some(JsonMap::new()))
.take(batches.iter().map(|b| b.num_rows()).sum())
@@ -538,6 +643,7 @@ fn record_batches_to_json_rows_internal(
field.metadata(),
col_name,
explicit_nulls,
timestamp_format,
)?
}
base += row_count;
@@ -629,12 +735,24 @@ pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;
/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays.
pub type ArrayWriter<W> = Writer<W, JsonArray>;

/// Configuration for how timestamps are serialized
///
/// The default is [`TimestampFormat::RFC3339`]. The type is a plain `Copy`
/// value that can be compared and hashed, so it is cheap to pass around and
/// easy to assert on in configuration code.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
pub enum TimestampFormat {
    /// Serialize as RFC3339 (written as a JSON string)
    #[default]
    RFC3339,
    /// Serialize as milliseconds since the UNIX Epoch (written as a JSON
    /// number, which may carry a fractional part for sub-millisecond units)
    UnixMillis,
}

/// JSON writer builder.
///
/// Collects the serialization options that are copied into a `Writer` when it
/// is built. `Default` yields `explicit_nulls = false` and
/// `TimestampFormat::default()`.
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder {
/// Controls whether null values should be written explicitly for keys
/// in objects, or whether the key should be omitted entirely.
explicit_nulls: bool,
/// Sets the timestamp serialization format
/// (see [`TimestampFormat`] for the available encodings).
timestamp_format: TimestampFormat,
}

impl WriterBuilder {
@@ -692,6 +810,12 @@ impl WriterBuilder {
self
}

/// Sets the timestamp format, controlling how timestamps are serialized in JSON
pub fn with_timestamp_format(mut self, timestamp_format: TimestampFormat) -> Self {
self.timestamp_format = timestamp_format;
self
}

/// Create a new `Writer` with specified `JsonFormat` and builder options.
pub fn build<W, F>(self, writer: W) -> Writer<W, F>
where
@@ -704,6 +828,7 @@ impl WriterBuilder {
finished: false,
format: F::default(),
explicit_nulls: self.explicit_nulls,
timestamp_format: self.timestamp_format,
}
}
}
@@ -738,6 +863,9 @@ where

/// Whether keys with null values should be written or skipped
explicit_nulls: bool,

/// How timestamps should be serialized
timestamp_format: TimestampFormat,
}

impl<W, F> Writer<W, F>
@@ -753,6 +881,7 @@ where
finished: false,
format: F::default(),
explicit_nulls: false,
timestamp_format: TimestampFormat::default(),
}
}

@@ -774,15 +903,23 @@ where

/// Convert the `RecordBatch` into JSON rows, and write them to the output
///
/// Rows are produced using this writer's `explicit_nulls` and
/// `timestamp_format` settings, then emitted one at a time via `write_row`.
///
/// # Errors
/// Returns the first [`ArrowError`] raised while converting the batch or
/// while writing a row.
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
for row in record_batches_to_json_rows_internal(&[batch], self.explicit_nulls)? {
for row in record_batches_to_json_rows_opts(
&[batch],
self.explicit_nulls,
self.timestamp_format,
)? {
self.write_row(&Value::Object(row))?;
}
Ok(())
}

/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
for row in record_batches_to_json_rows_internal(batches, self.explicit_nulls)? {
for row in record_batches_to_json_rows_opts(
batches,
self.explicit_nulls,
self.timestamp_format,
)? {
self.write_row(&Value::Object(row))?;
}
Ok(())
@@ -1001,6 +1138,60 @@ mod tests {
);
}

/// Timestamp columns of every unit are written as epoch-millisecond JSON
/// numbers when the writer is configured with `TimestampFormat::UnixMillis`;
/// null timestamps are omitted from the row.
#[test]
fn write_timestamps_as_millis() {
    let ts_string = "2018-11-13T17:11:10.011375885995";
    let ts_nanos = ts_string
        .parse::<chrono::NaiveDateTime>()
        .unwrap()
        .timestamp_nanos_opt()
        .unwrap();
    let ts_micros = ts_nanos / 1000;
    let ts_millis = ts_micros / 1000;
    let ts_secs = ts_millis / 1000;

    let arr_nanos = TimestampNanosecondArray::from(vec![Some(ts_nanos), None]);
    let arr_micros = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
    let arr_millis = TimestampMillisecondArray::from(vec![Some(ts_millis), None]);
    let arr_secs = TimestampSecondArray::from(vec![Some(ts_secs), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

    let schema = Schema::new(vec![
        Field::new("nanos", arr_nanos.data_type().clone(), true),
        Field::new("micros", arr_micros.data_type().clone(), true),
        Field::new("millis", arr_millis.data_type().clone(), true),
        Field::new("secs", arr_secs.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), true),
    ]);
    let schema = Arc::new(schema);

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_nanos),
            Arc::new(arr_micros),
            Arc::new(arr_millis),
            Arc::new(arr_secs),
            Arc::new(arr_names),
        ],
    )
    .unwrap();

    let mut buf = Vec::new();
    {
        // Configure through the public builder so that
        // `with_timestamp_format` and `build` are covered as well, instead
        // of setting the writer's private field directly.
        let mut writer = WriterBuilder::default()
            .with_timestamp_format(TimestampFormat::UnixMillis)
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }

    assert_json_eq(
        &buf,
        r#"{"micros":1542129070011.375,"millis":1542129070011.0,"name":"a","nanos":1542129070011.376,"secs":1542129070000.0}
{"name":"b"}
"#,
    );
}

#[test]
fn write_timestamps_with_tz() {
let ts_string = "2018-11-13T17:11:10.011375885995";
@@ -1107,6 +1298,53 @@ mod tests {
);
}

/// `Date32` and `Date64` columns are written as epoch-millisecond JSON
/// numbers when the writer is configured with `TimestampFormat::UnixMillis`;
/// null dates are omitted from the row.
#[test]
fn write_dates_as_millis() {
    let ts_string = "2018-11-13T17:11:10.011375885995";
    let ts_millis = ts_string
        .parse::<chrono::NaiveDateTime>()
        .unwrap()
        .timestamp_millis();

    // Date32 stores whole days since the epoch.
    let arr_date32 = Date32Array::from(vec![
        Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()),
        None,
    ]);
    let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]);
    let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

    let schema = Schema::new(vec![
        Field::new("date32", arr_date32.data_type().clone(), true),
        Field::new("date64", arr_date64.data_type().clone(), true),
        Field::new("name", arr_names.data_type().clone(), false),
    ]);
    let schema = Arc::new(schema);

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(arr_date32),
            Arc::new(arr_date64),
            Arc::new(arr_names),
        ],
    )
    .unwrap();

    let mut buf = Vec::new();
    {
        // Configure through the public builder so that
        // `with_timestamp_format` and `build` are covered as well, instead
        // of setting the writer's private field directly.
        let mut writer = WriterBuilder::default()
            .with_timestamp_format(TimestampFormat::UnixMillis)
            .build::<_, LineDelimited>(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }

    assert_json_eq(
        &buf,
        r#"{"date32":1542067200000.0,"date64":1542129070011.0,"name":"a"}
{"name":"b"}
"#,
    );
}

#[test]
fn write_times() {
let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]);

0 comments on commit 8b568ec

Please sign in to comment.