Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: s2_request_token header (exercise idempotence) #86

Merged
merged 16 commits into from
Nov 29, 2024
11 changes: 8 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ jobs:
- name: check documentation
env:
RUSTDOCFLAGS: -D warnings
run: cargo doc --workspace --all-features --no-deps --document-private-items
- name: check formatting
run: cargo fmt --all -- --check
run: cargo doc --workspace --all-features --no-deps --document-private-items
- name: check clippy
run: cargo clippy --workspace --all-features --all-targets -- -D warnings --allow deprecated
- name: check Cargo.toml sorting
run: cargo sort --workspace --check
- name: install rust for fmt
uses: dtolnay/rust-toolchain@stable
with:
toolchain: nightly
components: rustfmt, clippy
- name: check formatting
run: cargo +nightly fmt --all -- --check
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tonic-side-effect = { git = "https://github.com/s2-streamstore/tonic-side-effect.git", rev = "33c85a508a3dcb8f05194a7851592a83cec058d8" }
tower-service = "0.3.3"
tracing = "0.1.40"
uuid = { version = "1.11.0", features = ["v4", "fast-rng"] }

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }
Expand All @@ -46,3 +47,5 @@ connector = []

[lints.clippy]
unused_async = "deny"
declare_interior_mutable_const = "allow"
borrow_interior_mutable_const = "allow"
38 changes: 24 additions & 14 deletions src/append_session.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
use crate::client::{AppendRetryPolicy, ClientError, StreamClient};
use crate::service::stream::{AppendSessionServiceRequest, AppendSessionStreamingResponse};
use crate::service::ServiceStreamingResponse;
use crate::types;
use crate::types::MeteredSize;
use std::{
collections::VecDeque,
ops::{DerefMut, RangeTo},
sync::Arc,
time::Duration,
};

use bytesize::ByteSize;
use futures::StreamExt;
use std::collections::VecDeque;
use std::ops::{DerefMut, RangeTo};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Permit;
use tokio::sync::{mpsc, Mutex};
use tokio::time::Instant;
use tokio::{
sync::{mpsc, mpsc::Permit, Mutex},
time::Instant,
};
use tokio_muxt::{CoalesceMode, MuxTimer};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tonic_side_effect::FrameSignal;
use tracing::debug;

use crate::{
client::{AppendRetryPolicy, ClientError, StreamClient},
service::{
stream::{AppendSessionServiceRequest, AppendSessionStreamingResponse},
ServiceStreamingResponse,
},
types,
types::MeteredSize,
};

async fn connect(
stream_client: &StreamClient,
frame_signal: FrameSignal,
Expand Down Expand Up @@ -355,8 +364,9 @@ pub(crate) async fn manage_session<S>(
match stream_client.inner.config.append_retry_policy {
AppendRetryPolicy::All => true,
AppendRetryPolicy::NoSideEffects => {
// If no request frame has been produced, we conclude that the failing append
// never left this host, so it is safe to retry.
// If no request frame has been produced, we conclude that the failing
// append never left this host, so it is
// safe to retry.
infiniteregrets marked this conversation as resolved.
Show resolved Hide resolved
!frame_signal.is_signalled()
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use http::{uri::Authority, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::{sync::mpsc, time::sleep};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
metadata::AsciiMetadataValue,
Expand Down
19 changes: 19 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ pub mod stream;

use std::{
pin::Pin,
sync::LazyLock,
task::{Context, Poll},
};

use futures::StreamExt;
use prost_types::method_options::IdempotencyLevel;
use secrecy::{ExposeSecret, SecretString};
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap};
use uuid::Uuid;

use crate::{client::ClientError, types};

Expand Down Expand Up @@ -52,6 +54,19 @@ fn add_authorization_header(
Ok(())
}

pub(crate) fn add_s2_request_token_header(
meta: &mut MetadataMap,
s2_request_token: &str,
) -> Result<(), types::ConvertError> {
let s2_request_token: AsciiMetadataValue = s2_request_token
.try_into()
.map_err(|_| "failed to parse token as metadata value")?;

meta.insert("s2-request-token", s2_request_token);

Ok(())
}

pub trait ServiceRequest {
/// Request parameters generated by prost.
type ApiRequest;
Expand All @@ -63,6 +78,10 @@ pub trait ServiceRequest {
/// Idempotency level for the underlying service.
const IDEMPOTENCY_LEVEL: IdempotencyLevel;

/// Request token for the service to be used for idempotency.
/// Only use by `CreateBasin` and `CreateStream`.
const S2_REQUEST_TOKEN: LazyLock<String> = LazyLock::new(|| Uuid::new_v4().to_string());
infiniteregrets marked this conversation as resolved.
Show resolved Hide resolved

/// Take the request parameters and generate the corresponding tonic request.
fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError>;

Expand Down
11 changes: 6 additions & 5 deletions src/service/account.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use prost_types::method_options::IdempotencyLevel;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::{transport::Channel, IntoRequest};

use super::ServiceRequest;
use super::{add_s2_request_token_header, ServiceRequest};
use crate::{
api::{self, account_service_client::AccountServiceClient},
types,
Expand All @@ -24,11 +23,13 @@ impl ServiceRequest for CreateBasinServiceRequest {
type ApiRequest = api::CreateBasinRequest;
type Response = types::BasinInfo;
type ApiResponse = api::CreateBasinResponse;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::Idempotent;

fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
let req: api::CreateBasinRequest = self.req.clone().try_into()?;
Ok(req.into_request())
let mut tonic_req = req.into_request();
add_s2_request_token_header(tonic_req.metadata_mut(), &Self::S2_REQUEST_TOKEN)?;
Ok(tonic_req)
}

async fn send(
Expand Down
11 changes: 6 additions & 5 deletions src/service/basin.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use prost_types::method_options::IdempotencyLevel;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::{transport::Channel, IntoRequest};

use super::ServiceRequest;
use super::{add_s2_request_token_header, ServiceRequest};
use crate::{
api::{self, basin_service_client::BasinServiceClient},
types,
Expand Down Expand Up @@ -105,11 +104,13 @@ impl ServiceRequest for CreateStreamServiceRequest {
type ApiRequest = api::CreateStreamRequest;
type Response = ();
type ApiResponse = api::CreateStreamResponse;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::Idempotent;

fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
let req: api::CreateStreamRequest = self.req.clone().try_into()?;
Ok(req.into_request())
let mut tonic_req = req.into_request();
add_s2_request_token_header(tonic_req.metadata_mut(), &Self::S2_REQUEST_TOKEN)?;
Ok(tonic_req)
}

async fn send(
Expand Down
6 changes: 2 additions & 4 deletions src/service/stream.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use prost_types::method_options::IdempotencyLevel;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::{transport::Channel, IntoRequest};
use tonic_side_effect::{FrameSignal, RequestFrameMonitor};

use super::{
ClientError, ServiceRequest, ServiceStreamingRequest, ServiceStreamingResponse,
StreamingRequest, StreamingResponse,
};

use crate::client::AppendRetryPolicy;
use crate::{
api::{self, stream_service_client::StreamServiceClient},
client::AppendRetryPolicy,
types,
};

Expand Down