Fix generate_nested_dictionary_case integration test failure #1636
Changes from 3 commits: ae20913 · 3ba6256 · f984dbf · 7622bc3 · 3090a00
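In short: the IPC readers kept decoded dictionaries in a `Vec<Option<ArrayRef>>` indexed by each field's position in the flattened schema, while `create_array` read that vector with its running `node_index`. For dictionary fields nested inside structs or lists those two numberings disagree, so the reader could fetch the wrong dictionary. This PR keys the decoded dictionaries by their dictionary id instead. A minimal sketch of the shape that trips the positional scheme (field names and ids are illustrative):

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // A dictionary field nested inside a struct: its position in
    // schema.all_fields() differs from the node_index with which
    // create_array reaches it, so a positional lookup misfires.
    // The dict_id (1 here) identifies the dictionary unambiguously.
    let nested_dict = Field::new_dict(
        "inner",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,  // nullable
        1,     // dict_id
        false, // dict_is_ordered
    );
    let schema = Schema::new(vec![Field::new(
        "outer",
        DataType::Struct(vec![nested_dict]),
        true,
    )]);
    // The old reader allocated vec![None; schema.all_fields().len()] and
    // indexed it positionally; the new one keys a HashMap by dict_id.
    println!("{} flattened fields", schema.all_fields().len());
}
```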
```diff
@@ -54,14 +54,15 @@ fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
 /// - cast the 64-bit array to the appropriate data type
 fn create_array(
     nodes: &[ipc::FieldNode],
-    data_type: &DataType,
+    field: &Field,
     data: &[u8],
     buffers: &[ipc::Buffer],
-    dictionaries: &[Option<ArrayRef>],
+    dictionaries: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
     mut buffer_index: usize,
 ) -> Result<(ArrayRef, usize, usize)> {
     use DataType::*;
+    let data_type = field.data_type();
     let array = match data_type {
         Utf8 | Binary | LargeBinary | LargeUtf8 => {
             let array = create_primitive_array(
```
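The switch from `&DataType` to `&Field` matters because the dictionary id is schema metadata carried on the `Field`, not on the `DataType`. A small sketch of the distinction (field name and id are illustrative):

```rust
use arrow::datatypes::{DataType, Field};

fn main() {
    // The DataType only says "dictionary of Int32 -> Utf8"; which dictionary
    // batch supplies the values is recorded on the Field as its dict_id.
    let field = Field::new_dict(
        "tags",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,  // nullable
        42,    // dict_id
        false, // dict_is_ordered
    );
    assert_eq!(field.dict_id(), Some(42));
    assert_eq!(Field::new("plain", DataType::Utf8, true).dict_id(), None);
}
```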
```diff
@@ -99,7 +100,7 @@ fn create_array(
             buffer_index += 2;
             let triple = create_array(
                 nodes,
-                list_field.data_type(),
+                list_field,
                 data,
                 buffers,
                 dictionaries,
```
```diff
@@ -121,7 +122,7 @@ fn create_array(
             buffer_index += 1;
             let triple = create_array(
                 nodes,
-                list_field.data_type(),
+                list_field,
                 data,
                 buffers,
                 dictionaries,
```
```diff
@@ -146,7 +147,7 @@ fn create_array(
             for struct_field in struct_fields {
                 let triple = create_array(
                     nodes,
-                    struct_field.data_type(),
+                    struct_field,
                     data,
                     buffers,
                     dictionaries,
```
```diff
@@ -173,15 +174,25 @@ fn create_array(
                 .iter()
                 .map(|buf| read_buffer(buf, data))
                 .collect();
-            let value_array = dictionaries[node_index].clone().unwrap();
+
+            let dict_id = field.dict_id().ok_or_else(|| {
+                ArrowError::IoError(format!("Field {} does not have dict id", field))
+            })?;
+
+            let value_array = dictionaries.get(&dict_id).ok_or_else(|| {
+                ArrowError::IoError(format!(
+                    "Cannot find a dictionary batch with dict id: {}",
+                    dict_id
+                ))
+            })?;
             node_index += 1;
             buffer_index += 2;
 
             create_dictionary_array(
                 index_node,
                 data_type,
                 &index_buffers[..],
-                value_array,
+                value_array.clone(),
             )
         }
         Union(fields, mode) => {
```
```diff
@@ -209,7 +220,7 @@ fn create_array(
             for field in fields {
                 let triple = create_array(
                     nodes,
-                    field.data_type(),
+                    field,
                     data,
                     buffers,
                     dictionaries,
```
```diff
@@ -457,7 +468,7 @@ pub fn read_record_batch(
     buf: &[u8],
     batch: ipc::RecordBatch,
     schema: SchemaRef,
-    dictionaries: &[Option<ArrayRef>],
+    dictionaries: &HashMap<i64, ArrayRef>,
     projection: Option<&[usize]>,
 ) -> Result<RecordBatch> {
     let buffers = batch.buffers().ok_or_else(|| {
```
```diff
@@ -477,7 +488,7 @@ pub fn read_record_batch(
         let field = &fields[index];
         let triple = create_array(
             field_nodes,
-            field.data_type(),
+            field,
             buf,
             buffers,
             dictionaries,
```
```diff
@@ -495,7 +506,7 @@ pub fn read_record_batch(
     for field in schema.fields() {
         let triple = create_array(
             field_nodes,
-            field.data_type(),
+            field,
             buf,
             buffers,
             dictionaries,
```
```diff
@@ -511,12 +522,12 @@
 }
 
 /// Read the dictionary from the buffer and provided metadata,
-/// updating the `dictionaries_by_field` with the resulting dictionary
+/// updating the `dictionaries_by_id` with the resulting dictionary
 pub fn read_dictionary(
     buf: &[u8],
     batch: ipc::DictionaryBatch,
     schema: &Schema,
-    dictionaries_by_field: &mut [Option<ArrayRef>],
+    dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
 ) -> Result<()> {
     if batch.isDelta() {
         return Err(ArrowError::IoError(
```
```diff
@@ -545,7 +556,7 @@ pub fn read_dictionary(
                 buf,
                 batch.data().unwrap(),
                 Arc::new(schema),
-                dictionaries_by_field,
+                dictionaries_by_id,
                 None,
             )?;
             Some(record_batch.column(0).clone())
```
```diff
@@ -556,16 +567,10 @@
         ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
     })?;
 
-    // for all fields with this dictionary id, update the dictionaries vector
-    // in the reader. Note that a dictionary batch may be shared between many fields.
-    // We don't currently record the isOrdered field. This could be general
-    // attributes of arrays.
-    for (i, field) in schema.all_fields().iter().enumerate() {
-        if field.dict_id() == Some(id) {
-            // Add (possibly multiple) array refs to the dictionaries array.
-            dictionaries_by_field[i] = Some(dictionary_values.clone());
-        }
-    }
+    // Add (possibly multiple) array refs to the dictionaries array.
+    dictionaries_by_id.insert(id, dictionary_values.clone());
 
     Ok(())
 }
```
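A single dictionary batch may back several fields that declare the same dict_id; storing the values once under that id preserves the sharing the old fan-out loop provided. A tiny sketch of the new bookkeeping (values are illustrative):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};

fn main() {
    // One entry serves every field whose dict_id is 1, replacing the old
    // loop that copied the same values into multiple per-field slots.
    let mut dictionaries_by_id: HashMap<i64, ArrayRef> = HashMap::new();
    let values: ArrayRef = Arc::new(StringArray::from(vec!["low", "high"]));
    dictionaries_by_id.insert(1, values);
    assert!(dictionaries_by_id.contains_key(&1));
}
```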
```diff
@@ -592,7 +597,7 @@ pub struct FileReader<R: Read + Seek> {
     /// Optional dictionaries for each schema field.
     ///
     /// Dictionaries may be appended to in the streaming format.
-    dictionaries_by_field: Vec<Option<ArrayRef>>,
+    dictionaries_by_id: HashMap<i64, ArrayRef>,
 
     /// Metadata version
     metadata_version: ipc::MetadataVersion,
```
```diff
@@ -650,7 +655,7 @@ impl<R: Read + Seek> FileReader<R> {
         let schema = ipc::convert::fb_to_schema(ipc_schema);
 
         // Create an array of optional dictionary value arrays, one per field.
-        let mut dictionaries_by_field = vec![None; schema.all_fields().len()];
+        let mut dictionaries_by_id = HashMap::new();
         if let Some(dictionaries) = footer.dictionaries() {
             for block in dictionaries {
                 // read length from end of offset
```
```diff
@@ -683,12 +688,7 @@ impl<R: Read + Seek> FileReader<R> {
                     ))?;
                     reader.read_exact(&mut buf)?;
 
-                    read_dictionary(
-                        &buf,
-                        batch,
-                        &schema,
-                        &mut dictionaries_by_field,
-                    )?;
+                    read_dictionary(&buf, batch, &schema, &mut dictionaries_by_id)?;
                 }
                 t => {
                     return Err(ArrowError::IoError(format!(
```
```diff
@@ -713,7 +713,7 @@ impl<R: Read + Seek> FileReader<R> {
             blocks: blocks.to_vec(),
             current_block: 0,
             total_blocks,
-            dictionaries_by_field,
+            dictionaries_by_id,
             metadata_version: footer.version(),
             projection,
         })
```
```diff
@@ -795,7 +795,7 @@ impl<R: Read + Seek> FileReader<R> {
                 &buf,
                 batch,
                 self.schema(),
-                &self.dictionaries_by_field,
+                &self.dictionaries_by_id,
                 self.projection.as_ref().map(|x| x.0.as_ref()),
             ).map(Some)
```
```diff
@@ -840,7 +840,7 @@ pub struct StreamReader<R: Read> {
     /// Optional dictionaries for each schema field.
     ///
     /// Dictionaries may be appended to in the streaming format.
-    dictionaries_by_field: Vec<Option<ArrayRef>>,
+    dictionaries_by_id: HashMap<i64, ArrayRef>,
 
     /// An indicator of whether the stream is complete.
     ///
```
```diff
@@ -884,7 +884,7 @@ impl<R: Read> StreamReader<R> {
         let schema = ipc::convert::fb_to_schema(ipc_schema);
 
         // Create an array of optional dictionary value arrays, one per field.
-        let dictionaries_by_field = vec![None; schema.all_fields().len()];
+        let dictionaries_by_id = HashMap::new();
 
         let projection = match projection {
             Some(projection_indices) => {
```
```diff
@@ -897,7 +897,7 @@ impl<R: Read> StreamReader<R> {
             reader,
             schema: Arc::new(schema),
             finished: false,
-            dictionaries_by_field,
+            dictionaries_by_id,
             projection,
         })
     }
```
```diff
@@ -971,7 +971,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = vec![0; message.bodyLength() as usize];
                 self.reader.read_exact(&mut buf)?;
 
-                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field, self.projection.as_ref().map(|x| x.0.as_ref())).map(Some)
+                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref())).map(Some)
             }
             ipc::MessageHeader::DictionaryBatch => {
                 let batch = message.header_as_dictionary_batch().ok_or_else(|| {
```
```diff
@@ -984,7 +984,7 @@ impl<R: Read> StreamReader<R> {
                 self.reader.read_exact(&mut buf)?;
 
                 read_dictionary(
-                    &buf, batch, &self.schema, &mut self.dictionaries_by_field
+                    &buf, batch, &self.schema, &mut self.dictionaries_by_id
                 )?;
 
                 // read the next message until we encounter a RecordBatch
```
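Nothing changes for callers; the dictionary bookkeeping stays internal to `FileReader` and `StreamReader`. A hedged usage sketch (the file name is illustrative, and the `try_new` signature is as of the arrow-rs versions around this PR):

```rust
use std::fs::File;

use arrow::error::Result;
use arrow::ipc::reader::FileReader;

fn read_all() -> Result<()> {
    // Dictionary batches in the file, including those backing nested
    // dictionary columns, are now resolved by dict_id inside the reader.
    let file = File::open("nested_dictionary.arrow")?;
    let reader = FileReader::try_new(file, None)?; // None = no projection
    for batch in reader {
        println!("rows: {}", batch?.num_rows());
    }
    Ok(())
}
```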
```diff
@@ -578,7 +578,12 @@ fn array_from_json(
             .get(&dict_id);
         match dictionary {
             Some(dictionary) => dictionary_array_from_json(
-                field, json_col, key_type, value_type, dictionary,
+                field,
+                json_col,
+                key_type,
+                value_type,
+                dictionary,
+                dictionaries,
             ),
             None => Err(ArrowError::JsonError(format!(
                 "Unable to find dictionary for field {:?}",
```
```diff
@@ -640,6 +645,7 @@ fn dictionary_array_from_json(
     dict_key: &DataType,
     dict_value: &DataType,
     dictionary: &ArrowJsonDictionaryBatch,
+    dictionaries: Option<&HashMap<i64, ArrowJsonDictionaryBatch>>,
 ) -> Result<ArrayRef> {
     match dict_key {
         DataType::Int8
```

Review comment: Passing in the map of dictionaries so nested dictionaries can be used.

Review comment: This is where the error comes out when running the `generate_nested_dictionary_case` integration test. After fixing this, there is an index error, fixed by #1636 (review).
```diff
@@ -667,9 +673,11 @@ fn dictionary_array_from_json(
             let keys = array_from_json(&key_field, json_col, None)?;
             // note: not enough info on nullability of dictionary
             let value_field = Field::new("value", dict_value.clone(), true);
-            println!("dictionary value type: {:?}", dict_value);
-            let values =
-                array_from_json(&value_field, dictionary.data.columns[0].clone(), None)?;
+            let values = array_from_json(
+                &value_field,
+                dictionary.data.columns[0].clone(),
+                dictionaries,
+            )?;
 
             // convert key and value to dictionary data
             let dict_data = ArrayData::builder(field.data_type().clone())
```
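The nested shape this enables in the JSON integration reader is a dictionary whose value type is itself dictionary-encoded; resolving the inner dictionary is why the map of dictionary batches now threads through the recursion. An illustrative sketch of such a type:

```rust
use arrow::datatypes::DataType;

fn main() {
    // Dictionary<Int32, Dictionary<Int16, Utf8>>: decoding the outer values
    // column requires looking up the inner dictionary batch by its dict_id,
    // hence the `dictionaries` map passed down the recursive calls.
    let inner = DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8));
    let nested = DataType::Dictionary(Box::new(DataType::Int32), Box::new(inner));
    assert!(matches!(nested, DataType::Dictionary(_, _)));
}
```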
Review comment: Maybe this would be more consistent with the names used in the rest of this PR.