Skip to content

Commit

Permalink
FlatGeobuf async stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 23, 2024
1 parent 9a1e946 commit a9dad02
Showing 1 changed file with 104 additions and 4 deletions.
108 changes: 104 additions & 4 deletions rust/geoarrow/src/io/flatgeobuf/reader/async.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,120 @@
use std::sync::Arc;

use flatgeobuf::{GeometryType, HttpFgbReader};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use async_stream::stream;
use flatgeobuf::{AsyncFeatureIter, FgbFeature, GeometryType, HttpFgbReader};
use futures::future::BoxFuture;
use futures::Stream;
use geozero::{FeatureProcessor, FeatureProperties};
use http_range_client::AsyncBufferedHttpRangeClient;
use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient};
use object_store::path::Path;
use object_store::ObjectStore;

use crate::algorithm::native::DowncastTable;
use crate::array::metadata::ArrayMetadata;
use crate::array::*;
use crate::datatypes::Dimension;
use crate::datatypes::{Dimension, NativeType};
use crate::error::{GeoArrowError, Result};
use crate::io::flatgeobuf::reader::common::{infer_schema, parse_crs, FlatGeobufReaderOptions};
use crate::io::flatgeobuf::reader::object_store_reader::ObjectStoreWrapper;
use crate::io::geozero::array::GeometryStreamBuilder;
use crate::io::geozero::table::{GeoTableBuilder, GeoTableBuilderOptions};
use crate::table::Table;

/// A builder that owns an opened [`HttpFgbReader`] for a FlatGeobuf source.
///
/// NOTE(review): the original doc linked to `FlatGeobufReader`, which is not defined in this
/// file; presumably [`FlatGeobufStreamReader`] is the intended target — confirm.
pub struct FlatGeobufStreamBuilder<T: AsyncHttpRangeClient> {
/// The FlatGeobuf reader created from the range client passed to the constructor.
reader: HttpFgbReader<T>,
}

impl<T: AsyncHttpRangeClient> FlatGeobufStreamBuilder<T> {
// pub async fn open(reader: T) -> Result<Self> {
// HttpFgbReader::new(client)
// todo!()
// }

pub async fn open_existing(reader: AsyncBufferedHttpRangeClient<T>) -> Result<Self> {
let reader = HttpFgbReader::new(reader).await.unwrap();
Ok(Self { reader })
}

pub async fn read(self) {
let features = self.reader.select_all().await.unwrap();
}
}

impl FlatGeobufStreamBuilder<ObjectStoreWrapper> {
    /// Open a FlatGeobuf object at `location` inside an [`ObjectStore`].
    ///
    /// The object's size is looked up with a `head` request and stored on the
    /// [`ObjectStoreWrapper`] that backs the buffered range client.
    ///
    /// `options` is accepted but not consulted by this function.
    ///
    /// # Errors
    ///
    /// Returns an error if the `head` request fails or if
    /// [`FlatGeobufStreamBuilder::open_existing`] fails.
    pub async fn open_store(
        reader: Arc<dyn ObjectStore>,
        location: Path,
        options: FlatGeobufReaderOptions,
    ) -> Result<Self> {
        // The size must be read before `reader` and `location` are moved into the wrapper.
        let size = reader.head(&location).await?.size;
        let wrapper = ObjectStoreWrapper {
            reader,
            location,
            size,
        };

        // The wrapper already knows which object to read, so the URL argument is left empty.
        let client = AsyncBufferedHttpRangeClient::with(wrapper, "");
        Self::open_existing(client).await
    }
}

/// Internal state of a [`FlatGeobufStreamReader`], stored in its `state` field.
///
/// NOTE(review): no code visible in this file transitions between these states yet; the
/// variant descriptions below describe intent and should be confirmed once the `Stream`
/// implementation lands.
enum StreamState {
/// At the start of a new row group, or the end of the FlatGeobuf stream
///
/// NOTE(review): "row group" is Parquet terminology; presumably "batch" is meant here —
/// confirm.
Init,
/// Decoding a batch
Decoding,
/// Reading data from input
///
/// Holds the boxed future for the in-flight read.
Reading(BoxFuture<'static, ()>),
/// Error
Error,
}

/// A reader over the features selected from a FlatGeobuf file through an asynchronous client.
///
/// NOTE(review): the previous doc described this as an iterator implementing
/// [arrow_array::RecordBatchReader]; no such implementation is visible in this file, and the
/// `Stream` implementation for this type is still commented out — confirm the intended trait.
pub struct FlatGeobufStreamReader<T: AsyncHttpRangeClient> {
/// Asynchronous feature iterator; consumed by `into_stream`.
selection: AsyncFeatureIter<T>,
/// Geometry type used to build the `"geometry"` field in `schema`.
data_type: NativeType,
/// Presumably the maximum number of rows per emitted batch; not read by any code visible
/// here — TODO confirm.
batch_size: usize,
/// Schema of the property columns; `schema` appends the geometry field to these fields.
properties_schema: SchemaRef,
/// Presumably the number of features still to be read, when known; not read by any code
/// visible here — TODO confirm.
num_rows_remaining: Option<usize>,
/// Metadata attached to the geometry field produced by `schema`.
array_metadata: Arc<ArrayMetadata>,
/// Current [`StreamState`] of the reader.
state: StreamState,
}

impl<T: AsyncHttpRangeClient> FlatGeobufStreamReader<T> {
    /// Build the output schema: every property field followed by a nullable `"geometry"` field.
    ///
    /// The schema-level metadata is copied from the properties schema, and the geometry field
    /// carries this reader's array metadata.
    fn schema(&self) -> SchemaRef {
        let property_fields = self.properties_schema.fields();

        // Reserve room for the property columns plus the trailing geometry column.
        let mut fields = Vec::with_capacity(property_fields.len() + 1);
        fields.extend(property_fields.iter().cloned());
        fields.push(Arc::new(self.data_type.to_field_with_metadata(
            "geometry",
            true,
            &self.array_metadata,
        )));

        let metadata = self.properties_schema.metadata().clone();
        Arc::new(Schema::new_with_metadata(fields, metadata))
    }

    /// Consume the reader and expose its feature selection as a [`Stream`].
    ///
    /// Every value produced by the selection is yielded unchanged.
    fn into_stream(self) -> impl Stream<Item = Result<Option<FgbFeature>>> {
        stream! {
            for await value in self.selection {
                yield value
            }
        }
    }
}

// impl<T: AsyncHttpRangeClient + Unpin + Send + 'static> Stream for FlatGeobufStreamReader<T> {
// type Item = Result<RecordBatch>;
// }

// fn fgb_to_stream<T: AsyncHttpRangeClient>(selection: AsyncFeatureIter<T>) ->

/// Read a FlatGeobuf file to a Table asynchronously from object storage.
pub async fn read_flatgeobuf_async(
reader: Arc<dyn ObjectStore>,
Expand All @@ -29,7 +128,8 @@ pub async fn read_flatgeobuf_async(
location,
size: head.size,
};
let async_client = AsyncBufferedHttpRangeClient::with(object_store_wrapper, "");
let async_client: AsyncBufferedHttpRangeClient<ObjectStoreWrapper> =
AsyncBufferedHttpRangeClient::with(object_store_wrapper, "");

let reader = HttpFgbReader::new(async_client).await.unwrap();

Expand Down

0 comments on commit a9dad02

Please sign in to comment.