From 576651064b50f108722746581c2a6467da838ed7 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 19 Nov 2024 23:43:46 +0530 Subject: [PATCH] fix: Remove `ConnectionError` in favour of pre-processing Resolves: #60 Signed-off-by: Vaibhav Rabber --- Cargo.toml | 1 + examples/basic.rs | 15 +++--- src/client.rs | 112 +++++++++++++++++------------------------ src/service.rs | 9 ++-- src/service/account.rs | 11 ++-- src/types.rs | 84 ++++++++++++++++++++++++++----- 6 files changed, 135 insertions(+), 97 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 76e9b21..5569afb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ hyper = "1.5.0" hyper-util = "0.1.10" prost = "0.13.3" prost-types = "0.13.3" +regex = "1.11.1" secrecy = "0.8.0" serde = { version = "1.0.214", optional = true, features = ["derive"] } sync_docs = { path = "sync_docs" } diff --git a/examples/basic.rs b/examples/basic.rs index 771f969..e4c3db7 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -5,8 +5,9 @@ use streamstore::{ batching::AppendRecordsBatchingStream, client::{Client, ClientConfig, ClientError, HostEndpoints}, types::{ - AppendInput, AppendRecord, CreateBasinRequest, CreateStreamRequest, DeleteBasinRequest, - DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest, ReadSessionRequest, + AppendInput, AppendRecord, BasinName, CreateBasinRequest, CreateStreamRequest, + DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest, + ReadSessionRequest, }, }; @@ -22,11 +23,11 @@ async fn main() { println!("Connecting with {config:#?}"); - let client = Client::new(config).unwrap(); + let client = Client::new(config); - let basin = "s2-sdk-example-basin"; + let basin: BasinName = "s2-sdk-example-basin".parse().unwrap(); - let create_basin_req = CreateBasinRequest::new(basin); + let create_basin_req = CreateBasinRequest::new(basin.clone()); match client.create_basin(create_basin_req).await { Ok(created_basin) => { @@ -57,7 +58,7 @@ async fn main() { Err(err) => exit_with_err(err), }; - match client.get_basin_config(basin).await { + match client.get_basin_config(basin.clone()).await { Ok(config) => { println!("Basin config: {config:#?}"); } @@ -68,7 +69,7 @@ async fn main() { let create_stream_req = CreateStreamRequest::new(stream); - let basin_client = client.basin_client(basin).unwrap(); + let basin_client = client.basin_client(basin.clone()); match basin_client.create_stream(create_stream_req).await { Ok(()) => { diff --git a/src/client.rs b/src/client.rs index 2e8ff8f..ecc47a0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, str::FromStr, time::Duration}; use backon::{BackoffBuilder, ConstantBuilder, Retryable}; use futures::StreamExt; -use http::uri::Authority; +use http::{uri::Authority, HeaderValue}; use hyper_util::client::legacy::connect::HttpConnector; use secrecy::SecretString; use sync_docs::sync_docs; @@ -166,8 +166,8 @@ impl ParseError { /// Endpoints for the hosted S2 environment. #[derive(Debug, Clone)] pub struct HostEndpoints { - pub cell: Authority, - pub basin_zone: Option, + cell: Authority, + basin_zone: Option, } impl From for HostEndpoints { @@ -253,7 +253,7 @@ pub struct ClientConfig { /// Timeout for a particular request. pub request_timeout: Duration, /// User agent to be used for the client. - pub user_agent: String, + pub user_agent: HeaderValue, /// URI scheme to use to connect. #[cfg(feature = "connector")] pub uri_scheme: http::uri::Scheme, @@ -272,7 +272,7 @@ impl ClientConfig { host_endpoints: HostEndpoints::default(), connection_timeout: Duration::from_secs(3), request_timeout: Duration::from_secs(5), - user_agent: "s2-sdk-rust".to_string(), + user_agent: "s2-sdk-rust".parse().unwrap(), #[cfg(feature = "connector")] uri_scheme: http::uri::Scheme::HTTPS, retry_backoff_duration: Duration::from_millis(100), @@ -306,7 +306,7 @@ impl ClientConfig { } /// Construct from an existing configuration with the new user agent. - pub fn with_user_agent(self, user_agent: impl Into) -> Self { + pub fn with_user_agent(self, user_agent: impl Into) -> Self { Self { user_agent: user_agent.into(), ..self @@ -355,33 +355,30 @@ pub struct Client { impl Client { /// Create the client to connect with the S2 API. - pub fn new(config: ClientConfig) -> Result { - Ok(Self { - inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR)?, - }) + pub fn new(config: ClientConfig) -> Self { + Self { + inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR), + } } #[cfg(feature = "connector")] - pub fn new_with_connector( - config: ClientConfig, - connector: C, - ) -> Result + pub fn new_with_connector(config: ClientConfig, connector: C) -> Self where C: tower_service::Service + Send + 'static, C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send, C::Error: std::error::Error + Send + Sync + 'static, { - Ok(Self { - inner: ClientInner::new_cell(config, Some(connector))?, - }) + Self { + inner: ClientInner::new_cell(config, Some(connector)), + } } /// Get the client to interact with the S2 basin service API. - pub fn basin_client(&self, basin: impl Into) -> Result { - Ok(BasinClient { - inner: self.inner.new_basin(basin)?, - }) + pub fn basin_client(&self, basin: types::BasinName) -> BasinClient { + BasinClient { + inner: self.inner.new_basin(basin), + } } #[sync_docs] @@ -423,7 +420,7 @@ impl Client { #[sync_docs] pub async fn get_basin_config( &self, - basin: impl Into, + basin: types::BasinName, ) -> Result { self.inner .send_retryable(GetBasinConfigServiceRequest::new( @@ -455,25 +452,23 @@ pub struct BasinClient { impl BasinClient { /// Create the client to connect with the S2 basin service API. - pub fn new(config: ClientConfig, basin: impl Into) -> Result { - let client = Client::new(config)?; - client.basin_client(basin) + pub fn new(config: ClientConfig, basin: types::BasinName) -> Self { + Client::new(config).basin_client(basin) } #[cfg(feature = "connector")] pub fn new_with_connector( config: ClientConfig, - basin: impl Into, + basin: types::BasinName, connector: C, - ) -> Result + ) -> Self where C: tower_service::Service + Send + 'static, C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send, C::Error: std::error::Error + Send + Sync + 'static, { - let client = Client::new_with_connector(config, connector)?; - client.basin_client(basin) + Client::new_with_connector(config, connector).basin_client(basin) } /// Get the client to interact with the S2 stream service API. @@ -553,29 +548,24 @@ pub struct StreamClient { impl StreamClient { /// Create the client to connect with the S2 stream service API. - pub fn new( - config: ClientConfig, - basin: impl Into, - stream: impl Into, - ) -> Result { - BasinClient::new(config, basin).map(|client| client.stream_client(stream)) + pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into) -> Self { + BasinClient::new(config, basin).stream_client(stream) } #[cfg(feature = "connector")] pub fn new_with_connector( config: ClientConfig, - basin: impl Into, + basin: types::BasinName, stream: impl Into, connector: C, - ) -> Result + ) -> Self where C: tower_service::Service + Send + 'static, C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, C::Future: Send, C::Error: std::error::Error + Send + Sync + 'static, { - BasinClient::new_with_connector(config, basin, connector) - .map(|client| client.stream_client(stream)) + BasinClient::new_with_connector(config, basin, connector).stream_client(stream) } #[sync_docs] @@ -654,12 +644,12 @@ impl StreamClient { #[derive(Debug, Clone)] struct ClientInner { channel: Channel, - basin: Option, + basin: Option, config: ClientConfig, } impl ClientInner { - fn new_cell(config: ClientConfig, connector: Option) -> Result + fn new_cell(config: ClientConfig, connector: Option) -> Self where C: tower_service::Service + Send + 'static, C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, @@ -670,26 +660,20 @@ impl ClientInner { Self::new(config, cell_endpoint, connector) } - fn new_basin(&self, basin: impl Into) -> Result { - let basin = basin.into(); - + fn new_basin(&self, basin: types::BasinName) -> Self { match self.config.host_endpoints.basin_zone.clone() { Some(endpoint) => { - let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse()?; + let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse().unwrap(); ClientInner::new(self.config.clone(), basin_endpoint, DEFAULT_HTTP_CONNECTOR) } - None => Ok(Self { + None => Self { basin: Some(basin), ..self.clone() - }), + }, } } - fn new( - config: ClientConfig, - endpoint: Authority, - connector: Option, - ) -> Result + fn new(config: ClientConfig, endpoint: Authority, connector: Option) -> Self where C: tower_service::Service + Send + 'static, C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin, @@ -702,14 +686,17 @@ impl ClientInner { let scheme = config.uri_scheme.as_str(); let endpoint = format!("{scheme}://{endpoint}") - .parse::()? - .user_agent(config.user_agent.clone())? + .parse::() + .unwrap() + .user_agent(config.user_agent.clone()) + .unwrap() .http2_adaptive_window(true) .tls_config( ClientTlsConfig::default() .with_webpki_roots() .assume_http2(true), - )? + ) + .unwrap() .connect_timeout(config.connection_timeout) .timeout(config.request_timeout); @@ -723,15 +710,15 @@ impl ClientInner { endpoint.connect_lazy() }; - Ok(Self { + Self { channel, basin: None, config, - }) + } } async fn send(&self, service_req: T) -> Result { - send_request(service_req, &self.config.token, self.basin.as_deref()).await + send_request(service_req, &self.config.token, self.basin.as_ref()).await } async fn send_retryable_with_backoff( @@ -775,15 +762,6 @@ impl ClientInner { } } -/// Error connecting to S2 endpoint. -#[derive(Debug, thiserror::Error)] -pub enum ConnectionError { - #[error(transparent)] - TonicTransportError(#[from] tonic::transport::Error), - #[error(transparent)] - UriParseError(#[from] http::uri::InvalidUri), -} - fn read_resumption_stream( mut request: ReadSessionServiceRequest, mut responses: ServiceStreamingResponse, diff --git a/src/service.rs b/src/service.rs index c4b5388..4612aa9 100644 --- a/src/service.rs +++ b/src/service.rs @@ -17,7 +17,7 @@ use crate::{client::ClientError, types}; pub async fn send_request( mut service: T, token: &SecretString, - basin: Option<&str>, + basin: Option<&types::BasinName>, ) -> Result { let req = prepare_request(&mut service, token, basin)?; match service.send(req).await { @@ -29,7 +29,7 @@ pub async fn send_request( fn prepare_request( service: &mut T, token: &SecretString, - basin: Option<&str>, + basin: Option<&types::BasinName>, ) -> Result, types::ConvertError> { let mut req = service.prepare_request()?; add_authorization_header(req.metadata_mut(), token)?; @@ -51,7 +51,10 @@ fn add_authorization_header( Ok(()) } -fn add_basin_header(meta: &mut MetadataMap, basin: &str) -> Result<(), types::ConvertError> { +fn add_basin_header( + meta: &mut MetadataMap, + basin: &types::BasinName, +) -> Result<(), types::ConvertError> { meta.insert( "s2-basin", basin diff --git a/src/service/account.rs b/src/service/account.rs index cdbb326..0b57165 100644 --- a/src/service/account.rs +++ b/src/service/account.rs @@ -129,15 +129,12 @@ impl RetryableRequest for DeleteBasinServiceRequest { #[derive(Debug, Clone)] pub struct GetBasinConfigServiceRequest { client: AccountServiceClient, - basin: String, + basin: types::BasinName, } impl GetBasinConfigServiceRequest { - pub fn new(client: AccountServiceClient, basin: impl Into) -> Self { - Self { - client, - basin: basin.into(), - } + pub fn new(client: AccountServiceClient, basin: types::BasinName) -> Self { + Self { client, basin } } } @@ -148,7 +145,7 @@ impl ServiceRequest for GetBasinConfigServiceRequest { fn prepare_request(&mut self) -> Result, types::ConvertError> { let req = api::GetBasinConfigRequest { - basin: self.basin.clone(), + basin: self.basin.to_string(), }; Ok(req.into_request()) } diff --git a/src/types.rs b/src/types.rs index 71c347a..63e8b02 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,7 @@ -use std::{str::FromStr, time::Duration}; +use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration}; use bytesize::ByteSize; +use regex::Regex; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use sync_docs::sync_docs; @@ -50,15 +51,15 @@ macro_rules! metered_impl { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone)] pub struct CreateBasinRequest { - pub basin: String, + pub basin: BasinName, pub config: Option, // TODO: Add assignment (when it's supported). } impl CreateBasinRequest { - pub fn new(basin: impl Into) -> Self { + pub fn new(basin: BasinName) -> Self { Self { - basin: basin.into(), + basin, config: None, } } @@ -76,7 +77,7 @@ impl TryFrom for api::CreateBasinRequest { fn try_from(value: CreateBasinRequest) -> Result { let CreateBasinRequest { basin, config } = value; Ok(Self { - basin, + basin: basin.0, config: config.map(TryInto::try_into).transpose()?, assignment: None, }) @@ -584,14 +585,14 @@ impl TryFrom for ListBasinsResponse { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone)] pub struct DeleteBasinRequest { - pub basin: String, + pub basin: BasinName, pub if_exists: bool, } impl DeleteBasinRequest { - pub fn new(basin: impl Into) -> Self { + pub fn new(basin: BasinName) -> Self { Self { - basin: basin.into(), + basin, if_exists: false, } } @@ -604,7 +605,7 @@ impl DeleteBasinRequest { impl From for api::DeleteBasinRequest { fn from(value: DeleteBasinRequest) -> Self { let DeleteBasinRequest { basin, .. } = value; - Self { basin } + Self { basin: basin.0 } } } @@ -640,15 +641,15 @@ impl From for api::DeleteStreamRequest { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Debug, Clone)] pub struct ReconfigureBasinRequest { - pub basin: String, + pub basin: BasinName, pub config: Option, pub mask: Option>, } impl ReconfigureBasinRequest { - pub fn new(basin: impl Into) -> Self { + pub fn new(basin: BasinName) -> Self { Self { - basin: basin.into(), + basin, config: None, mask: None, } @@ -678,7 +679,7 @@ impl TryFrom for api::ReconfigureBasinRequest { mask, } = value; Ok(Self { - basin, + basin: basin.0, config: config.map(TryInto::try_into).transpose()?, mask: mask.map(|paths| prost_types::FieldMask { paths }), }) @@ -1160,3 +1161,60 @@ impl TryFrom for ReadOutput { output.try_into() } } + +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Debug, Clone)] +pub struct BasinName(String); + +impl AsRef for BasinName { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Deref for BasinName { + type Target = str; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TryFrom for BasinName { + type Error = ConvertError; + + fn try_from(name: String) -> Result { + if name.len() < 8 || name.len() > 48 { + return Err("Basin name must be between 8 and 48 characters in length".into()); + } + + static BASIN_NAME_REGEX: OnceLock = OnceLock::new(); + let regex = BASIN_NAME_REGEX.get_or_init(|| { + Regex::new(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$") + .expect("Failed to compile basin name regex") + }); + + if !regex.is_match(&name) { + return Err( + "Basin name must comprise lowercase letters, numbers, and hyphens. \ + It cannot begin or end with a hyphen." + .into(), + ); + } + + Ok(Self(name)) + } +} + +impl FromStr for BasinName { + type Err = ConvertError; + + fn from_str(s: &str) -> Result { + s.to_string().try_into() + } +} + +impl std::fmt::Display for BasinName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +}