Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simpler IPC API #1208

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.7", features = ["full"] }
arrow-format = { version = "0.7", features = ["flight-data", "flight-service"] }
async-trait = "0.1.41"
clap = { version = "^3", features = ["derive"] }
futures = "0.3"
Expand Down
44 changes: 10 additions & 34 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,15 @@ use arrow2::{
array::Array,
chunk::Chunk,
datatypes::*,
io::ipc::{
read::{self, Dictionaries},
write, IpcSchema,
},
io::{
flight::{self, deserialize_batch, serialize_batch},
ipc::IpcField,
ipc::{read::Dictionaries, write, IpcField, IpcSchema},
},
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::{
flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
},
ipc::planus::ReadAsRoot,
use arrow_format::flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -233,30 +225,14 @@ async fn read_dictionaries(
dictionaries: &mut Dictionaries,
) -> Option<FlightData> {
let mut data = stream.next().await?.ok()?;
let mut message =
ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message");

while let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message
.header()
.expect("Header to be valid flatbuffers")
.expect("Header to be present")
{
let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
chunk,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)
.expect("Error reading dictionary");

let existing = dictionaries.len();
loop {
flight::deserialize_dictionary(&data, fields, ipc_schema, dictionaries).ok()?;
if dictionaries.len() == existing {
break;
}
data = stream.next().await?.ok()?;
message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message");
}

Some(data)
Expand Down
104 changes: 10 additions & 94 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ use tonic::{transport::Server, Request, Response, Status, Streaming};
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::MessageHeaderRef;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::datatypes::Schema;
use arrow2::io::flight::{
deserialize_schemas, serialize_batch, serialize_schema, serialize_schema_to_info,
deserialize_message, deserialize_schemas, serialize_batch, serialize_schema,
serialize_schema_to_info,
};
use arrow2::io::ipc;
use arrow2::io::ipc::read::Dictionaries;
Expand Down Expand Up @@ -228,14 +227,14 @@ impl FlightService for Service {
let stream = try_stream! {
pin_mut!(input_stream);
for await item in input_stream {
let FlightData {data_header, data_body, app_metadata, ..} = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?;
save_message(&data_header,
&data_body,
&schema,
let data = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?;
let maybe_chunk = deserialize_message(&data, &schema.fields,
&ipc_schema,
&mut dictionaries,
&mut chunks)?;
yield PutResult {app_metadata}
&mut dictionaries).map_err(|_| Status::invalid_argument(format!("Invalid")))?;
if let Some(chunk) = maybe_chunk {
chunks.push(chunk)
}
yield PutResult {app_metadata: data.app_metadata}
}
let dataset = IntegrationDataset {
schema,
Expand Down Expand Up @@ -269,86 +268,3 @@ impl FlightService for Service {
Err(Status::unimplemented("do_exchange"))
}
}

/// Decodes a single IPC record batch message into a [`Chunk`].
///
/// The whole `data_body` slice is handed to the reader through an in-memory cursor.
/// Any failure reported by the reader is converted into `Status::internal`.
fn chunk_from_message(
    batch: arrow_format::ipc::RecordBatchRef<'_>,
    data_body: &[u8],
    fields: &[Field],
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Status> {
    let mut cursor = std::io::Cursor::new(data_body);
    let body_len = data_body.len() as u64;

    let decoded = ipc::read::read_record_batch(
        batch,
        fields,
        ipc_schema,
        None,
        None,
        dictionaries,
        arrow_format::ipc::MetadataVersion::V5,
        &mut cursor,
        0,
        body_len,
        &mut Default::default(),
    );

    match decoded {
        Ok(chunk) => Ok(chunk),
        Err(e) => Err(Status::internal(format!(
            "Could not convert to Chunk: {:?}",
            e
        ))),
    }
}

/// Decodes a single IPC dictionary batch message and stores the result in `dictionaries`.
///
/// The whole `data_body` slice is handed to the reader through an in-memory cursor.
/// Any failure reported by the reader is converted into `Status::internal`.
fn dictionary_from_message(
    dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>,
    data_body: &[u8],
    fields: &[Field],
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
    let mut cursor = std::io::Cursor::new(data_body);
    let body_len = data_body.len() as u64;

    let outcome = ipc::read::read_dictionary(
        dict_batch,
        fields,
        ipc_schema,
        dictionaries,
        &mut cursor,
        0,
        body_len,
        &mut Default::default(),
    );

    match outcome {
        Ok(done) => Ok(done),
        Err(e) => Err(Status::internal(format!(
            "Could not convert to Dictionary: {:?}",
            e
        ))),
    }
}

/// Parses one Flight message (`header` + `body`) and records its content.
///
/// * A record batch is decoded with `chunk_from_message` and appended to `chunks`.
/// * A dictionary batch is decoded with `dictionary_from_message` into `dictionaries`.
/// * Any other header kind is rejected.
///
/// All failures (unparsable message, invalid or missing header, decoding errors,
/// unsupported header kind) are returned as `Status::internal`.
fn save_message(
    header: &[u8],
    body: &[u8],
    schema: &Schema,
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
    chunks: &mut Vec<Chunk<Box<dyn Array>>>,
) -> Result<(), Status> {
    let parsed = match arrow_format::ipc::MessageRef::read_as_root(header) {
        Ok(message) => message,
        Err(e) => {
            return Err(Status::internal(format!(
                "Could not parse message: {:?}",
                e
            )))
        }
    };

    let kind = match parsed.header() {
        Ok(Some(kind)) => kind,
        Ok(None) => {
            return Err(Status::internal(
                "Message must contain a header".to_string(),
            ))
        }
        Err(x) => return Err(Status::internal(x.to_string())),
    };

    // Every arm evaluates to the function's result, so no trailing `Ok(())` is needed.
    match kind {
        MessageHeaderRef::RecordBatch(batch) => {
            chunk_from_message(batch, body, &schema.fields, ipc_schema, dictionaries)
                .map(|chunk| chunks.push(chunk))
        }
        MessageHeaderRef::DictionaryBatch(dict_batch) => {
            dictionary_from_message(dict_batch, body, &schema.fields, ipc_schema, dictionaries)
        }
        other => Err(Status::internal(format!(
            "Reading types other than record batches not yet supported, unable to read {:?}",
            other
        ))),
    }
}
102 changes: 101 additions & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData},
};

use super::ipc::read::Dictionaries;
use super::ipc::{IpcField, IpcSchema};

pub use super::ipc::write::default_ipc_fields;
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] to [`Chunk`].
/// Deserializes [`FlightData`] representing a record batch message to [`Chunk`].
pub fn deserialize_batch(
data: &FlightData,
fields: &[Field],
Expand Down Expand Up @@ -149,3 +150,102 @@ pub fn deserialize_batch(
)),
}
}

/// Deserializes [`FlightData`], assuming it to be a dictionary message, into `dictionaries`.
pub fn deserialize_dictionary(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut read::Dictionaries,
) -> Result<()> {
let message = ipc::MessageRef::read_as_root(&data.data_header)?;

let chunk = if let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message
.header()?
.ok_or_else(|| Error::oos("Header is required"))?
{
chunk
} else {
return Ok(());
};

let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
chunk,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;

Ok(())
}

/// Deserializes [`FlightData`] into either a [`Chunk`] (when the message is a record batch)
/// or by upserting into `dictionaries` (when the message is a dictionary)
pub fn deserialize_message(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Option<Chunk<Box<dyn Array>>>> {
let FlightData {
data_header,
data_body,
..
} = data;

let message = arrow_format::ipc::MessageRef::read_as_root(data_header)?;
let header = message
.header()?
.ok_or_else(|| Error::oos("IPC Message must contain a header"))?;

match header {
ipc::MessageHeaderRef::RecordBatch(batch) => {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let chunk = read::read_record_batch(
batch,
fields,
ipc_schema,
None,
None,
dictionaries,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;

Ok(chunk.into())
}
ipc::MessageHeaderRef::DictionaryBatch(dict_batch) => {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

read::read_dictionary(
dict_batch,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;
Ok(None)
}
t => {
return Err(Error::nyi(format!(
"Reading types other than record batches not yet supported, unable to read {:?}",
t
)));
}
}
}
Loading