From e032faa383c7669e536c13c7209bff4c04e41940 Mon Sep 17 00:00:00 2001 From: JonasDev1 Date: Mon, 30 Sep 2024 15:31:01 +0200 Subject: [PATCH 1/5] Add union resolving for nested struct arrays --- .../core/src/datasource/avro_to_arrow/arrow_array_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 3a5d50bba07f..98756d21b3b1 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -573,7 +573,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { // extract list values, with non-lists converted to Value::Null let array_item_count = rows .iter() - .map(|row| match row { + .map(|row| match maybe_resolve_union(row) { Value::Array(values) => values.len(), _ => 1, }) From 6a1b2978e799f93b4d69e921b9af01f660dac182 Mon Sep 17 00:00:00 2001 From: JonasDev1 Date: Mon, 30 Sep 2024 18:47:04 +0200 Subject: [PATCH 2/5] Add test --- .../avro_to_arrow/arrow_array_reader.rs | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 98756d21b3b1..7e63211505cb 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1643,6 +1643,83 @@ mod test { assert_batches_eq!(expected, &[batch]); } + #[test] + fn test_avro_nullable_struct() { + let schema = apache_avro::Schema::parse_str( + r#" + { + "type": "record", + "name": "r1", + "fields": [ + { + "name": "col1", + "type": [ + "null", + { + "type": "record", + "name": "r2", + "fields": [ + { + "name": "col2", + "type": ["null", "string"] + } + ] + } + ], + "default": null + } + ] + }"#, + ) + .unwrap(); + let r1 = apache_avro::to_value(serde_json::json!({ "col1": null })) + .unwrap() + .resolve(&schema) + .unwrap(); + let r2 = apache_avro::to_value(serde_json::json!({ + "col1": { + "col2": "hello" + } + })) + .unwrap() + .resolve(&schema) + .unwrap(); + let r3 = apache_avro::to_value(serde_json::json!({ + "col1": { + "col2": null + } + })) + .unwrap() + .resolve(&schema) + .unwrap(); + + let mut w = apache_avro::Writer::new(&schema, vec![]); + w.append(r1).unwrap(); + w.append(r2).unwrap(); + w.append(r3).unwrap(); + let bytes = w.into_inner().unwrap(); + + let mut reader = ReaderBuilder::new() + .read_schema() + .with_batch_size(3) + .build(std::io::Cursor::new(bytes)) + .unwrap(); + let batch = reader.next().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 1); + + let expected = [ + "+---------------+", + "| col1 |", + "+---------------+", + "| |", + "| {col2: hello} |", + "| {col2: } |", + "+---------------+", + ]; + assert_batches_eq!(expected, &[batch]); + } + #[test] fn test_avro_iterator() { let reader = build_reader("alltypes_plain.avro", 5); From 640d36cc5bf0a94eaf4622fa8aef95746a0da333 Mon Sep 17 00:00:00 2001 From: JonasDev1 Date: Mon, 30 Sep 2024 18:51:12 +0200 Subject: [PATCH 3/5] Change test --- .../avro_to_arrow/arrow_array_reader.rs | 97 ++++++++++--------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 7e63211505cb..24bc4a61cb93 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1644,82 +1644,83 @@ mod test { } #[test] - fn test_avro_nullable_struct() { + fn test_avro_nullable_struct_array() { let schema = apache_avro::Schema::parse_str( r#" { - "type": "record", - "name": "r1", - "fields": [ + "type": "record", + "name": "r1", + "fields": [ + { + "name": "col1", + "type": [ + "null", { - "name": "col1", - "type": [ - "null", - { - "type": "record", - "name": "r2", - "fields": [ - { - "name": "col2", - "type": ["null", "string"] - } - ] - } - ], - "default": null + "type": "array", + "items": { + "type": [ + "null", + { + "type": "record", + "name": "Item", + "fields": [ + { + "name": "id", + "type": "long" + } + ] + } + ] + } } - ] + ], + "default": null + } + ] }"#, ) .unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ "col1": null })) + let jv1 = serde_json::json!({ + "col1": [ + { + "id": 234 + } + ] + }); + let r1 = apache_avro::to_value(jv1) + .unwrap() + .resolve(&schema) + .unwrap(); + let r2 = apache_avro::to_value(serde_json::json!({ "col1": null })) .unwrap() .resolve(&schema) .unwrap(); - let r2 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": "hello" - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r3 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": null - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); let mut w = apache_avro::Writer::new(&schema, vec![]); w.append(r1).unwrap(); w.append(r2).unwrap(); - w.append(r3).unwrap(); let bytes = w.into_inner().unwrap(); let mut reader = ReaderBuilder::new() .read_schema() - .with_batch_size(3) + .with_batch_size(2) .build(std::io::Cursor::new(bytes)) .unwrap(); let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_rows(), 2); assert_eq!(batch.num_columns(), 1); let expected = [ - "+---------------+", - "| col1 |", - "+---------------+", - "| |", - "| {col2: hello} |", - "| {col2: } |", - "+---------------+", + "+-------------+", + "| col1 |", + "+-------------+", + "| [{id: 234}] |", + "| |", + "+-------------+", ]; assert_batches_eq!(expected, &[batch]); } - + #[test] fn test_avro_iterator() { let reader = build_reader("alltypes_plain.avro", 5); From 38280a36549c318b01e2418783bfaa4d1c55f01b Mon Sep 17 00:00:00 2001 From: JonasDev1 Date: Tue, 1 Oct 2024 09:58:23 +0200 Subject: [PATCH 4/5] Reproduce index error --- .../avro_to_arrow/arrow_array_reader.rs | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 24bc4a61cb93..9173b47b5a00 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1684,6 +1684,9 @@ mod test { "col1": [ { "id": 234 + }, + { + "id": 345 } ] }); @@ -1697,26 +1700,32 @@ mod test { .unwrap(); let mut w = apache_avro::Writer::new(&schema, vec![]); - w.append(r1).unwrap(); + for _i in 0..5 { + w.append(r1.clone()).unwrap(); + } w.append(r2).unwrap(); let bytes = w.into_inner().unwrap(); let mut reader = ReaderBuilder::new() .read_schema() - .with_batch_size(2) + .with_batch_size(20) .build(std::io::Cursor::new(bytes)) .unwrap(); let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_rows(), 6); assert_eq!(batch.num_columns(), 1); let expected = [ - "+-------------+", - "| col1 |", - "+-------------+", - "| [{id: 234}] |", - "| |", - "+-------------+", + "+------------------------+", + "| col1 |", + "+------------------------+", + "| [{id: 234}, {id: 345}] |", + "| [{id: 234}, {id: 345}] |", + "| [{id: 234}, {id: 345}] |", + "| [{id: 234}, {id: 345}] |", + "| [{id: 234}, {id: 345}] |", + "| |", + "+------------------------+", ]; assert_batches_eq!(expected, &[batch]); } From c0ef0aee72b7ebcd497b138103f0f62dc01d685c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 2 Oct 2024 07:25:29 -0400 Subject: [PATCH 5/5] fmt --- .../core/src/datasource/avro_to_arrow/arrow_array_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs index 9173b47b5a00..98b6702bc383 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs @@ -1729,7 +1729,7 @@ mod test { ]; assert_batches_eq!(expected, &[batch]); } - + #[test] fn test_avro_iterator() { let reader = build_reader("alltypes_plain.avro", 5);