Skip to content

Commit

Permalink
Add example reading data from an mmaped IPC file
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 15, 2025
1 parent cc18d0f commit b6df782
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
7 changes: 7 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ criterion = { version = "0.5", default-features = false }
half = { version = "2.1", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
# used in examples
memmap2 = "0.9.3"
bytes = "1.9"

[build-dependencies]

Expand All @@ -105,6 +108,10 @@ name = "read_csv_infer_schema"
required-features = ["prettyprint", "csv"]
path = "./examples/read_csv_infer_schema.rs"

[[example]]
name = "zero_copy_ipc"
path = "examples/zero_copy_ipc.rs"

[[bench]]
name = "aggregate_kernels"
harness = false
Expand Down
1 change: 1 addition & 0 deletions arrow/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
- [`dynamic_types.rs`](dynamic_types.rs): Dealing with mixed types dynamically at runtime
- [`read_csv.rs`](read_csv.rs): Reading CSV files with explicit schema, pretty printing Arrays
- [`read_csv_infer_schema.rs`](read_csv_infer_schema.rs): Reading CSV files, pretty printing Arrays
- [`zero_copy_ipc`](zero_copy_ipc): Zero copy read of Arrow IPC streams and file from memory buffers and `mmap`d files
- [`tensor_builder.rs`](tensor_builder.rs): Using tensor builder
- [`version.rs`](version.rs): Print the arrow version and exit
158 changes: 158 additions & 0 deletions arrow/examples/zero_copy_ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This example shows how to read arrow IPC files and streams using "zero copy" API
//!
//! Zero copy in this case the Arrow arrays refer directly to a user
//! provided buffer or memory region.
use arrow::array::{record_batch, RecordBatch};
use arrow::error::Result;
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::reader::{read_footer_length, FileDecoder};
use arrow_ipc::writer::FileWriter;
use arrow_ipc::{root_as_footer, Block};
use std::path::PathBuf;
use std::sync::Arc;

fn main() {
println!("Begin");

// Data from arrow IPC files can be read via "zero" copy by mmap-ing the file into memory
// and using the FileDecoder API
let ipc_path = ipc_file();
let ipc_file = std::fs::File::open(&ipc_path.path).expect("failed to open file");
let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

// Convert mmap'ed file to an Arrow `Buffer` to back the arrow arrays. We do
// this by first creating a `bytes::Bytes` (which is zero copy) and then
// creating a buffer from the `Bytes` (which is also zero copy)
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);

// now we use the FileDecoder API (wrapped with some convenience methods) to
// create the Arrays that re-use the data in the underlying buffer
let decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 3);

// read the batches back into memory
for i in 0..decoder.num_batches() {
let batch = decoder.get_batch(i).unwrap().expect("failed to read batch");
println!("Batch {} has {} rows", i, batch.num_rows());
}
}

/// Return 3 record batches of example data
fn example_data() -> Vec<RecordBatch> {
vec![
record_batch!(("my_column", Int32, [1, 2, 3])).unwrap(),
record_batch!(("my_column", Int32, [4, 5, 6])).unwrap(),
record_batch!(("my_column", Int32, [7, 8, 9])).unwrap(),
]
}

/// Return a temporary file that contains an IPC file with 3 record batches
fn ipc_file() -> TempFile {
let path = PathBuf::from("example.arrow");
let file = std::fs::File::create(&path).unwrap();
let data = example_data();
let mut writer = FileWriter::try_new(file, &data[0].schema()).unwrap();
for batch in &data {
writer.write(batch).unwrap();
}
writer.finish().unwrap();
TempFile { path }
}

/// This structure incrementally decodes [`RecordBatch`]es from an
/// IPC file stored in a Arrow [`Buffer`]
///
/// It is based on the example in the `FileDecoder` documentation and handles
/// the low level interaction with the Arrow IPC format.
struct IPCBufferDecoder {
/// Buffer with the data
buffer: Buffer,
/// Decoder that reads arrays that refers to the underlying buffers
decoder: FileDecoder,
/// Location of the batches within the buffer
batches: Vec<Block>,
}

impl IPCBufferDecoder {
fn new(buffer: Buffer) -> Self {
let trailer_start = buffer.len() - 10;
let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

let schema = fb_to_schema(footer.schema().unwrap());

let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());

// Read dictionaries
for block in footer.dictionaries().iter().flatten() {
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = buffer.slice_with_length(block.offset() as _, block_len);
decoder.read_dictionary(&block, &data).unwrap();
}

// convert to Vec from the flatbuffers Vector to avoid having a direct dependency on flatbuffers
let batches = footer
.recordBatches()
.map(|b| b.iter().map(|block| *block).collect())
.unwrap_or_default();

println!("buffer length is {}", buffer.len());

Self {
buffer,
decoder,
batches,
}
}

/// return the number of record batches in this buffer
fn num_batches(&self) -> usize {
self.batches.len()
}

/// return the record batch at message index `i`.
///
/// This may return `None` if the IPC message was None
fn get_batch(&self, i: usize) -> Result<Option<RecordBatch>> {
let block = &self.batches[i];
println!("Getting block {}", i);
println!("block is {:?}", block);
let data = self
.buffer
.slice_with_length(block.offset() as _, block.metaDataLength() as usize);
self.decoder.read_record_batch(&block, &data)
}
}

/// This structure deletes the file when it is dropped
struct TempFile {
path: PathBuf,
}

impl Drop for TempFile {
fn drop(&mut self) {
if let Err(e) = std::fs::remove_file(&self.path) {
println!("Error deleting '{:?}': {:?}", self.path, e);
}
}
}

0 comments on commit b6df782

Please sign in to comment.