Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler IPC API
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 4, 2022
1 parent 497d431 commit 931c7ac
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 449 deletions.
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.7", features = ["full"] }
arrow-format = { version = "0.7", features = ["flight-data", "flight-service"] }
async-trait = "0.1.41"
clap = { version = "^3", features = ["derive"] }
futures = "0.3"
Expand Down
44 changes: 10 additions & 34 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,15 @@ use arrow2::{
array::Array,
chunk::Chunk,
datatypes::*,
io::ipc::{
read::{self, Dictionaries},
write, IpcSchema,
},
io::{
flight::{self, deserialize_batch, serialize_batch},
ipc::IpcField,
ipc::{read::Dictionaries, write, IpcField, IpcSchema},
},
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::{
flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
},
ipc::planus::ReadAsRoot,
use arrow_format::flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -233,30 +225,14 @@ async fn read_dictionaries(
dictionaries: &mut Dictionaries,
) -> Option<FlightData> {
let mut data = stream.next().await?.ok()?;
let mut message =
ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message");

while let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message
.header()
.expect("Header to be valid flatbuffers")
.expect("Header to be present")
{
let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
chunk,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)
.expect("Error reading dictionary");

let existing = dictionaries.len();
loop {
flight::deserialize_dictionary(&data, fields, ipc_schema, dictionaries).ok()?;
if dictionaries.len() == existing {
break;
}
data = stream.next().await?.ok()?;
message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message");
}

Some(data)
Expand Down
104 changes: 10 additions & 94 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ use tonic::{transport::Server, Request, Response, Status, Streaming};
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::MessageHeaderRef;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::{Field, Schema};
use arrow2::datatypes::Schema;
use arrow2::io::flight::{
deserialize_schemas, serialize_batch, serialize_schema, serialize_schema_to_info,
deserialize_message, deserialize_schemas, serialize_batch, serialize_schema,
serialize_schema_to_info,
};
use arrow2::io::ipc;
use arrow2::io::ipc::read::Dictionaries;
Expand Down Expand Up @@ -228,14 +227,14 @@ impl FlightService for Service {
let stream = try_stream! {
pin_mut!(input_stream);
for await item in input_stream {
let FlightData {data_header, data_body, app_metadata, ..} = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?;
save_message(&data_header,
&data_body,
&schema,
let data = item.map_err(|_| Status::invalid_argument(format!("Invalid")))?;
let maybe_chunk = deserialize_message(&data, &schema.fields,
&ipc_schema,
&mut dictionaries,
&mut chunks)?;
yield PutResult {app_metadata}
&mut dictionaries).map_err(|_| Status::invalid_argument(format!("Invalid")))?;
if let Some(chunk) = maybe_chunk {
chunks.push(chunk)
}
yield PutResult {app_metadata: data.app_metadata}
}
let dataset = IntegrationDataset {
schema,
Expand Down Expand Up @@ -269,86 +268,3 @@ impl FlightService for Service {
Err(Status::unimplemented("do_exchange"))
}
}

/// Decodes the record batch described by `batch` into a [`Chunk`].
///
/// The buffers are read from `data_body` through an in-memory cursor, starting
/// at offset `0` and bounded by the length of `data_body`. The batch is read
/// with metadata version `V5`.
///
/// # Errors
/// Any failure reported by `read_record_batch` is mapped to `Status::internal`.
fn chunk_from_message(
    batch: arrow_format::ipc::RecordBatchRef<'_>,
    data_body: &[u8],
    fields: &[Field],
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Status> {
    let body_len = data_body.len() as u64;
    let mut cursor = std::io::Cursor::new(data_body);

    // NOTE(review): the two `None` arguments presumably mean "no projection" and
    // "no row limit" — confirm against the `read_record_batch` signature.
    let outcome = ipc::read::read_record_batch(
        batch,
        fields,
        ipc_schema,
        None,
        None,
        dictionaries,
        arrow_format::ipc::MetadataVersion::V5,
        &mut cursor,
        0,
        body_len,
        &mut Default::default(),
    );

    outcome.map_err(|err| Status::internal(format!("Could not convert to Chunk: {:?}", err)))
}

/// Reads the dictionary batch described by `dict_batch` into `dictionaries`.
///
/// The buffers are read from `data_body` through an in-memory cursor, starting
/// at offset `0` and bounded by the length of `data_body`.
///
/// # Errors
/// Any failure reported by `read_dictionary` is mapped to `Status::internal`.
fn dictionary_from_message(
    dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>,
    data_body: &[u8],
    fields: &[Field],
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
    let body_len = data_body.len() as u64;
    let mut cursor = std::io::Cursor::new(data_body);

    let outcome = ipc::read::read_dictionary(
        dict_batch,
        fields,
        ipc_schema,
        dictionaries,
        &mut cursor,
        0,
        body_len,
        &mut Default::default(),
    );

    outcome.map_err(|err| Status::internal(format!("Could not convert to Dictionary: {:?}", err)))
}

/// Parses one IPC message (`header` + `body`) and stores its content.
///
/// * A record batch is decoded with `chunk_from_message` and appended to `chunks`.
/// * A dictionary batch is decoded with `dictionary_from_message` into `dictionaries`.
/// * Any other header kind is rejected.
///
/// # Errors
/// Returns `Status::internal` when the message cannot be parsed, when it has no
/// header, when the header kind is neither a record batch nor a dictionary
/// batch, or when decoding the batch fails.
fn save_message(
    header: &[u8],
    body: &[u8],
    schema: &Schema,
    ipc_schema: &ipc::IpcSchema,
    dictionaries: &mut Dictionaries,
    chunks: &mut Vec<Chunk<Box<dyn Array>>>,
) -> Result<(), Status> {
    let message = arrow_format::ipc::MessageRef::read_as_root(header)
        .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?;

    // The header accessor is doubly fallible: invalid flatbuffers, or absent.
    let maybe_header = message
        .header()
        .map_err(|x| Status::internal(x.to_string()))?;
    let message_header = maybe_header
        .ok_or_else(|| Status::internal("Message must contain a header".to_string()))?;

    match message_header {
        MessageHeaderRef::RecordBatch(batch) => {
            chunk_from_message(batch, body, &schema.fields, ipc_schema, dictionaries)
                .map(|chunk| chunks.push(chunk))
        }
        MessageHeaderRef::DictionaryBatch(dict_batch) => {
            dictionary_from_message(dict_batch, body, &schema.fields, ipc_schema, dictionaries)
        }
        t => Err(Status::internal(format!(
            "Reading types other than record batches not yet supported, unable to read {:?}",
            t
        ))),
    }
}
102 changes: 101 additions & 1 deletion src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData},
};

use super::ipc::read::Dictionaries;
use super::ipc::{IpcField, IpcSchema};

pub use super::ipc::write::default_ipc_fields;
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] to [`Chunk`].
/// Deserializes [`FlightData`] representing a record batch message to [`Chunk`].
pub fn deserialize_batch(
data: &FlightData,
fields: &[Field],
Expand Down Expand Up @@ -149,3 +150,102 @@ pub fn deserialize_batch(
)),
}
}

/// Deserializes [`FlightData`], assuming it to be a dictionary message, into `dictionaries`.
pub fn deserialize_dictionary(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut read::Dictionaries,
) -> Result<()> {
let message = ipc::MessageRef::read_as_root(&data.data_header)?;

let chunk = if let ipc::MessageHeaderRef::DictionaryBatch(chunk) = message
.header()?
.ok_or_else(|| Error::oos("Header is required"))?
{
chunk
} else {
return Ok(());
};

let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
chunk,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;

Ok(())
}

/// Deserializes [`FlightData`] into either a [`Chunk`] (when the message is a record batch)
/// or by upserting into `dictionaries` (when the message is a dictionary)
pub fn deserialize_message(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Option<Chunk<Box<dyn Array>>>> {
let FlightData {
data_header,
data_body,
..
} = data;

let message = arrow_format::ipc::MessageRef::read_as_root(data_header)?;
let header = message
.header()?
.ok_or_else(|| Error::oos("IPC Message must contain a header"))?;

match header {
ipc::MessageHeaderRef::RecordBatch(batch) => {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let chunk = read::read_record_batch(
batch,
fields,
ipc_schema,
None,
None,
dictionaries,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;

Ok(chunk.into())
}
ipc::MessageHeaderRef::DictionaryBatch(dict_batch) => {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

read::read_dictionary(
dict_batch,
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
length as u64,
&mut Default::default(),
)?;
Ok(None)
}
t => {
return Err(Error::nyi(format!(
"Reading types other than record batches not yet supported, unable to read {:?}",
t
)));
}
}
}
Loading

0 comments on commit 931c7ac

Please sign in to comment.