From a9dad020d2d0f2ce73be2f2f04074afecd207e03 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 23 Dec 2024 15:26:53 -0500 Subject: [PATCH] FlatGeobuf async stream --- .../src/io/flatgeobuf/reader/async.rs | 108 +++++++++++++++++- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/rust/geoarrow/src/io/flatgeobuf/reader/async.rs b/rust/geoarrow/src/io/flatgeobuf/reader/async.rs index 9526ec54..5bf7de40 100644 --- a/rust/geoarrow/src/io/flatgeobuf/reader/async.rs +++ b/rust/geoarrow/src/io/flatgeobuf/reader/async.rs @@ -1,14 +1,20 @@ use std::sync::Arc; -use flatgeobuf::{GeometryType, HttpFgbReader}; +use arrow_array::RecordBatch; +use arrow_schema::{Schema, SchemaRef}; +use async_stream::stream; +use flatgeobuf::{AsyncFeatureIter, FgbFeature, GeometryType, HttpFgbReader}; +use futures::future::BoxFuture; +use futures::Stream; use geozero::{FeatureProcessor, FeatureProperties}; -use http_range_client::AsyncBufferedHttpRangeClient; +use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient}; use object_store::path::Path; use object_store::ObjectStore; use crate::algorithm::native::DowncastTable; +use crate::array::metadata::ArrayMetadata; use crate::array::*; -use crate::datatypes::Dimension; +use crate::datatypes::{Dimension, NativeType}; use crate::error::{GeoArrowError, Result}; use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions}; use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper; @@ -16,6 +22,99 @@ use crate::io::geozero::array::GeometryStreamBuilder; use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions}; use crate::table::Table; +/// A builder for [FlatGeobufReader] +pub struct FlatGeobufStreamBuilder { + reader: HttpFgbReader, +} + +impl FlatGeobufStreamBuilder { + // pub async fn open(reader: T) -> Result { + // HttpFgbReader::new(client) + // todo!() + // } + + pub async fn open_existing(reader: AsyncBufferedHttpRangeClient) -> Result { + let reader = HttpFgbReader::new(reader).await.unwrap(); + Ok(Self { reader }) + } + + pub async fn read(self) { + let features = self.reader.select_all().await.unwrap(); + } +} + +impl FlatGeobufStreamBuilder { + pub async fn open_store( + reader: Arc, + location: Path, + options: FlatGeobufReaderOptions, + ) -> Result { + let head = reader.head(&location).await?; + + let object_store_wrapper = ObjectStoreWrapper { + reader, + location, + size: head.size, + }; + + let async_client = AsyncBufferedHttpRangeClient::with(object_store_wrapper, ""); + + Self::open_existing(async_client).await + } +} + +enum StreamState { + /// At the start of a new row group, or the end of the FlatGeobuf stream + Init, + /// Decoding a batch + Decoding, + /// Reading data from input + Reading(BoxFuture<'static, ()>), + /// Error + Error, +} + +/// An iterator over record batches from a FlatGeobuf file. +/// +/// This implements [arrow_array::RecordBatchReader], which you can use to access data. +pub struct FlatGeobufStreamReader { + selection: AsyncFeatureIter, + data_type: NativeType, + batch_size: usize, + properties_schema: SchemaRef, + num_rows_remaining: Option, + array_metadata: Arc, + state: StreamState, +} + +impl FlatGeobufStreamReader { + fn schema(&self) -> SchemaRef { + let geom_field = + self.data_type + .to_field_with_metadata("geometry", true, &self.array_metadata); + let mut fields = self.properties_schema.fields().to_vec(); + fields.push(Arc::new(geom_field)); + Arc::new(Schema::new_with_metadata( + fields, + self.properties_schema.metadata().clone(), + )) + } + + fn into_stream(self) -> impl Stream>> { + stream! { + for await value in self.selection { + yield value + } + } + } +} + +// impl Stream for FlatGeobufStreamReader { +// type Item = Result; +// } + +// fn fgb_to_stream(selection: AsyncFeatureIter) -> + /// Read a FlatGeobuf file to a Table asynchronously from object storage. pub async fn read_flatgeobuf_async( reader: Arc, @@ -29,7 +128,8 @@ pub async fn read_flatgeobuf_async( location, size: head.size, }; - let async_client = AsyncBufferedHttpRangeClient::with(object_store_wrapper, ""); + let async_client: AsyncBufferedHttpRangeClient = + AsyncBufferedHttpRangeClient::with(object_store_wrapper, ""); let reader = HttpFgbReader::new(async_client).await.unwrap();