Skip to content

Commit

Permalink
feat: Implement lingering for append record stream (#55)
Browse files Browse the repository at this point in the history
Resolves: #37

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 18, 2024
1 parent c36fb38 commit 7b85760
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 231 deletions.
20 changes: 19 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,31 @@ jobs:
permissions:
contents: none
name: CI
needs: lint
needs: [test, lint]
runs-on: ubuntu-latest
if: always()
steps:
- name: Failed
run: exit 1
if: contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') || contains(needs.*.result, 'skipped')
test:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v4
with:
submodules: true
token: ${{ secrets.GH_TOKEN }}
- name: install rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable
components: rustfmt, clippy
- name: install protoc
uses: arduino/setup-protoc@v3
- uses: Swatinem/rust-cache@v2
- name: Run cargo tests
run: cargo test
lint:
runs-on: ubuntu-latest
steps:
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
license = "Apache-2.0"

[dependencies]
async-stream = "0.3.6"
backon = "1.2.0"
bytesize = "1.3.0"
futures = "0.3.31"
Expand All @@ -22,14 +23,17 @@ secrecy = "0.8.0"
serde = { version = "1.0.214", optional = true, features = ["derive"] }
sync_docs = { path = "sync_docs" }
thiserror = "1.0.67"
tokio = { version = "1.41.1", features = ["time"] }
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tower-service = "0.3.3"

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }

[dev-dependencies]
tokio = { version = "*", features = ["full"] }
rstest = "0.23.0"
tokio = { version = "1.41.1", features = ["full", "test-util"] }
tokio-stream = "0.1.16"

[features]
serde = ["dep:serde"]
Expand Down
4 changes: 2 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::time::Duration;

use futures::StreamExt;
use streamstore::{
batching::AppendRecordsBatchingStream,
client::{Client, ClientConfig, ClientError, HostEndpoints},
streams::AppendRecordStream,
types::{
AppendInput, AppendRecord, CreateBasinRequest, CreateStreamRequest, DeleteBasinRequest,
DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest, ReadSessionRequest,
Expand Down Expand Up @@ -123,7 +123,7 @@ async fn main() {
};

let append_session_req =
AppendRecordStream::new(futures::stream::iter(records), Default::default()).unwrap();
AppendRecordsBatchingStream::new(futures::stream::iter(records), Default::default());

match stream_client.append_session(append_session_req).await {
Ok(mut stream) => {
Expand Down
8 changes: 4 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,15 +579,15 @@ impl StreamClient {
pub async fn read_session(
&self,
req: types::ReadSessionRequest,
) -> Result<Streaming<types::ReadSessionResponse>, ClientError> {
) -> Result<Streaming<types::ReadOutput>, ClientError> {
self.inner
.send_retryable(ReadSessionServiceRequest::new(
self.inner.stream_service_client(),
&self.stream,
req,
))
.await
.map(Streaming::new)
.map(|s| Box::pin(s) as _)
}

#[sync_docs]
Expand All @@ -610,7 +610,7 @@ impl StreamClient {
req: S,
) -> Result<Streaming<types::AppendOutput>, ClientError>
where
S: 'static + Send + futures::Stream<Item = types::AppendInput> + Unpin,
S: 'static + Send + Unpin + futures::Stream<Item = types::AppendInput>,
{
self.inner
.send(AppendSessionServiceRequest::new(
Expand All @@ -619,7 +619,7 @@ impl StreamClient {
req,
))
.await
.map(Streaming::new)
.map(|s| Box::pin(s) as _)
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ mod api;
mod service;

pub mod client;
pub mod streams;
pub mod types;

pub use bytesize;
pub use futures;
pub use http::uri;
pub use secrecy::SecretString;
pub use service::Streaming;
pub use service::stream::batching;
20 changes: 2 additions & 18 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,5 @@ impl<S: StreamingResponse> futures::Stream for ServiceStreamingResponse<S> {
}
}

pub struct Streaming<R>(Box<dyn Unpin + Send + futures::Stream<Item = Result<R, ClientError>>>);

impl<R> Streaming<R> {
pub(crate) fn new<S>(s: ServiceStreamingResponse<S>) -> Self
where
S: StreamingResponse<ResponseItem = R> + Send + 'static,
{
Self(Box::new(s))
}
}

impl<R> futures::Stream for Streaming<R> {
type Item = Result<R, ClientError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}
/// Generic type for streaming response.
pub type Streaming<R> = Pin<Box<dyn Send + futures::Stream<Item = Result<R, ClientError>>>>;
4 changes: 3 additions & 1 deletion src/service/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod batching;

use tonic::{transport::Channel, IntoRequest};

use super::{
Expand Down Expand Up @@ -161,7 +163,7 @@ impl IdempotentRequest for ReadSessionServiceRequest {
pub struct ReadSessionStreamingResponse;

impl StreamingResponse for ReadSessionStreamingResponse {
type ResponseItem = types::ReadSessionResponse;
type ResponseItem = types::ReadOutput;
type ApiResponseItem = api::ReadSessionResponse;

fn parse_response_item(
Expand Down
Loading

0 comments on commit 7b85760

Please sign in to comment.