Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated JSON writer #5651

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Remove deprecated JSON writer
tustvold committed Apr 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit d0d0a6c3fec2b2ee227aa9a7f16c09a38c089e7b
12 changes: 0 additions & 12 deletions arrow-json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -69,18 +69,6 @@
pub mod reader;
pub mod writer;

// Deprecated aliases retained for backwards compatibility after the `Raw*`
// reader types were renamed to the plain names; hidden from generated docs.
#[doc(hidden)]
#[deprecated(note = "Use Decoder")]
pub type RawDecoder = reader::Decoder;

#[doc(hidden)]
#[deprecated(note = "Use Reader")]
pub type RawReader<R> = Reader<R>;

#[doc(hidden)]
#[deprecated(note = "Use ReaderBuilder")]
pub type RawReaderBuilder = ReaderBuilder;

pub use self::reader::{Reader, ReaderBuilder};
pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer, WriterBuilder};
use half::f16;
582 changes: 3 additions & 579 deletions arrow-json/src/writer.rs
Original file line number Diff line number Diff line change
@@ -106,427 +106,14 @@
//! ```
mod encoder;

use std::iter;
use std::{fmt::Debug, io::Write};

use serde_json::map::Map as JsonMap;
use serde_json::Value;

use crate::JsonSerializable;
use arrow_array::cast::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_schema::*;

use crate::writer::encoder::EncoderOptions;
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use encoder::make_encoder;

/// Converts the values of a primitive array into JSON values.
///
/// Null slots, and values that have no JSON representation (e.g. a
/// non-finite float), are emitted as `Value::Null`.
fn primitive_array_to_json<T>(array: &dyn Array) -> Result<Vec<Value>, ArrowError>
where
    T: ArrowPrimitiveType,
    T::Native: JsonSerializable,
{
    let typed = array.as_primitive::<T>();
    let mut out = Vec::with_capacity(typed.len());
    for slot in typed.iter() {
        let json = slot
            .and_then(|v| v.into_json_value())
            .unwrap_or(Value::Null);
        out.push(json);
    }
    Ok(out)
}

/// Converts a [`StructArray`] into one JSON object per row.
///
/// A null struct row is represented as `None` so callers can write a JSON
/// null for the whole row instead of an object full of nulls.
fn struct_array_to_jsonmap_array(
    array: &StructArray,
    explicit_nulls: bool,
) -> Result<Vec<Option<JsonMap<String, Value>>>, ArrowError> {
    let mut rows: Vec<Option<JsonMap<String, Value>>> = (0..array.len())
        .map(|i| array.is_valid(i).then(JsonMap::new))
        .collect();

    // Fill each row's object column by column
    for (name, col) in array.column_names().into_iter().zip(array.columns()) {
        set_column_for_json_rows(&mut rows, col, name, explicit_nulls)?;
    }
    Ok(rows)
}

/// Converts an arrow [`Array`] into a `Vec` of Serde JSON [`serde_json::Value`]'s
///
/// Nulls are skipped rather than written as explicit JSON nulls, matching
/// the historical behavior of this API.
///
/// # Errors
/// Returns [`ArrowError::JsonError`] for data types with no JSON conversion.
#[deprecated(note = "Use Writer")]
pub fn array_to_json_array(array: &dyn Array) -> Result<Vec<Value>, ArrowError> {
    // For backwards compatibility, default to skip nulls
    array_to_json_array_internal(array, false)
}

/// Shared implementation of [`array_to_json_array`]: converts every slot of
/// `array` to a Serde JSON [`Value`], recursing into nested lists, structs
/// and maps.
///
/// `explicit_nulls` is threaded through to struct conversion, where it
/// controls whether null fields are written as explicit JSON nulls or
/// omitted.
fn array_to_json_array_internal(
    array: &dyn Array,
    explicit_nulls: bool,
) -> Result<Vec<Value>, ArrowError> {
    match array.data_type() {
        // A Null array has no values: one JSON null per slot
        DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()),
        DataType::Boolean => Ok(array
            .as_boolean()
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => v.into(),
                None => Value::Null,
            })
            .collect()),

        DataType::Utf8 => Ok(array
            .as_string::<i32>()
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => v.into(),
                None => Value::Null,
            })
            .collect()),
        DataType::LargeUtf8 => Ok(array
            .as_string::<i64>()
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => v.into(),
                None => Value::Null,
            })
            .collect()),
        // All primitive numeric types share the generic conversion helper
        DataType::Int8 => primitive_array_to_json::<Int8Type>(array),
        DataType::Int16 => primitive_array_to_json::<Int16Type>(array),
        DataType::Int32 => primitive_array_to_json::<Int32Type>(array),
        DataType::Int64 => primitive_array_to_json::<Int64Type>(array),
        DataType::UInt8 => primitive_array_to_json::<UInt8Type>(array),
        DataType::UInt16 => primitive_array_to_json::<UInt16Type>(array),
        DataType::UInt32 => primitive_array_to_json::<UInt32Type>(array),
        DataType::UInt64 => primitive_array_to_json::<UInt64Type>(array),
        DataType::Float16 => primitive_array_to_json::<Float16Type>(array),
        DataType::Float32 => primitive_array_to_json::<Float32Type>(array),
        DataType::Float64 => primitive_array_to_json::<Float64Type>(array),
        // List variants recurse element-wise; a null list slot becomes a
        // JSON null rather than an empty array
        DataType::List(_) => as_list_array(array)
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => Ok(Value::Array(array_to_json_array_internal(
                    &v,
                    explicit_nulls,
                )?)),
                None => Ok(Value::Null),
            })
            .collect(),
        DataType::LargeList(_) => as_large_list_array(array)
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => Ok(Value::Array(array_to_json_array_internal(
                    &v,
                    explicit_nulls,
                )?)),
                None => Ok(Value::Null),
            })
            .collect(),
        DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array)
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => Ok(Value::Array(array_to_json_array_internal(
                    &v,
                    explicit_nulls,
                )?)),
                None => Ok(Value::Null),
            })
            .collect(),
        DataType::Struct(_) => {
            // Struct rows become JSON objects; a null row becomes JSON null
            let jsonmaps = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
            let json_values = jsonmaps
                .into_iter()
                .map(|maybe_map| maybe_map.map(Value::Object).unwrap_or(Value::Null))
                .collect();
            Ok(json_values)
        }
        // Each map row is exposed as a JSON array of its underlying
        // key/value entry structs
        DataType::Map(_, _) => as_map_array(array)
            .iter()
            .map(|maybe_value| match maybe_value {
                Some(v) => Ok(Value::Array(array_to_json_array_internal(
                    &v,
                    explicit_nulls,
                )?)),
                None => Ok(Value::Null),
            })
            .collect(),
        t => Err(ArrowError::JsonError(format!(
            "data type {t:?} not supported"
        ))),
    }
}

// Inserts the values of `$array` (downcast with `$cast_fn`) into the JSON
// objects in `$rows` under the key `$col_name`. Rows that are `None` (null
// parent structs) are skipped; null values emit an explicit JSON null only
// when `$explicit_nulls` is set, and are otherwise omitted.
macro_rules! set_column_by_array_type {
    ($cast_fn:ident, $col_name:ident, $rows:ident, $array:ident, $explicit_nulls:ident) => {
        let arr = $cast_fn($array);
        $rows
            .iter_mut()
            .zip(arr.iter())
            // Pair each live row with its corresponding array value
            .filter_map(|(maybe_row, maybe_value)| maybe_row.as_mut().map(|row| (row, maybe_value)))
            .for_each(|(row, maybe_value)| {
                if let Some(j) = maybe_value.map(Into::into) {
                    row.insert($col_name.to_string(), j);
                } else if $explicit_nulls {
                    row.insert($col_name.to_string(), Value::Null);
                }
            });
    };
}

/// Inserts the values of a primitive `array` into the JSON objects in
/// `rows` under `col_name`.
///
/// Rows that are `None` (null parent structs) are skipped. Null values emit
/// an explicit JSON null only when `explicit_nulls` is set, otherwise the
/// key is omitted.
fn set_column_by_primitive_type<T>(
    rows: &mut [Option<JsonMap<String, Value>>],
    array: &ArrayRef,
    col_name: &str,
    explicit_nulls: bool,
) where
    T: ArrowPrimitiveType,
    T::Native: JsonSerializable,
{
    let typed = array.as_primitive::<T>();
    for (slot, value) in rows.iter_mut().zip(typed.iter()) {
        if let Some(row) = slot.as_mut() {
            match value.and_then(|v| v.into_json_value()) {
                Some(json) => {
                    row.insert(col_name.to_string(), json);
                }
                None if explicit_nulls => {
                    row.insert(col_name.to_string(), Value::Null);
                }
                None => {}
            }
        }
    }
}

/// Writes the values of `array` into each JSON object in `rows` under the
/// key `col_name`, dispatching on the column's [`DataType`].
///
/// Entries in `rows` that are `None` represent null parent structs and are
/// left untouched. When `explicit_nulls` is true, null values are written
/// as JSON `null`; otherwise the key is simply omitted.
///
/// # Errors
/// Returns [`ArrowError::JsonError`] for unsupported data types, and for
/// map columns whose key type is not `Utf8`.
fn set_column_for_json_rows(
    rows: &mut [Option<JsonMap<String, Value>>],
    array: &ArrayRef,
    col_name: &str,
    explicit_nulls: bool,
) -> Result<(), ArrowError> {
    match array.data_type() {
        DataType::Int8 => {
            set_column_by_primitive_type::<Int8Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Int16 => {
            set_column_by_primitive_type::<Int16Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Int32 => {
            set_column_by_primitive_type::<Int32Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Int64 => {
            set_column_by_primitive_type::<Int64Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::UInt8 => {
            set_column_by_primitive_type::<UInt8Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::UInt16 => {
            set_column_by_primitive_type::<UInt16Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::UInt32 => {
            set_column_by_primitive_type::<UInt32Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::UInt64 => {
            set_column_by_primitive_type::<UInt64Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Float16 => {
            set_column_by_primitive_type::<Float16Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Float32 => {
            set_column_by_primitive_type::<Float32Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Float64 => {
            set_column_by_primitive_type::<Float64Type>(rows, array, col_name, explicit_nulls);
        }
        DataType::Null => {
            // A Null column contributes no values; emit the key only when
            // explicit nulls were requested
            if explicit_nulls {
                rows.iter_mut()
                    .filter_map(|maybe_row| maybe_row.as_mut())
                    .for_each(|row| {
                        row.insert(col_name.to_string(), Value::Null);
                    });
            }
        }
        DataType::Boolean => {
            set_column_by_array_type!(as_boolean_array, col_name, rows, array, explicit_nulls);
        }
        DataType::Utf8 => {
            set_column_by_array_type!(as_string_array, col_name, rows, array, explicit_nulls);
        }
        DataType::LargeUtf8 => {
            set_column_by_array_type!(as_largestring_array, col_name, rows, array, explicit_nulls);
        }
        DataType::Date32
        | DataType::Date64
        | DataType::Timestamp(_, _)
        | DataType::Time32(_)
        | DataType::Time64(_)
        | DataType::Duration(_) => {
            // Temporal values are rendered to their display strings via the
            // cast formatter
            let options = FormatOptions::default();
            let formatter = ArrayFormatter::try_new(array.as_ref(), &options)?;
            let nulls = array.nulls();
            rows.iter_mut()
                .enumerate()
                .filter_map(|(idx, maybe_row)| maybe_row.as_mut().map(|row| (idx, row)))
                .for_each(|(idx, row)| {
                    // No null buffer means every slot is valid
                    let maybe_value = nulls
                        .map(|x| x.is_valid(idx))
                        .unwrap_or(true)
                        .then(|| formatter.value(idx).to_string().into());
                    if let Some(j) = maybe_value {
                        row.insert(col_name.to_string(), j);
                    } else if explicit_nulls {
                        row.insert(col_name.to_string(), Value::Null);
                    }
                });
        }
        DataType::Struct(_) => {
            // Nested structs become nested JSON objects; a null child struct
            // row becomes a JSON null
            let inner_objs = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?;
            rows.iter_mut()
                .zip(inner_objs)
                .filter_map(|(maybe_row, maybe_obj)| maybe_row.as_mut().map(|row| (row, maybe_obj)))
                .for_each(|(row, maybe_obj)| {
                    let json = if let Some(obj) = maybe_obj {
                        Value::Object(obj)
                    } else {
                        Value::Null
                    };
                    row.insert(col_name.to_string(), json);
                });
        }
        DataType::List(_) => {
            let listarr = as_list_array(array);
            rows.iter_mut()
                .zip(listarr.iter())
                .filter_map(|(maybe_row, maybe_value)| {
                    maybe_row.as_mut().map(|row| (row, maybe_value))
                })
                .try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
                    // Recurse to convert each list slot to a JSON array
                    let maybe_value = maybe_value
                        .map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
                        .transpose()?;
                    if let Some(j) = maybe_value {
                        row.insert(col_name.to_string(), j);
                    } else if explicit_nulls {
                        row.insert(col_name.to_string(), Value::Null);
                    }
                    Ok(())
                })?;
        }
        DataType::LargeList(_) => {
            // Same as List above, with 64-bit offsets
            let listarr = as_large_list_array(array);
            rows.iter_mut()
                .zip(listarr.iter())
                .filter_map(|(maybe_row, maybe_value)| {
                    maybe_row.as_mut().map(|row| (row, maybe_value))
                })
                .try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> {
                    let maybe_value = maybe_value
                        .map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array))
                        .transpose()?;
                    if let Some(j) = maybe_value {
                        row.insert(col_name.to_string(), j);
                    } else if explicit_nulls {
                        row.insert(col_name.to_string(), Value::Null);
                    }
                    Ok(())
                })?;
        }
        DataType::Dictionary(_, value_type) => {
            // Expand the dictionary to its value type and recurse
            let hydrated = arrow_cast::cast::cast(&array, value_type)
                .expect("cannot cast dictionary to underlying values");
            set_column_for_json_rows(rows, &hydrated, col_name, explicit_nulls)?;
        }
        DataType::Map(_, _) => {
            let maparr = as_map_array(array);

            let keys = maparr.keys();
            let values = maparr.values();

            // Keys have to be strings to convert to json.
            if !matches!(keys.data_type(), DataType::Utf8) {
                return Err(ArrowError::JsonError(format!(
                    "data type {:?} not supported in nested map for json writer",
                    keys.data_type()
                )));
            }

            let keys = keys.as_string::<i32>();
            let values = array_to_json_array_internal(values, explicit_nulls)?;

            // One shared iterator over every key/value entry in the map
            // column; each row below consumes `value_length(i)` entries
            let mut kv = keys.iter().zip(values);

            for (i, row) in rows
                .iter_mut()
                .enumerate()
                .filter_map(|(i, maybe_row)| maybe_row.as_mut().map(|row| (i, row)))
            {
                if maparr.is_null(i) {
                    row.insert(col_name.to_string(), serde_json::Value::Null);
                    continue;
                }

                // NOTE(review): rows skipped by the filter above and null map
                // rows do not advance `kv`; this appears to assume such rows
                // contribute zero entries — confirm against the map offsets
                let len = maparr.value_length(i) as usize;
                let mut obj = serde_json::Map::new();

                for (_, (k, v)) in (0..len).zip(&mut kv) {
                    obj.insert(k.expect("keys in a map should be non-null").to_string(), v);
                }

                row.insert(col_name.to_string(), serde_json::Value::Object(obj));
            }
        }
        _ => {
            return Err(ArrowError::JsonError(format!(
                "data type {:?} not supported in nested map for json writer",
                array.data_type()
            )))
        }
    }
    Ok(())
}

/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects)
///
/// Null values are omitted from the output objects rather than written as
/// explicit JSON nulls.
#[deprecated(note = "Use Writer")]
pub fn record_batches_to_json_rows(
    batches: &[&RecordBatch],
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
    // For backwards compatibility, default to skip nulls
    record_batches_to_json_rows_internal(batches, false)
}

/// Shared implementation of [`record_batches_to_json_rows`]: flattens the
/// rows of all `batches` into one JSON object per row.
///
/// Column names are taken from the schema of the first batch.
fn record_batches_to_json_rows_internal(
    batches: &[&RecordBatch],
    explicit_nulls: bool,
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
    // Top-level rows are never null, so every slot starts as Some(empty map)
    let mut rows = vec![Some(JsonMap::new()); total];

    if total != 0 {
        let schema = batches[0].schema();
        let mut offset = 0;
        for batch in batches {
            let n = batch.num_rows();
            let slice = &mut rows[offset..offset + n];
            for (idx, col) in batch.columns().iter().enumerate() {
                set_column_for_json_rows(slice, col, schema.field(idx).name(), explicit_nulls)?;
            }
            offset += n;
        }
    }

    Ok(rows.into_iter().map(|row| row.unwrap()).collect())
}

/// This trait defines how to format a sequence of JSON objects to a
/// byte stream.
pub trait JsonFormat: Debug + Default {
@@ -731,23 +318,6 @@ where
}
}

/// Write a single JSON row to the output writer
#[deprecated(note = "Use Writer::write")]
pub fn write_row(&mut self, row: &Value) -> Result<(), ArrowError> {
let is_first_row = !self.started;
if !self.started {
self.format.start_stream(&mut self.writer)?;
self.started = true;
}

self.format.start_row(&mut self.writer, is_first_row)?;
self.writer.write_all(
&serde_json::to_vec(row).map_err(|error| ArrowError::JsonError(error.to_string()))?,
)?;
self.format.end_row(&mut self.writer)?;
Ok(())
}

/// Serialize `batch` to JSON output
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if batch.num_rows() == 0 {
@@ -831,12 +401,10 @@ mod tests {
use std::io::{BufReader, Seek};
use std::sync::Arc;

use serde_json::json;
use serde_json::{json, Value};

use arrow_array::builder::{
FixedSizeBinaryBuilder, FixedSizeListBuilder, Int32Builder, Int64Builder, MapBuilder,
StringBuilder,
};
use arrow_array::builder::*;
use arrow_array::types::*;
use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ToByteSlice};
use arrow_data::ArrayData;

@@ -1585,78 +1153,6 @@ mod tests {
assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), "");
}

#[test]
#[allow(deprecated)]
fn json_writer_one_row() {
    // A single row written via the deprecated API yields a one-element array
    let mut writer = ArrayWriter::new(Vec::<u8>::new());
    writer.write_row(&json!({ "an": "object" })).unwrap();
    writer.finish().unwrap();
    let output = String::from_utf8(writer.into_inner()).unwrap();
    assert_eq!(output, r#"[{"an":"object"}]"#);
}

#[test]
#[allow(deprecated)]
fn json_writer_two_rows() {
    // Rows written via the deprecated API are comma-separated in the array
    let mut writer = ArrayWriter::new(Vec::<u8>::new());
    for row in [json!({ "an": "object" }), json!({ "another": "object" })] {
        writer.write_row(&row).unwrap();
    }
    writer.finish().unwrap();
    let output = String::from_utf8(writer.into_inner()).unwrap();
    assert_eq!(output, r#"[{"an":"object"},{"another":"object"}]"#);
}

#[test]
#[allow(deprecated)]
fn json_list_roundtrip() {
    // Round-trips a list-of-structs column through the JSON reader and the
    // line-delimited writer, exercising nulls at every nesting level.
    let json_content = r#"
{"list": [{"ints": 1}]}
{"list": [{}]}
{"list": []}
{"list": null}
{"list": [{"ints": null}]}
{"list": [null]}
"#;
    let ints_struct = DataType::Struct(vec![Field::new("ints", DataType::Int32, true)].into());
    let list_type = DataType::List(Arc::new(Field::new("item", ints_struct, true)));
    let list_field = Field::new("list", list_type, true);
    let schema = Arc::new(Schema::new(vec![list_field]));
    let builder = ReaderBuilder::new(schema).with_batch_size(64);
    let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

    let batch = reader.next().unwrap().unwrap();

    // Sanity-check the parsed column: 4 inner structs, one of them null
    let list_row = batch.column(0).as_list::<i32>();
    let values = list_row.values();
    assert_eq!(values.len(), 4);
    assert_eq!(values.null_count(), 1);

    // write the batch to JSON, and compare output with input
    let mut buf = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write_batches(&[&batch]).unwrap();
    }

    // The writer skips nulls by default, so `{"list": null}` comes back as
    // `{}` and `{"ints": null}` as `{}` — the round trip is lossy for nulls
    assert_json_eq(
        &buf,
        r#"{"list":[{"ints":1}]}
{"list":[{}]}
{"list":[]}
{}
{"list":[{}]}
{"list":[null]}
"#,
    );
}

#[test]
fn json_struct_array_nulls() {
let inner = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
@@ -1840,78 +1336,6 @@ mod tests {
}
}

#[test]
#[allow(deprecated)]
fn test_array_to_json_array_for_fixed_size_list_array() {
    // A null list slot must surface as JSON null, and a null element inside
    // a list must surface as a null entry in the JSON array
    let data = vec![
        Some(vec![Some(0), Some(1), Some(2)]),
        None,
        Some(vec![Some(3), None, Some(5)]),
        Some(vec![Some(6), Some(7), Some(45)]),
    ];
    let array: ArrayRef =
        Arc::new(FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(data, 3));

    let actual = array_to_json_array(&array).unwrap();

    let expected = vec![
        json!([0, 1, 2]),
        json!(null),
        json!([3, null, 5]),
        json!([6, 7, 45]),
    ];
    assert_eq!(actual, expected);
}

#[test]
#[allow(deprecated)]
fn test_array_to_json_array_for_map_array() {
    // Each map row converts to a JSON array of {"keys": ..., "values": ...}
    // entry objects; an empty map becomes [] and a null map row becomes null
    let expected_json = serde_json::from_value::<Vec<Value>>(json!([
        [
            {
                "keys": "joe",
                "values": 1
            }
        ],
        [
            {
                "keys": "blogs",
                "values": 2
            },
            {
                "keys": "foo",
                "values": 4
            }
        ],
        [],
        null
    ]))
    .unwrap();

    let string_builder = StringBuilder::new();
    let int_builder = Int32Builder::with_capacity(4);

    let mut builder = MapBuilder::new(None, string_builder, int_builder);

    // Row 0: {"joe": 1}
    builder.keys().append_value("joe");
    builder.values().append_value(1);
    builder.append(true).unwrap();

    // Row 1: {"blogs": 2, "foo": 4}
    builder.keys().append_value("blogs");
    builder.values().append_value(2);
    builder.keys().append_value("foo");
    builder.values().append_value(4);
    builder.append(true).unwrap();
    // Row 2: empty map; row 3: null map
    builder.append(true).unwrap();
    builder.append(false).unwrap();

    let array = builder.finish();

    let map_array = Arc::new(array) as ArrayRef;

    assert_eq!(array_to_json_array(&map_array).unwrap(), expected_json);
}

#[test]
fn test_writer_explicit_nulls() -> Result<(), ArrowError> {
fn nested_list() -> (Arc<ListArray>, Arc<Field>) {