Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: s2_request_token header (exercise idempotence) #86

Merged
merged 16 commits into from
Nov 29, 2024
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tonic-side-effect = { git = "https://github.com/s2-streamstore/tonic-side-effect.git", rev = "33c85a508a3dcb8f05194a7851592a83cec058d8" }
tower-service = "0.3.3"
tracing = "0.1.40"
uuid = { version = "1.11.0", features = ["v4", "fast-rng"] }

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }
Expand Down
7 changes: 3 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use http::{uri::Authority, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::{sync::mpsc, time::sleep};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
metadata::AsciiMetadataValue,
Expand Down Expand Up @@ -373,7 +372,7 @@ impl Client {
req: types::CreateBasinRequest,
) -> Result<types::BasinInfo, ClientError> {
self.inner
.send(CreateBasinServiceRequest::new(
.send_retryable(CreateBasinServiceRequest::new(
self.inner.account_service_client(),
req,
))
Expand Down Expand Up @@ -458,7 +457,7 @@ impl BasinClient {
#[sync_docs]
pub async fn create_stream(&self, req: types::CreateStreamRequest) -> Result<(), ClientError> {
self.inner
.send(CreateStreamServiceRequest::new(
.send_retryable(CreateStreamServiceRequest::new(
self.inner.basin_service_client(),
req,
))
Expand Down
26 changes: 25 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod basin;
pub mod stream;

use std::{
fmt::Write,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -52,7 +53,30 @@ fn add_authorization_header(
Ok(())
}

pub trait ServiceRequest {
pub(crate) fn add_s2_request_token_header(
meta: &mut MetadataMap,
s2_request_token: &str,
) -> Result<(), types::ConvertError> {
let s2_request_token: AsciiMetadataValue = s2_request_token
.try_into()
.map_err(|_| "failed to parse token as metadata value")?;

meta.insert("s2-request-token", s2_request_token);

Ok(())
}

pub(crate) fn gen_s2_request_token() -> String {
uuid::Uuid::new_v4()
.as_bytes()
.iter()
.fold(String::with_capacity(32), |mut output, b| {
let _ = write!(output, "{b:02x}");
output
})
}

pub trait ServiceRequest: std::fmt::Debug {
/// Request parameters generated by prost.
type ApiRequest;
/// Response to be returned by the RPC.
Expand Down
18 changes: 12 additions & 6 deletions src/service/account.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use prost_types::method_options::IdempotencyLevel;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::{transport::Channel, IntoRequest};

use super::ServiceRequest;
use super::{add_s2_request_token_header, gen_s2_request_token, ServiceRequest};
use crate::{
api::{self, account_service_client::AccountServiceClient},
types,
Expand All @@ -12,23 +11,30 @@ use crate::{
pub struct CreateBasinServiceRequest {
client: AccountServiceClient<Channel>,
req: types::CreateBasinRequest,
s2_request_token: String,
}

impl CreateBasinServiceRequest {
pub fn new(client: AccountServiceClient<Channel>, req: types::CreateBasinRequest) -> Self {
Self { client, req }
Self {
client,
req,
s2_request_token: gen_s2_request_token(),
}
}
}

impl ServiceRequest for CreateBasinServiceRequest {
type ApiRequest = api::CreateBasinRequest;
type Response = types::BasinInfo;
type ApiResponse = api::CreateBasinResponse;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::Idempotent;

fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
let req: api::CreateBasinRequest = self.req.clone().try_into()?;
Ok(req.into_request())
let mut tonic_req = req.into_request();
add_s2_request_token_header(tonic_req.metadata_mut(), &self.s2_request_token)?;
Ok(tonic_req)
}

async fn send(
Expand Down
18 changes: 12 additions & 6 deletions src/service/basin.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use prost_types::method_options::IdempotencyLevel;
use tonic::transport::Channel;
use tonic::IntoRequest;
use tonic::{transport::Channel, IntoRequest};

use super::ServiceRequest;
use super::{add_s2_request_token_header, gen_s2_request_token, ServiceRequest};
use crate::{
api::{self, basin_service_client::BasinServiceClient},
types,
Expand Down Expand Up @@ -93,23 +92,30 @@ impl ServiceRequest for GetStreamConfigServiceRequest {
pub struct CreateStreamServiceRequest {
client: BasinServiceClient<Channel>,
req: types::CreateStreamRequest,
s2_request_token: String,
}

impl CreateStreamServiceRequest {
pub fn new(client: BasinServiceClient<Channel>, req: types::CreateStreamRequest) -> Self {
Self { client, req }
Self {
client,
req,
s2_request_token: gen_s2_request_token(),
}
}
}

impl ServiceRequest for CreateStreamServiceRequest {
type ApiRequest = api::CreateStreamRequest;
type Response = ();
type ApiResponse = api::CreateStreamResponse;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;
const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::Idempotent;

fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
let req: api::CreateStreamRequest = self.req.clone().try_into()?;
Ok(req.into_request())
let mut tonic_req = req.into_request();
add_s2_request_token_header(tonic_req.metadata_mut(), &self.s2_request_token)?;
Ok(tonic_req)
}

async fn send(
Expand Down
3 changes: 2 additions & 1 deletion src/service/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ impl ServiceRequest for AppendServiceRequest {
}
}

#[derive(Debug, Clone)]
pub struct AppendSessionServiceRequest<S>
where
S: Send + futures::Stream<Item = types::AppendInput> + Unpin,
Expand All @@ -269,7 +270,7 @@ where
}
}

impl<S> ServiceRequest for AppendSessionServiceRequest<S>
impl<S: std::fmt::Debug> ServiceRequest for AppendSessionServiceRequest<S>
where
S: 'static + Send + futures::Stream<Item = types::AppendInput> + Unpin,
{
Expand Down