From b6df7821386702dee01216da880f6e3fbb019fd0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Jan 2025 07:45:54 -0500 Subject: [PATCH] Add example reading data from an `mmap`ed IPC file --- arrow/Cargo.toml | 7 ++ arrow/examples/README.md | 1 + arrow/examples/zero_copy_ipc.rs | 158 ++++++++++++++++++++++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 arrow/examples/zero_copy_ipc.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 76119ec4abb4..a92b3001779e 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -87,6 +87,9 @@ criterion = { version = "0.5", default-features = false } half = { version = "2.1", default-features = false } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } serde = { version = "1.0", default-features = false, features = ["derive"] } +# used in examples +memmap2 = "0.9.3" +bytes = "1.9" [build-dependencies] @@ -105,6 +108,10 @@ name = "read_csv_infer_schema" required-features = ["prettyprint", "csv"] path = "./examples/read_csv_infer_schema.rs" +[[example]] +name = "zero_copy_ipc" +path = "examples/zero_copy_ipc.rs" + [[bench]] name = "aggregate_kernels" harness = false diff --git a/arrow/examples/README.md b/arrow/examples/README.md index 87aa6ee0475b..75ab36022a96 100644 --- a/arrow/examples/README.md +++ b/arrow/examples/README.md @@ -24,5 +24,6 @@ - [`dynamic_types.rs`](dynamic_types.rs): Dealing with mixed types dynamically at runtime - [`read_csv.rs`](read_csv.rs): Reading CSV files with explicit schema, pretty printing Arrays - [`read_csv_infer_schema.rs`](read_csv_infer_schema.rs): Reading CSV files, pretty printing Arrays +- [`zero_copy_ipc`](zero_copy_ipc): Zero copy read of Arrow IPC streams and file from memory buffers and `mmap`d files - [`tensor_builder.rs`](tensor_builder.rs): Using tensor builder - [`version.rs`](version.rs): Print the arrow version and exit diff --git a/arrow/examples/zero_copy_ipc.rs b/arrow/examples/zero_copy_ipc.rs new file mode 100644 index 000000000000..f9c443f7ae27 --- /dev/null +++ b/arrow/examples/zero_copy_ipc.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example shows how to read arrow IPC files and streams using "zero copy" API +//! +//! Zero copy in this case the Arrow arrays refer directly to a user +//! provided buffer or memory region. + +use arrow::array::{record_batch, RecordBatch}; +use arrow::error::Result; +use arrow_buffer::Buffer; +use arrow_ipc::convert::fb_to_schema; +use arrow_ipc::reader::{read_footer_length, FileDecoder}; +use arrow_ipc::writer::FileWriter; +use arrow_ipc::{root_as_footer, Block}; +use std::path::PathBuf; +use std::sync::Arc; + +fn main() { + println!("Begin"); + + // Data from arrow IPC files can be read via "zero" copy by mmap-ing the file into memory + // and using the FileDecoder API + let ipc_path = ipc_file(); + let ipc_file = std::fs::File::open(&ipc_path.path).expect("failed to open file"); + let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") }; + + // Convert mmap'ed file to an Arrow `Buffer` to back the arrow arrays. We do + // this by first creating a `bytes::Bytes` (which is zero copy) and then + // creating a buffer from the `Bytes` (which is also zero copy) + let bytes = bytes::Bytes::from_owner(mmap); + let buffer = Buffer::from(bytes); + + // now we use the FileDecoder API (wrapped with some convenience methods) to + // create the Arrays that re-use the data in the underlying buffer + let decoder = IPCBufferDecoder::new(buffer); + assert_eq!(decoder.num_batches(), 3); + + // read the batches back into memory + for i in 0..decoder.num_batches() { + let batch = decoder.get_batch(i).unwrap().expect("failed to read batch"); + println!("Batch {} has {} rows", i, batch.num_rows()); + } +} + +/// Return 3 record batches of example data +fn example_data() -> Vec { + vec![ + record_batch!(("my_column", Int32, [1, 2, 3])).unwrap(), + record_batch!(("my_column", Int32, [4, 5, 6])).unwrap(), + record_batch!(("my_column", Int32, [7, 8, 9])).unwrap(), + ] +} + +/// Return a temporary file that contains an IPC file with 3 record batches +fn ipc_file() -> TempFile { + let path = PathBuf::from("example.arrow"); + let file = std::fs::File::create(&path).unwrap(); + let data = example_data(); + let mut writer = FileWriter::try_new(file, &data[0].schema()).unwrap(); + for batch in &data { + writer.write(batch).unwrap(); + } + writer.finish().unwrap(); + TempFile { path } +} + +/// This structure incrementally decodes [`RecordBatch`]es from an +/// IPC file stored in a Arrow [`Buffer`] +/// +/// It is based on the example in the `FileDecoder` documentation and handles +/// the low level interaction with the Arrow IPC format. +struct IPCBufferDecoder { + /// Buffer with the data + buffer: Buffer, + /// Decoder that reads arrays that refers to the underlying buffers + decoder: FileDecoder, + /// Location of the batches within the buffer + batches: Vec, +} + +impl IPCBufferDecoder { + fn new(buffer: Buffer) -> Self { + let trailer_start = buffer.len() - 10; + let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap(); + let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap(); + + let schema = fb_to_schema(footer.schema().unwrap()); + + let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); + + // Read dictionaries + for block in footer.dictionaries().iter().flatten() { + let block_len = block.bodyLength() as usize + block.metaDataLength() as usize; + let data = buffer.slice_with_length(block.offset() as _, block_len); + decoder.read_dictionary(&block, &data).unwrap(); + } + + // convert to Vec from the flatbuffers Vector to avoid having a direct dependency on flatbuffers + let batches = footer + .recordBatches() + .map(|b| b.iter().map(|block| *block).collect()) + .unwrap_or_default(); + + println!("buffer length is {}", buffer.len()); + + Self { + buffer, + decoder, + batches, + } + } + + /// return the number of record batches in this buffer + fn num_batches(&self) -> usize { + self.batches.len() + } + + /// return the record batch at message index `i`. + /// + /// This may return `None` if the IPC message was None + fn get_batch(&self, i: usize) -> Result> { + let block = &self.batches[i]; + println!("Getting block {}", i); + println!("block is {:?}", block); + let data = self + .buffer + .slice_with_length(block.offset() as _, block.metaDataLength() as usize); + self.decoder.read_record_batch(&block, &data) + } +} + +/// This structure deletes the file when it is dropped +struct TempFile { + path: PathBuf, +} + +impl Drop for TempFile { + fn drop(&mut self) { + if let Err(e) = std::fs::remove_file(&self.path) { + println!("Error deleting '{:?}': {:?}", self.path, e); + } + } +}