From a94d930016f1b61c9c159c2b8a9ac38a3d2a4897 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 21 Jun 2022 16:08:11 +0000 Subject: [PATCH] Fixed error in serializing batch to flight --- .../src/flight_client_scenarios/integration_test.rs | 2 +- .../src/flight_server_scenarios/integration_test.rs | 2 +- src/io/flight/mod.rs | 12 +++++++++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 4d028292f40..19a4b1a4ef1 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -147,7 +147,7 @@ async fn send_batch( fields: &[IpcField], options: &write::WriteOptions, ) -> Result { - let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, fields, options); + let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, fields, options)?; upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 73b4cd9dbe8..07453f5c724 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -122,7 +122,7 @@ impl FlightService for FlightServiceImpl { .enumerate() .flat_map(|(counter, batch)| { let (dictionary_flight_data, mut batch_flight_data) = - serialize_batch(batch, &flight.ipc_schema.fields, &options); + serialize_batch(batch, &flight.ipc_schema.fields, &options).unwrap(); // Only the record batch's FlightData gets app_metadata let metadata = counter.to_string().into_bytes(); diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 4b848e3adf2..b439ced2d9b 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -14,16 +14,22 @@ use crate::{ io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}, }; -use super::ipc::write::default_ipc_fields; +pub use super::ipc::write::default_ipc_fields; use super::ipc::{IpcField, IpcSchema}; /// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries /// and a [`FlightData`] representing the batch. +/// # Errors +/// This function errors iff `fields` is not consistent with `columns` pub fn serialize_batch( columns: &Chunk>, fields: &[IpcField], options: &WriteOptions, -) -> (Vec, FlightData) { +) -> Result<(Vec, FlightData)> { + if fields.len() != columns.arrays().len() { + return Err(Error::InvalidArgumentError("The argument `fields` must be consistent with the columns' schema. Use e.g. &arrow2::io::flight::default_ipc_fields(&schema.fields)".to_string())); + } + let mut dictionary_tracker = DictionaryTracker { dictionaries: Default::default(), cannot_replace: false, @@ -36,7 +42,7 @@ pub fn serialize_batch( let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); let flight_batch = encoded_batch.into(); - (flight_dictionaries, flight_batch) + Ok((flight_dictionaries, flight_batch)) } impl From for FlightData {