Skip to content

Commit

Permalink
feat(io): Convert FlatGeobuf reader to RecordBatchReader iterator (#933)
Browse files Browse the repository at this point in the history
This PR updates the FlatGeobuf reader (the sync version only for now) to
expose data as an iterator. This means that applications like Python can
access data larger than memory, and process it in batches.

- [x] This was blocked on Flatgeobuf exposing the `Seekable` and
`NotSeekable` marker structs, but that happened in
flatgeobuf/flatgeobuf#402

Closes #932

For #594, for
#596
  • Loading branch information
kylebarron authored Dec 23, 2024
1 parent 8af6586 commit 9a1e946
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 143 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions js/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions js/src/io/flatgeobuf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::io::Cursor;

use arrow_array::RecordBatchReader;
use arrow_wasm::Table;
use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions};
use geoarrow::io::flatgeobuf::{FlatGeobufReaderBuilder, FlatGeobufReaderOptions};
// use parquet_wasm::utils::assert_parquet_file_not_empty;
use wasm_bindgen::prelude::*;

Expand All @@ -27,12 +28,14 @@ use crate::error::WasmResult;
#[wasm_bindgen(js_name = readFlatGeobuf)]
pub fn read_flatgeobuf(file: &[u8], batch_size: Option<usize>) -> WasmResult<Table> {
// assert_parquet_file_not_empty(parquet_file)?;
let mut cursor = Cursor::new(file);
let cursor = Cursor::new(file);
let options = FlatGeobufReaderOptions {
batch_size,
..Default::default()
};
let geo_table = _read_flatgeobuf(&mut cursor, options)?;
let (batches, schema) = geo_table.into_inner();
let reader_builder = FlatGeobufReaderBuilder::open(cursor)?;
let record_batch_reader = reader_builder.read(options)?;
let schema = record_batch_reader.schema();
let batches = record_batch_reader.collect::<std::result::Result<_, _>>()?;
Ok(Table::new(schema, batches))
}
10 changes: 7 additions & 3 deletions python/geoarrow-io/src/io/flatgeobuf/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use crate::io::input::sync::FileWriter;
use crate::io::input::{construct_reader, AnyFileReader};
use crate::util::to_arro3_table;

use arrow::array::RecordBatchReader;
use geoarrow::io::flatgeobuf::{
read_flatgeobuf as _read_flatgeobuf, write_flatgeobuf_with_options as _write_flatgeobuf,
write_flatgeobuf_with_options as _write_flatgeobuf, FlatGeobufReaderBuilder,
FlatGeobufReaderOptions, FlatGeobufWriterOptions,
};
use geoarrow::table::Table;
use pyo3::prelude::*;
use pyo3_arrow::export::Arro3Table;
use pyo3_arrow::input::AnyRecordBatch;
Expand Down Expand Up @@ -44,13 +46,15 @@ pub fn read_flatgeobuf(
Ok(to_arro3_table(table))
})
}
AnyFileReader::Sync(mut sync_reader) => {
AnyFileReader::Sync(sync_reader) => {
let options = FlatGeobufReaderOptions {
batch_size: Some(batch_size),
bbox,
..Default::default()
};
let table = _read_flatgeobuf(&mut sync_reader, options)?;
let reader_builder = FlatGeobufReaderBuilder::open(sync_reader)?;
let reader = reader_builder.read(options)?;
let table = Table::try_from(Box::new(reader) as Box<dyn RecordBatchReader>).unwrap();
Ok(to_arro3_table(table))
}
}
Expand Down
5 changes: 2 additions & 3 deletions rust/geoarrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ chrono = { version = "0.4" }
dbase = "0.5.0"
enum-as-inner = "0.6.1"
# TODO: update to 4.6 when released
flatgeobuf = { git = "https://github.com/kylebarron/flatgeobuf", rev = "06e987d6d3d73edb95124a14cdaab9ee8e6e57ac", version = "4.5", optional = true, default-features = false }
flatgeobuf = { git = "https://github.com/flatgeobuf/flatgeobuf", rev = "f7563617549f8ab0c111e83ee423996f100ddb0c", version = "4.5", optional = true, default-features = false }
futures = { version = "0.3", optional = true }
gdal = { version = "0.17", optional = true }
geo = "0.29.3"
Expand All @@ -69,8 +69,7 @@ geo-traits = "0.2"
geos = { version = "9.1.1", features = ["v3_10_0"], optional = true }
geozero = { version = "0.14", features = ["with-wkb"] }
half = { version = "2.4.1" }
# TODO: update to 0.9 when released
http-range-client = { git = "https://github.com/pka/http-range-client", rev = "5699e32fafc416ce683bfbf1d179f80b0b6549a3", version = "0.8", optional = true, default-features = false }
http-range-client = { version = "0.9", optional = true, default-features = false }
indexmap = { version = "2" }
lexical-core = { version = "0.8.5" }
num-traits = "0.2.19"
Expand Down
12 changes: 9 additions & 3 deletions rust/geoarrow/benches/area.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use criterion::{criterion_group, criterion_main, Criterion};
use geoarrow::algorithm::geo::Area;
use geoarrow::array::{AsChunkedNativeArray, MultiPolygonArray};
use geoarrow::io::flatgeobuf::read_flatgeobuf;
use geoarrow::io::flatgeobuf::FlatGeobufReaderBuilder;
use geoarrow::table::Table;
use std::fs::File;

fn load_file() -> MultiPolygonArray {
let mut file = File::open("fixtures/flatgeobuf/countries.fgb").unwrap();
let table = read_flatgeobuf(&mut file, Default::default()).unwrap();
let file = File::open("fixtures/flatgeobuf/countries.fgb").unwrap();
let reader_builder = FlatGeobufReaderBuilder::open(file).unwrap();
let record_batch_reader = reader_builder.read(Default::default()).unwrap();
let table =
Table::try_from(Box::new(record_batch_reader) as Box<dyn arrow_array::RecordBatchReader>)
.unwrap();

table
.geometry_column(None)
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod writer;

#[cfg(feature = "flatgeobuf_async")]
pub use reader::read_flatgeobuf_async;
pub use reader::{read_flatgeobuf, FlatGeobufReaderOptions};
pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions};
pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions};
2 changes: 1 addition & 1 deletion rust/geoarrow/src/io/flatgeobuf/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ mod sync;
pub use common::FlatGeobufReaderOptions;
#[cfg(feature = "flatgeobuf_async")]
pub use r#async::read_flatgeobuf_async;
pub use sync::read_flatgeobuf;
pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder};
Loading

0 comments on commit 9a1e946

Please sign in to comment.