From 9a1e946a833617dbe06438d14e548ed6249cf079 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 23 Dec 2024 14:45:10 -0500 Subject: [PATCH] feat(io): Convert FlatGeobuf reader to RecordBatchReader iterator (#933) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR updates the FlatGeobuf reader (the sync version only for now) to expose data as an iterator. This means that applications like Python can access data larger than memory, and process it in batches. - [x] This was blocked on Flatgeobuf exposing the `Seekable` and `NotSeekable` marker structs, but that happened in https://github.com/flatgeobuf/flatgeobuf/pull/402 Closes https://github.com/geoarrow/geoarrow-rs/issues/932 For https://github.com/geoarrow/geoarrow-rs/issues/594, for https://github.com/geoarrow/geoarrow-rs/issues/596 --- Cargo.lock | 7 +- js/Cargo.lock | 4 +- js/src/io/flatgeobuf.rs | 11 +- python/geoarrow-io/src/io/flatgeobuf/sync.rs | 10 +- rust/geoarrow/Cargo.toml | 5 +- rust/geoarrow/benches/area.rs | 12 +- rust/geoarrow/src/io/flatgeobuf/mod.rs | 2 +- rust/geoarrow/src/io/flatgeobuf/reader/mod.rs | 2 +- .../geoarrow/src/io/flatgeobuf/reader/sync.rs | 472 ++++++++++++++---- rust/geoarrow/src/io/flatgeobuf/writer.rs | 30 +- .../io/geozero/table/builder/properties.rs | 5 +- .../src/io/geozero/table/builder/table.rs | 2 - .../src/udf/native/measurement/area.rs | 15 +- .../src/udf/native/processing/centroid.rs | 15 +- 14 files changed, 449 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1be65168..dbf5fa5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,7 +1534,7 @@ dependencies = [ [[package]] name = "flatgeobuf" version = "4.5.0" -source = "git+https://github.com/kylebarron/flatgeobuf?rev=06e987d6d3d73edb95124a14cdaab9ee8e6e57ac#06e987d6d3d73edb95124a14cdaab9ee8e6e57ac" +source = "git+https://github.com/flatgeobuf/flatgeobuf?rev=f7563617549f8ab0c111e83ee423996f100ddb0c#f7563617549f8ab0c111e83ee423996f100ddb0c" dependencies = [ "byteorder", "bytes", @@ -2121,8 +2121,9 @@ dependencies = [ [[package]] name = "http-range-client" -version = "0.8.0" -source = "git+https://github.com/pka/http-range-client?rev=5699e32fafc416ce683bfbf1d179f80b0b6549a3#5699e32fafc416ce683bfbf1d179f80b0b6549a3" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3b0cb8b2a6444be75e1bb3bfa79911cae70865df20a36d7c70945273b13b641" dependencies = [ "async-trait", "byteorder", diff --git a/js/Cargo.lock b/js/Cargo.lock index aaede781..65a6d6d8 100644 --- a/js/Cargo.lock +++ b/js/Cargo.lock @@ -895,7 +895,7 @@ dependencies = [ [[package]] name = "flatgeobuf" version = "4.5.0" -source = "git+https://github.com/kylebarron/flatgeobuf?rev=06e987d6d3d73edb95124a14cdaab9ee8e6e57ac#06e987d6d3d73edb95124a14cdaab9ee8e6e57ac" +source = "git+https://github.com/flatgeobuf/flatgeobuf?rev=f7563617549f8ab0c111e83ee423996f100ddb0c#f7563617549f8ab0c111e83ee423996f100ddb0c" dependencies = [ "byteorder", "fallible-streaming-iterator", @@ -1092,7 +1092,7 @@ dependencies = [ [[package]] name = "geoarrow" -version = "0.4.0-beta.2" +version = "0.4.0-beta.3" dependencies = [ "arrow", "arrow-array", diff --git a/js/src/io/flatgeobuf.rs b/js/src/io/flatgeobuf.rs index 50c1f2d4..3465bd01 100644 --- a/js/src/io/flatgeobuf.rs +++ b/js/src/io/flatgeobuf.rs @@ -1,7 +1,8 @@ use std::io::Cursor; +use arrow_array::RecordBatchReader; use arrow_wasm::Table; -use geoarrow::io::flatgeobuf::{read_flatgeobuf as _read_flatgeobuf, FlatGeobufReaderOptions}; +use geoarrow::io::flatgeobuf::{FlatGeobufReaderBuilder, FlatGeobufReaderOptions}; // use parquet_wasm::utils::assert_parquet_file_not_empty; use wasm_bindgen::prelude::*; @@ -27,12 +28,14 @@ use crate::error::WasmResult; #[wasm_bindgen(js_name = readFlatGeobuf)] pub fn read_flatgeobuf(file: &[u8], batch_size: Option) -> WasmResult { // assert_parquet_file_not_empty(parquet_file)?; - let mut cursor = Cursor::new(file); + let cursor = Cursor::new(file); let options = FlatGeobufReaderOptions { batch_size, ..Default::default() }; - let geo_table = _read_flatgeobuf(&mut cursor, options)?; - let (batches, schema) = geo_table.into_inner(); + let reader_builder = FlatGeobufReaderBuilder::open(cursor)?; + let record_batch_reader = reader_builder.read(options)?; + let schema = record_batch_reader.schema(); + let batches = record_batch_reader.collect::>()?; Ok(Table::new(schema, batches)) } diff --git a/python/geoarrow-io/src/io/flatgeobuf/sync.rs b/python/geoarrow-io/src/io/flatgeobuf/sync.rs index 0c6cbf7e..962af317 100644 --- a/python/geoarrow-io/src/io/flatgeobuf/sync.rs +++ b/python/geoarrow-io/src/io/flatgeobuf/sync.rs @@ -3,10 +3,12 @@ use crate::io::input::sync::FileWriter; use crate::io::input::{construct_reader, AnyFileReader}; use crate::util::to_arro3_table; +use arrow::array::RecordBatchReader; use geoarrow::io::flatgeobuf::{ - read_flatgeobuf as _read_flatgeobuf, write_flatgeobuf_with_options as _write_flatgeobuf, + write_flatgeobuf_with_options as _write_flatgeobuf, FlatGeobufReaderBuilder, FlatGeobufReaderOptions, FlatGeobufWriterOptions, }; +use geoarrow::table::Table; use pyo3::prelude::*; use pyo3_arrow::export::Arro3Table; use pyo3_arrow::input::AnyRecordBatch; @@ -44,13 +46,15 @@ pub fn read_flatgeobuf( Ok(to_arro3_table(table)) }) } - AnyFileReader::Sync(mut sync_reader) => { + AnyFileReader::Sync(sync_reader) => { let options = FlatGeobufReaderOptions { batch_size: Some(batch_size), bbox, ..Default::default() }; - let table = _read_flatgeobuf(&mut sync_reader, options)?; + let reader_builder = FlatGeobufReaderBuilder::open(sync_reader)?; + let reader = reader_builder.read(options)?; + let table = Table::try_from(Box::new(reader) as Box).unwrap(); Ok(to_arro3_table(table)) } } diff --git a/rust/geoarrow/Cargo.toml b/rust/geoarrow/Cargo.toml index 9a982bb4..f21f35c2 100644 --- a/rust/geoarrow/Cargo.toml +++ b/rust/geoarrow/Cargo.toml @@ -60,7 +60,7 @@ chrono = { version = "0.4" } dbase = "0.5.0" enum-as-inner = "0.6.1" # TODO: update to 4.6 when released -flatgeobuf = { git = "https://github.com/kylebarron/flatgeobuf", rev = "06e987d6d3d73edb95124a14cdaab9ee8e6e57ac", version = "4.5", optional = true, default-features = false } +flatgeobuf = { git = "https://github.com/flatgeobuf/flatgeobuf", rev = "f7563617549f8ab0c111e83ee423996f100ddb0c", version = "4.5", optional = true, default-features = false } futures = { version = "0.3", optional = true } gdal = { version = "0.17", optional = true } geo = "0.29.3" @@ -69,8 +69,7 @@ geo-traits = "0.2" geos = { version = "9.1.1", features = ["v3_10_0"], optional = true } geozero = { version = "0.14", features = ["with-wkb"] } half = { version = "2.4.1" } -# TODO: update to 0.9 when released -http-range-client = { git = "https://github.com/pka/http-range-client", rev = "5699e32fafc416ce683bfbf1d179f80b0b6549a3", version = "0.8", optional = true, default-features = false } +http-range-client = { version = "0.9", optional = true, default-features = false } indexmap = { version = "2" } lexical-core = { version = "0.8.5" } num-traits = "0.2.19" diff --git a/rust/geoarrow/benches/area.rs b/rust/geoarrow/benches/area.rs index 8b2415a4..304d40d2 100644 --- a/rust/geoarrow/benches/area.rs +++ b/rust/geoarrow/benches/area.rs @@ -1,12 +1,18 @@ use criterion::{criterion_group, criterion_main, Criterion}; use geoarrow::algorithm::geo::Area; use geoarrow::array::{AsChunkedNativeArray, MultiPolygonArray}; -use geoarrow::io::flatgeobuf::read_flatgeobuf; +use geoarrow::io::flatgeobuf::FlatGeobufReaderBuilder; +use geoarrow::table::Table; use std::fs::File; fn load_file() -> MultiPolygonArray { - let mut file = File::open("fixtures/flatgeobuf/countries.fgb").unwrap(); - let table = read_flatgeobuf(&mut file, Default::default()).unwrap(); + let file = File::open("fixtures/flatgeobuf/countries.fgb").unwrap(); + let reader_builder = FlatGeobufReaderBuilder::open(file).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let table = + Table::try_from(Box::new(record_batch_reader) as Box) + .unwrap(); + table .geometry_column(None) .unwrap() diff --git a/rust/geoarrow/src/io/flatgeobuf/mod.rs b/rust/geoarrow/src/io/flatgeobuf/mod.rs index fdd435f8..85acb33d 100644 --- a/rust/geoarrow/src/io/flatgeobuf/mod.rs +++ b/rust/geoarrow/src/io/flatgeobuf/mod.rs @@ -5,5 +5,5 @@ mod writer; #[cfg(feature = "flatgeobuf_async")] pub use reader::read_flatgeobuf_async; -pub use reader::{read_flatgeobuf, FlatGeobufReaderOptions}; +pub use reader::{FlatGeobufReader, FlatGeobufReaderBuilder, FlatGeobufReaderOptions}; pub use writer::{write_flatgeobuf, write_flatgeobuf_with_options, FlatGeobufWriterOptions}; diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs b/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs index 70125040..250aebad 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/mod.rs @@ -8,4 +8,4 @@ mod sync; pub use common::FlatGeobufReaderOptions; #[cfg(feature = "flatgeobuf_async")] pub use r#async::read_flatgeobuf_async; -pub use sync::read_flatgeobuf; +pub use sync::{FlatGeobufReader, FlatGeobufReaderBuilder}; diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs b/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs index 4d08704c..c84ba6f3 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/sync.rs @@ -19,120 +19,361 @@ //! the GeomProcessor conversion from geozero, after initializing buffers with a better estimate of //! the total length. -use crate::algorithm::native::DowncastTable; +use crate::array::metadata::ArrayMetadata; use crate::array::*; -use crate::datatypes::Dimension; +use crate::datatypes::{Dimension, NativeType}; use crate::error::{GeoArrowError, Result}; use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions}; use crate::io::geozero::array::GeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; -use crate::table::Table; -use flatgeobuf::{FallibleStreamingIterator, FgbReader, GeometryType}; +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, Schema, SchemaRef}; +use flatgeobuf::{ + FallibleStreamingIterator, FeatureIter, FgbReader, GeometryType, NotSeekable, Seekable, +}; use geozero::{FeatureProcessor, FeatureProperties}; use std::io::{Read, Seek}; +use std::sync::Arc; -/// Read a FlatGeobuf file to a Table -pub fn read_flatgeobuf( - file: &mut R, - options: FlatGeobufReaderOptions, -) -> Result
{ - let reader = FgbReader::open(file)?; - - let header = reader.header(); - if header.has_m() | header.has_t() | header.has_tm() { - return Err(GeoArrowError::General( - "Only XY and XYZ dimensions are supported".to_string(), - )); - } - let has_z = header.has_z(); - - let schema = infer_schema(header); - let geometry_type = header.geometry_type(); - let array_metadata = parse_crs(header.crs()); - - let mut selection = if let Some((min_x, min_y, max_x, max_y)) = options.bbox { - reader.select_bbox(min_x, min_y, max_x, max_y)? - } else { - reader.select_all()? - }; - - let features_count = selection.features_count(); - - let options = GeoTableBuilderOptions::new( - options.coord_type, - true, - options.batch_size, - Some(schema), - features_count, - array_metadata, - ); - - macro_rules! impl_read { - ($builder:ty, $dim:expr) => {{ - let mut builder = GeoTableBuilder::<$builder>::new_with_options($dim, options); - while let Some(feature) = selection.next()? { - feature.process_properties(&mut builder)?; - builder.properties_end()?; - - builder.push_geometry(feature.geometry_trait()?.as_ref())?; - - builder.feature_end(0)?; - } - selection.process_features(&mut builder)?; - builder.finish() - }}; +/// A builder for [FlatGeobufReader] +pub struct FlatGeobufReaderBuilder { + reader: FgbReader, +} + +impl FlatGeobufReaderBuilder { + /// Open a new FlatGeobuf reader + pub fn open(reader: R) -> Result { + let reader = FgbReader::open(reader)?; + Ok(Self { reader }) } - match (geometry_type, has_z) { - (GeometryType::Point, false) => { - impl_read!(PointBuilder, Dimension::XY) - } - (GeometryType::LineString, false) => { - impl_read!(LineStringBuilder, Dimension::XY) - } - (GeometryType::Polygon, false) => { - impl_read!(PolygonBuilder, Dimension::XY) + fn infer_from_header(&self) -> Result<(NativeType, SchemaRef, Arc)> { + use Dimension::*; + + let header = self.reader.header(); + if header.has_m() | header.has_t() | header.has_tm() { + return Err(GeoArrowError::General( + "Only XY and XYZ dimensions are supported".to_string(), + )); } - (GeometryType::MultiPoint, false) => { - impl_read!(MultiPointBuilder, Dimension::XY) + let has_z = header.has_z(); + + let properties_schema = infer_schema(header); + let geometry_type = header.geometry_type(); + let array_metadata = parse_crs(header.crs()); + // TODO: pass through arg + let coord_type = CoordType::Interleaved; + let data_type = match (geometry_type, has_z) { + (GeometryType::Point, false) => NativeType::Point(coord_type, XY), + (GeometryType::LineString, false) => NativeType::LineString(coord_type, XY), + (GeometryType::Polygon, false) => NativeType::Polygon(coord_type, XY), + (GeometryType::MultiPoint, false) => NativeType::MultiPoint(coord_type, XY), + (GeometryType::MultiLineString, false) => NativeType::MultiLineString(coord_type, XY), + (GeometryType::MultiPolygon, false) => NativeType::MultiPolygon(coord_type, XY), + (GeometryType::Point, true) => NativeType::Point(coord_type, XYZ), + (GeometryType::LineString, true) => NativeType::LineString(coord_type, XYZ), + (GeometryType::Polygon, true) => NativeType::Polygon(coord_type, XYZ), + (GeometryType::MultiPoint, true) => NativeType::MultiPoint(coord_type, XYZ), + (GeometryType::MultiLineString, true) => NativeType::MultiLineString(coord_type, XYZ), + (GeometryType::MultiPolygon, true) => NativeType::MultiPolygon(coord_type, XYZ), + (GeometryType::Unknown, _) => NativeType::Geometry(coord_type), + _ => panic!("Unsupported type"), + }; + Ok((data_type, properties_schema, array_metadata)) + } + + /// Read features sequentially, without using `Seek` + pub fn read_seq( + self, + options: FlatGeobufReaderOptions, + ) -> Result> { + let (data_type, properties_schema, array_metadata) = self.infer_from_header()?; + if let Some((min_x, min_y, max_x, max_y)) = options.bbox { + let selection = self.reader.select_bbox_seq(min_x, min_y, max_x, max_y)?; + let num_rows = selection.features_count(); + Ok(FlatGeobufReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) + } else { + let selection = self.reader.select_all_seq()?; + let num_rows = selection.features_count(); + Ok(FlatGeobufReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) } - (GeometryType::MultiLineString, false) => impl_read!(MultiLineStringBuilder, Dimension::XY), - (GeometryType::MultiPolygon, false) => impl_read!(MultiPolygonBuilder, Dimension::XY), - (GeometryType::Unknown, false) => { - let mut builder = - GeoTableBuilder::::new_with_options(Dimension::XY, options); - selection.process_features(&mut builder)?; - let table = builder.finish()?; - table.downcast() + } +} + +impl FlatGeobufReaderBuilder { + /// Read features + pub fn read(self, options: FlatGeobufReaderOptions) -> Result> { + let (data_type, properties_schema, array_metadata) = self.infer_from_header()?; + if let Some((min_x, min_y, max_x, max_y)) = options.bbox { + let selection = self.reader.select_bbox(min_x, min_y, max_x, max_y)?; + let num_rows = selection.features_count(); + Ok(FlatGeobufReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) + } else { + let selection = self.reader.select_all()?; + let num_rows = selection.features_count(); + Ok(FlatGeobufReader { + selection, + data_type, + batch_size: options.batch_size.unwrap_or(65_536), + properties_schema, + num_rows_remaining: num_rows, + array_metadata, + }) } - (GeometryType::Point, true) => { - impl_read!(PointBuilder, Dimension::XYZ) + } +} + +/// An iterator over record batches from a FlatGeobuf file. +/// +/// This implements [arrow_array::RecordBatchReader], which you can use to access data. +pub struct FlatGeobufReader { + selection: FeatureIter, + data_type: NativeType, + batch_size: usize, + properties_schema: SchemaRef, + num_rows_remaining: Option, + array_metadata: Arc, +} + +impl FlatGeobufReader { + fn construct_options(&self) -> GeoTableBuilderOptions { + let coord_type = self.data_type.coord_type(); + let mut batch_size = self.batch_size; + if let Some(num_rows_remaining) = self.num_rows_remaining { + batch_size = batch_size.min(num_rows_remaining); } - (GeometryType::LineString, true) => { - impl_read!(LineStringBuilder, Dimension::XYZ) + GeoTableBuilderOptions::new( + coord_type, + false, + Some(batch_size), + Some(self.properties_schema.clone()), + self.num_rows_remaining, + self.array_metadata.clone(), + ) + } +} + +impl FlatGeobufReader { + fn process_batch(&mut self) -> Result> { + let options = self.construct_options(); + let batch_size = options.batch_size; + + macro_rules! impl_read { + ($builder:expr) => {{ + let mut row_count = 0; + loop { + if row_count >= batch_size { + let (batches, _schema) = $builder.finish()?.into_inner(); + assert_eq!(batches.len(), 1); + return Ok(Some(batches.into_iter().next().unwrap())); + } + + if let Some(feature) = self.selection.next()? { + feature.process_properties(&mut $builder)?; + $builder.properties_end()?; + + $builder.push_geometry(feature.geometry_trait()?.as_ref())?; + + $builder.feature_end(0)?; + row_count += 1; + } else { + return Ok(None); + } + } + }}; } - (GeometryType::Polygon, true) => { - impl_read!(PolygonBuilder, Dimension::XYZ) + + match self.data_type { + NativeType::Point(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::LineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Polygon(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiPoint(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiLineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiPolygon(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Geometry(_) | NativeType::GeometryCollection(_, _) => { + let mut builder = GeoTableBuilder::::new_with_options( + // TODO: I think this is unused? remove. + Dimension::XY, + options, + ); + impl_read!(builder) + } + geom_type => Err(GeoArrowError::NotYetImplemented(format!( + "Parsing FlatGeobuf from {:?} geometry type not yet supported", + geom_type + ))), } - (GeometryType::MultiPoint, true) => { - impl_read!(MultiPointBuilder, Dimension::XYZ) + } +} + +impl FlatGeobufReader { + fn process_batch(&mut self) -> Result> { + let options = self.construct_options(); + let batch_size = options.batch_size; + + macro_rules! impl_read { + ($builder:expr) => {{ + let mut row_count = 0; + loop { + if row_count >= batch_size { + let (batches, _schema) = $builder.finish()?.into_inner(); + assert_eq!(batches.len(), 1); + return Ok(Some(batches.into_iter().next().unwrap())); + } + + if let Some(feature) = self.selection.next()? { + feature.process_properties(&mut $builder)?; + $builder.properties_end()?; + + $builder.push_geometry(feature.geometry_trait()?.as_ref())?; + + $builder.feature_end(0)?; + row_count += 1; + } else { + return Ok(None); + } + } + }}; } - (GeometryType::MultiLineString, true) => impl_read!(MultiLineStringBuilder, Dimension::XYZ), - (GeometryType::MultiPolygon, true) => impl_read!(MultiPolygonBuilder, Dimension::XYZ), - (GeometryType::Unknown, true) => { - let mut builder = - GeoTableBuilder::::new_with_options(Dimension::XYZ, options); - selection.process_features(&mut builder)?; - let table = builder.finish()?; - // TODO: 3d downcasting not implemented - // table.downcast() - Ok(table) + + match self.data_type { + NativeType::Point(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::LineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Polygon(_, dim) => { + let mut builder = GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiPoint(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiLineString(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::MultiPolygon(_, dim) => { + let mut builder = + GeoTableBuilder::::new_with_options(dim, options); + impl_read!(builder) + } + NativeType::Geometry(_) => { + let mut builder = GeoTableBuilder::::new_with_options( + // TODO: I think this is unused? remove. + Dimension::XY, + options, + ); + impl_read!(builder) + } + // NativeType::GeometryCollection(_, dim) => { + // let mut builder = + // GeoTableBuilder::::new_with_options(dim, options); + // impl_read!(builder) + // } + geom_type => Err(GeoArrowError::NotYetImplemented(format!( + "Parsing FlatGeobuf from {:?} geometry type not yet supported", + geom_type + ))), } - // TODO: Parse into a GeometryCollection array and then downcast to a single-typed array if possible. - geom_type => Err(GeoArrowError::NotYetImplemented(format!( - "Parsing FlatGeobuf from {:?} geometry type not yet supported", - geom_type - ))), + } +} + +impl Iterator for FlatGeobufReader { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + self.process_batch() + .map_err(|err| ArrowError::ExternalError(Box::new(err))) + .transpose() + } +} + +impl RecordBatchReader for FlatGeobufReader { + fn schema(&self) -> SchemaRef { + let geom_field = + self.data_type + .to_field_with_metadata("geometry", true, &self.array_metadata); + let mut fields = self.properties_schema.fields().to_vec(); + fields.push(Arc::new(geom_field)); + Arc::new(Schema::new_with_metadata( + fields, + self.properties_schema.metadata().clone(), + )) + } +} + +impl Iterator for FlatGeobufReader { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + self.process_batch() + .map_err(|err| ArrowError::ExternalError(Box::new(err))) + .transpose() + } +} + +impl RecordBatchReader for FlatGeobufReader { + fn schema(&self) -> SchemaRef { + let geom_field = + self.data_type + .to_field_with_metadata("geometry", true, &self.array_metadata); + let mut fields = self.properties_schema.fields().to_vec(); + fields.push(Arc::new(geom_field)); + Arc::new(Schema::new_with_metadata( + fields, + self.properties_schema.metadata().clone(), + )) } } @@ -144,27 +385,42 @@ mod test { use arrow_schema::DataType; use crate::datatypes::NativeType; + use crate::table::Table; use super::*; #[test] fn test_countries() { - let mut filein = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap()); - let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); + let filein = BufReader::new(File::open("fixtures/flatgeobuf/countries.fgb").unwrap()); + let reader_builder = FlatGeobufReaderBuilder::open(filein).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let _batches = record_batch_reader + .collect::, _>>() + .unwrap(); } #[test] fn test_nz_buildings() { - let mut filein = BufReader::new( + let filein = BufReader::new( File::open("fixtures/flatgeobuf/nz-building-outlines-small.fgb").unwrap(), ); - let _table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); + let reader_builder = FlatGeobufReaderBuilder::open(filein).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let _batches = record_batch_reader + .collect::, _>>() + .unwrap(); } #[test] fn test_poly() { - let mut filein = BufReader::new(File::open("fixtures/flatgeobuf/poly00.fgb").unwrap()); - let table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); + let filein = BufReader::new(File::open("fixtures/flatgeobuf/poly00.fgb").unwrap()); + + let reader_builder = FlatGeobufReaderBuilder::open(filein).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let table = Table::try_from( + Box::new(record_batch_reader) as Box + ) + .unwrap(); let geom_col = table.geometry_column(None).unwrap(); assert!(matches!(geom_col.data_type(), NativeType::Polygon(_, _))); @@ -187,12 +443,16 @@ mod test { #[test] fn test_all_datatypes() { - let mut filein = - BufReader::new(File::open("fixtures/flatgeobuf/alldatatypes.fgb").unwrap()); - let table = read_flatgeobuf(&mut filein, Default::default()).unwrap(); + let filein = BufReader::new(File::open("fixtures/flatgeobuf/alldatatypes.fgb").unwrap()); + let reader_builder = FlatGeobufReaderBuilder::open(filein).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let table = Table::try_from( + Box::new(record_batch_reader) as Box + ) + .unwrap(); let geom_col = table.geometry_column(None).unwrap(); - assert!(matches!(geom_col.data_type(), NativeType::Point(_, _))); + assert!(matches!(geom_col.data_type(), NativeType::Geometry(_))); let (batches, schema) = table.into_inner(); assert_eq!(batches[0].num_rows(), 1); diff --git a/rust/geoarrow/src/io/flatgeobuf/writer.rs b/rust/geoarrow/src/io/flatgeobuf/writer.rs index 3a69c80a..89518cc3 100644 --- a/rust/geoarrow/src/io/flatgeobuf/writer.rs +++ b/rust/geoarrow/src/io/flatgeobuf/writer.rs @@ -168,7 +168,8 @@ fn infer_flatgeobuf_geometry_type(schema: &Schema) -> Result + ) + .unwrap(); // Note: backwards row order is due to the reordering during the spatial index let batch = &new_table.batches()[0]; @@ -203,8 +209,13 @@ mod test { }; write_flatgeobuf_with_options(&table, writer, "name", options).unwrap(); - let mut reader = Cursor::new(output_buffer); - let new_table = read_flatgeobuf(&mut reader, Default::default()).unwrap(); + let reader = Cursor::new(output_buffer); + let reader_builder = FlatGeobufReaderBuilder::open(reader).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let new_table = Table::try_from( + Box::new(record_batch_reader) as Box + ) + .unwrap(); assert_eq!(table, new_table); } @@ -216,8 +227,13 @@ mod test { let writer = BufWriter::new(&mut output_buffer); write_flatgeobuf(&table, writer, "name").unwrap(); - let mut reader = Cursor::new(output_buffer); - let new_table = read_flatgeobuf(&mut reader, Default::default()).unwrap(); + let reader = Cursor::new(output_buffer); + let reader_builder = FlatGeobufReaderBuilder::open(reader).unwrap(); + let record_batch_reader = reader_builder.read(Default::default()).unwrap(); + let new_table = Table::try_from( + Box::new(record_batch_reader) as Box + ) + .unwrap(); // Note: backwards row order is due to the reordering during the spatial index let batch = &new_table.batches()[0]; diff --git a/rust/geoarrow/src/io/geozero/table/builder/properties.rs b/rust/geoarrow/src/io/geozero/table/builder/properties.rs index e85df409..272435ec 100644 --- a/rust/geoarrow/src/io/geozero/table/builder/properties.rs +++ b/rust/geoarrow/src/io/geozero/table/builder/properties.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::{Field, Schema, SchemaBuilder}; +use arrow_schema::{Schema, SchemaBuilder}; use chrono::{DateTime, Utc}; use geozero::{FeatureProcessor, GeomProcessor, PropertyProcessor}; @@ -122,8 +122,9 @@ impl PropertiesBatchBuilder { let mut columns = Vec::with_capacity(self.columns.len()); for (name, builder) in self.columns.into_iter() { + let field = builder.field().with_name(name); let array = builder.finish()?; - schema_builder.push(Field::new(name, array.data_type().clone(), true)); + schema_builder.push(field); columns.push(array); } diff --git a/rust/geoarrow/src/io/geozero/table/builder/table.rs b/rust/geoarrow/src/io/geozero/table/builder/table.rs index 1c7340ac..7dab5e22 100644 --- a/rust/geoarrow/src/io/geozero/table/builder/table.rs +++ b/rust/geoarrow/src/io/geozero/table/builder/table.rs @@ -231,8 +231,6 @@ impl GeoTableBuilder { table.append_column(geom_field, geom_col.array_refs())?; Ok(table) - // TODO: 3d downcasting not yet supported - // table.downcast(false) } } diff --git a/rust/geodatafusion/src/udf/native/measurement/area.rs b/rust/geodatafusion/src/udf/native/measurement/area.rs index 8480e88c..30c1a756 100644 --- a/rust/geodatafusion/src/udf/native/measurement/area.rs +++ b/rust/geodatafusion/src/udf/native/measurement/area.rs @@ -77,15 +77,24 @@ mod test { use geoarrow::algorithm::native::Cast; use geoarrow::array::CoordType; use geoarrow::datatypes::NativeType; - use geoarrow::io::flatgeobuf::read_flatgeobuf; + use geoarrow::io::flatgeobuf::{FlatGeobufReaderBuilder, FlatGeobufReaderOptions}; + use geoarrow::table::Table; use std::fs::File; use std::sync::Arc; use super::*; fn load_file() -> RecordBatch { - let mut file = File::open("../../fixtures/flatgeobuf/countries.fgb").unwrap(); - let table = read_flatgeobuf(&mut file, Default::default()).unwrap(); + let file = File::open("../../fixtures/flatgeobuf/countries.fgb").unwrap(); + let reader_builder = FlatGeobufReaderBuilder::open(file).unwrap(); + let options = FlatGeobufReaderOptions { + coord_type: CoordType::Separated, + ..Default::default() + }; + let reader = reader_builder.read(options).unwrap(); + let table = + Table::try_from(Box::new(reader) as Box).unwrap(); + let geometry = table.geometry_column(None).unwrap(); let geometry = geometry .as_ref() diff --git a/rust/geodatafusion/src/udf/native/processing/centroid.rs b/rust/geodatafusion/src/udf/native/processing/centroid.rs index 91760fd8..425779a4 100644 --- a/rust/geodatafusion/src/udf/native/processing/centroid.rs +++ b/rust/geodatafusion/src/udf/native/processing/centroid.rs @@ -82,15 +82,24 @@ mod test { use geoarrow::algorithm::native::Cast; use geoarrow::array::CoordType; use geoarrow::datatypes::NativeType; - use geoarrow::io::flatgeobuf::read_flatgeobuf; + use geoarrow::io::flatgeobuf::{FlatGeobufReaderBuilder, FlatGeobufReaderOptions}; + use geoarrow::table::Table; use std::fs::File; use std::sync::Arc; use super::*; fn load_file() -> RecordBatch { - let mut file = File::open("../../fixtures/flatgeobuf/countries.fgb").unwrap(); - let table = read_flatgeobuf(&mut file, Default::default()).unwrap(); + let file = File::open("../../fixtures/flatgeobuf/countries.fgb").unwrap(); + let reader_builder = FlatGeobufReaderBuilder::open(file).unwrap(); + let options = FlatGeobufReaderOptions { + coord_type: CoordType::Separated, + ..Default::default() + }; + let reader = reader_builder.read(options).unwrap(); + let table = + Table::try_from(Box::new(reader) as Box).unwrap(); + let geometry = table.geometry_column(None).unwrap(); let geometry = geometry .as_ref()