Fix generate_nested_dictionary_case integration test failure #1636

Merged: 5 commits, May 8, 2022

Changes from 2 commits
arrow-flight/src/utils.rs (3 changes: 2 additions & 1 deletion)

@@ -18,6 +18,7 @@
//! Utilities to assist with reading and writing Arrow data as Flight messages

use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
+ use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::datatypes::{Schema, SchemaRef};
@@ -49,7 +50,7 @@ pub fn flight_data_from_arrow_batch(
pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
- dictionaries_by_field: &[Option<ArrayRef>],
+ dictionaries_by_field: &HashMap<i64, ArrayRef>,
Contributor: I think this should be renamed to dictionaries_by_id, here and in all the other places?

Member Author: Sounds good. I will update them.

) -> Result<RecordBatch> {
// check that the data_header is a record batch message
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
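For callers, the change looks like this. A minimal round-trip sketch, assuming a version of arrow/arrow-flight that includes this PR; the schema and values are invented for the example:

use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::IpcWriteOptions;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::{flight_data_from_arrow_batch, flight_data_to_arrow_batch};

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![col])?;

    // Encode the batch as Flight data, then decode it again. There are no
    // dictionary-encoded columns here, so the map can stay empty; dictionary
    // batches would be inserted keyed by dictionary id, not field position.
    let options = IpcWriteOptions::default();
    let (_dictionaries, flight_data) = flight_data_from_arrow_batch(&batch, &options);
    let empty: HashMap<i64, ArrayRef> = HashMap::new();
    let decoded = flight_data_to_arrow_batch(&flight_data, schema, &empty)?;
    assert_eq!(batch.num_rows(), decoded.num_rows());
    Ok(())
}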
arrow/src/ipc/reader.rs (43 changes: 20 additions & 23 deletions)

@@ -54,14 +54,15 @@ fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
/// - cast the 64-bit array to the appropriate data type
fn create_array(
nodes: &[ipc::FieldNode],
- data_type: &DataType,
+ field: &Field,
data: &[u8],
buffers: &[ipc::Buffer],
- dictionaries: &[Option<ArrayRef>],
+ dictionaries: &HashMap<i64, ArrayRef>,
Contributor: Suggested change:

- dictionaries: &HashMap<i64, ArrayRef>,
+ dictionaries_by_id: &HashMap<i64, ArrayRef>,

Maybe this would be more consistent with the names used in the rest of this PR.

mut node_index: usize,
mut buffer_index: usize,
) -> Result<(ArrayRef, usize, usize)> {
use DataType::*;
+ let data_type = field.data_type();
let array = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
let array = create_primitive_array(
@@ -99,7 +100,7 @@ fn create_array(
buffer_index += 2;
let triple = create_array(
nodes,
- list_field.data_type(),
+ list_field,
data,
buffers,
dictionaries,
@@ -121,7 +122,7 @@
buffer_index += 1;
let triple = create_array(
nodes,
- list_field.data_type(),
+ list_field,
data,
buffers,
dictionaries,
@@ -146,7 +147,7 @@
for struct_field in struct_fields {
let triple = create_array(
nodes,
- struct_field.data_type(),
+ struct_field,
data,
buffers,
dictionaries,
@@ -173,7 +174,9 @@
.iter()
.map(|buf| read_buffer(buf, data))
.collect();
- let value_array = dictionaries[node_index].clone().unwrap();
+ let value_array =
+     dictionaries.get(&field.dict_id().unwrap()).unwrap().clone();
Contributor: We could maybe return an error here?

Member Author: Yea, updated.

node_index += 1;
buffer_index += 2;

@@ -209,7 +212,7 @@
for field in fields {
let triple = create_array(
nodes,
- field.data_type(),
+ field,
data,
buffers,
dictionaries,
@@ -457,7 +460,7 @@ pub fn read_record_batch(
buf: &[u8],
batch: ipc::RecordBatch,
schema: SchemaRef,
- dictionaries: &[Option<ArrayRef>],
+ dictionaries: &HashMap<i64, ArrayRef>,
Contributor: Suggested change:

- dictionaries: &HashMap<i64, ArrayRef>,
+ dictionaries_by_id: &HashMap<i64, ArrayRef>,

projection: Option<&[usize]>,
) -> Result<RecordBatch> {
let buffers = batch.buffers().ok_or_else(|| {
@@ -477,7 +480,7 @@
let field = &fields[index];
let triple = create_array(
field_nodes,
- field.data_type(),
+ field,
buf,
buffers,
dictionaries,
@@ -495,7 +498,7 @@
for field in schema.fields() {
let triple = create_array(
field_nodes,
- field.data_type(),
+ field,
buf,
buffers,
dictionaries,
@@ -516,7 +519,7 @@ pub fn read_dictionary(
buf: &[u8],
batch: ipc::DictionaryBatch,
schema: &Schema,
- dictionaries_by_field: &mut [Option<ArrayRef>],
+ dictionaries_by_field: &mut HashMap<i64, ArrayRef>,
) -> Result<()> {
if batch.isDelta() {
return Err(ArrowError::IoError(
@@ -556,16 +559,10 @@
ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
})?;

- // for all fields with this dictionary id, update the dictionaries vector
- // in the reader. Note that a dictionary batch may be shared between many fields.
- // We don't currently record the isOrdered field. This could be general
- // attributes of arrays.
- for (i, field) in schema.all_fields().iter().enumerate() {
-     if field.dict_id() == Some(id) {
-         // Add (possibly multiple) array refs to the dictionaries array.
-         dictionaries_by_field[i] = Some(dictionary_values.clone());
-     }
- }
+ // Add (possibly multiple) array refs to the dictionaries array.
+ dictionaries_by_field.insert(id, dictionary_values.clone());
Member Author: Main difference here. In create_array, we used dictionaries[node_index] to access the dictionary array by node_index, but dictionary arrays are indexed by dict id, not node index.


Ok(())
}
@@ -592,7 +589,7 @@ pub struct FileReader<R: Read + Seek> {
/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
- dictionaries_by_field: Vec<Option<ArrayRef>>,
+ dictionaries_by_field: HashMap<i64, ArrayRef>,

/// Metadata version
metadata_version: ipc::MetadataVersion,
@@ -650,7 +647,7 @@ impl<R: Read + Seek> FileReader<R> {
let schema = ipc::convert::fb_to_schema(ipc_schema);

// Create an array of optional dictionary value arrays, one per field.
- let mut dictionaries_by_field = vec![None; schema.all_fields().len()];
+ let mut dictionaries_by_field = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
for block in dictionaries {
// read length from end of offset
@@ -840,7 +837,7 @@ pub struct StreamReader<R: Read> {
/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
- dictionaries_by_field: Vec<Option<ArrayRef>>,
+ dictionaries_by_field: HashMap<i64, ArrayRef>,

/// An indicator of whether the stream is complete.
///
@@ -884,7 +881,7 @@ impl<R: Read> StreamReader<R> {
let schema = ipc::convert::fb_to_schema(ipc_schema);

// Create an array of optional dictionary value arrays, one per field.
- let dictionaries_by_field = vec![None; schema.all_fields().len()];
+ let dictionaries_by_field = HashMap::new();

let projection = match projection {
Some(projection_indices) => {
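The readers' public API is unchanged by this. A minimal usage sketch, assuming an arrow version with this change; the file path is hypothetical. Dictionary batches encountered in the stream are accumulated internally, keyed by dictionary id:

use std::fs::File;

use arrow::error::Result;
use arrow::ipc::reader::StreamReader;

fn main() -> Result<()> {
    let file = File::open("example.arrows")?;
    // No projection; dictionaries are tracked inside the reader.
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        println!("read a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}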
@@ -16,6 +16,7 @@
// under the License.

use crate::{read_json_file, ArrowFile};
+ use std::collections::HashMap;

use arrow::{
array::ArrayRef,
@@ -196,7 +197,7 @@ async fn consume_flight_location(
// first FlightData. Ignore this one.
let _schema_again = resp.next().await.unwrap();

- let mut dictionaries_by_field = vec![None; schema.fields().len()];
+ let mut dictionaries_by_field = HashMap::new();

for (counter, expected_batch) in expected_data.iter().enumerate() {
let data = receive_batch_flight_data(
@@ -247,7 +248,7 @@
async fn receive_batch_flight_data(
resp: &mut Streaming<FlightData>,
schema: SchemaRef,
- dictionaries_by_field: &mut [Option<ArrayRef>],
+ dictionaries_by_field: &mut HashMap<i64, ArrayRef>,
) -> Option<FlightData> {
let mut data = resp.next().await?.ok()?;
let mut message = arrow::ipc::root_as_message(&data.data_header[..])
@@ -284,7 +284,7 @@ async fn record_batch_from_message(
message: ipc::Message<'_>,
data_body: &[u8],
schema_ref: SchemaRef,
- dictionaries_by_field: &[Option<ArrayRef>],
+ dictionaries_by_field: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch, Status> {
let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
Status::internal("Could not parse message header as record batch")
@@ -307,7 +307,7 @@ async fn dictionary_from_message(
message: ipc::Message<'_>,
data_body: &[u8],
schema_ref: SchemaRef,
- dictionaries_by_field: &mut [Option<ArrayRef>],
+ dictionaries_by_field: &mut HashMap<i64, ArrayRef>,
) -> Result<(), Status> {
let ipc_batch = message.header_as_dictionary_batch().ok_or_else(|| {
Status::internal("Could not parse message header as dictionary batch")
@@ -331,7 +331,7 @@ async fn save_uploaded_chunks(
let mut chunks = vec![];
let mut uploaded_chunks = uploaded_chunks.lock().await;

- let mut dictionaries_by_field = vec![None; schema_ref.fields().len()];
+ let mut dictionaries_by_field = HashMap::new();

while let Some(Ok(data)) = input_stream.next().await {
let message = arrow::ipc::root_as_message(&data.data_header[..])
integration-testing/src/lib.rs (16 changes: 12 additions & 4 deletions)

@@ -578,7 +578,12 @@ fn array_from_json(
.get(&dict_id);
match dictionary {
Some(dictionary) => dictionary_array_from_json(
- field, json_col, key_type, value_type, dictionary,
+ field,
+ json_col,
+ key_type,
+ value_type,
+ dictionary,
+ dictionaries,
),
None => Err(ArrowError::JsonError(format!(
"Unable to find dictionary for field {:?}",
@@ -640,6 +645,7 @@ fn dictionary_array_from_json(
dict_key: &DataType,
dict_value: &DataType,
dictionary: &ArrowJsonDictionaryBatch,
+ dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
Member Author: Passing in the map of dictionaries so nested dictionaries can be used.
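For example, the nested dictionary type from this test case can be described like so. This is a sketch; the field name, ids, and types mirror the error message quoted below, and Field::new_dict/DataType are from the arrow crate:

use arrow::datatypes::{DataType, Field};

fn main() {
    // Inner dictionary-encoded field; resolving its values requires looking
    // up dict_id 0 in the dictionary map, hence the extra parameter.
    let inner = Field::new_dict(
        "str_dict",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        true,  // nullable
        0,     // dict_id
        false, // dict_is_ordered
    );
    // Outer dictionary whose value type is a list of the inner field.
    let outer = DataType::Dictionary(
        Box::new(DataType::Int16),
        Box::new(DataType::List(Box::new(inner))),
    );
    println!("{:?}", outer);
}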

Member Author: This is where the following error comes out when running archery --debug integration --run-flight --with-cpp=false --with-rust=true:

dictionary value type: List(Field { name: "str_dict", data_type: Dictionary(Int8, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })
Error: JsonError("Unable to find any dictionaries for field Field { name: \"str_dict\", data_type: Dictionary(Int8, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }")

After fixing this, there is an index error, which is fixed by #1636 (review).

) -> Result<ArrayRef> {
match dict_key {
DataType::Int8
@@ -667,9 +673,11 @@
let keys = array_from_json(&key_field, json_col, None)?;
// note: not enough info on nullability of dictionary
let value_field = Field::new("value", dict_value.clone(), true);
- let values =
-     array_from_json(&value_field, dictionary.data.columns[0].clone(), None)?;
+ println!("dictionary value type: {:?}", dict_value);
+ let values = array_from_json(
+     &value_field,
+     dictionary.data.columns[0].clone(),
+     dictionaries,
+ )?;

// convert key and value to dictionary data
let dict_data = ArrayData::builder(field.data_type().clone())
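The net effect at this call site: because the whole dictionaries map is passed down instead of None, a dictionary whose values are themselves dictionary-encoded can be resolved recursively. A minimal sketch of that idea with invented stand-in types (it assumes the nesting has no cycles):

use std::collections::HashMap;

// Invented stand-in for ArrowJsonDictionaryBatch, just to sketch the recursion.
#[derive(Clone, Debug)]
struct DictBatch {
    // dict_id of a dictionary used by this batch's own values, if any
    nested_dict_id: Option<i64>,
}

// Resolve a dictionary and every dictionary its values depend on,
// returning the chain of dict ids that were looked up.
fn resolve(id: i64, dictionaries: &HashMap<i64, DictBatch>) -> Option<Vec<i64>> {
    let mut chain = vec![id];
    let mut current = dictionaries.get(&id)?;
    while let Some(next) = current.nested_dict_id {
        chain.push(next);
        current = dictionaries.get(&next)?;
    }
    Some(chain)
}

fn main() {
    let mut dictionaries = HashMap::new();
    dictionaries.insert(1, DictBatch { nested_dict_id: Some(0) }); // outer dict
    dictionaries.insert(0, DictBatch { nested_dict_id: None });    // inner dict ("str_dict")
    assert_eq!(resolve(1, &dictionaries), Some(vec![1, 0]));
}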