Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove ConnectionError in favour of pre-processing #68

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ hyper = "1.5.0"
hyper-util = "0.1.10"
prost = "0.13.3"
prost-types = "0.13.3"
regex = "1.11.1"
secrecy = "0.8.0"
serde = { version = "1.0.214", optional = true, features = ["derive"] }
sync_docs = { path = "sync_docs" }
Expand Down
15 changes: 8 additions & 7 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use streamstore::{
batching::AppendRecordsBatchingStream,
client::{Client, ClientConfig, ClientError, HostEndpoints},
types::{
AppendInput, AppendRecord, CreateBasinRequest, CreateStreamRequest, DeleteBasinRequest,
DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest, ReadSessionRequest,
AppendInput, AppendRecord, BasinName, CreateBasinRequest, CreateStreamRequest,
DeleteBasinRequest, DeleteStreamRequest, ListBasinsRequest, ListStreamsRequest,
ReadSessionRequest,
},
};

Expand All @@ -22,11 +23,11 @@ async fn main() {

println!("Connecting with {config:#?}");

let client = Client::new(config).unwrap();
let client = Client::new(config);

let basin = "s2-sdk-example-basin";
let basin: BasinName = "s2-sdk-example-basin".parse().unwrap();

let create_basin_req = CreateBasinRequest::new(basin);
let create_basin_req = CreateBasinRequest::new(basin.clone());

match client.create_basin(create_basin_req).await {
Ok(created_basin) => {
Expand Down Expand Up @@ -57,7 +58,7 @@ async fn main() {
Err(err) => exit_with_err(err),
};

match client.get_basin_config(basin).await {
match client.get_basin_config(basin.clone()).await {
Ok(config) => {
println!("Basin config: {config:#?}");
}
Expand All @@ -68,7 +69,7 @@ async fn main() {

let create_stream_req = CreateStreamRequest::new(stream);

let basin_client = client.basin_client(basin).unwrap();
let basin_client = client.basin_client(basin.clone());

match basin_client.create_stream(create_stream_req).await {
Ok(()) => {
Expand Down
112 changes: 45 additions & 67 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Display, str::FromStr, time::Duration};

use backon::{BackoffBuilder, ConstantBuilder, Retryable};
use futures::StreamExt;
use http::uri::Authority;
use http::{uri::Authority, HeaderValue};
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use sync_docs::sync_docs;
Expand Down Expand Up @@ -166,8 +166,8 @@ impl ParseError {
/// Endpoints for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoints {
pub cell: Authority,
pub basin_zone: Option<Authority>,
cell: Authority,
basin_zone: Option<Authority>,
}

impl From<HostCloud> for HostEndpoints {
Expand Down Expand Up @@ -253,7 +253,7 @@ pub struct ClientConfig {
/// Timeout for a particular request.
pub request_timeout: Duration,
/// User agent to be used for the client.
pub user_agent: String,
pub user_agent: HeaderValue,
/// URI scheme to use to connect.
#[cfg(feature = "connector")]
pub uri_scheme: http::uri::Scheme,
Expand All @@ -272,7 +272,7 @@ impl ClientConfig {
host_endpoints: HostEndpoints::default(),
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".to_string(),
user_agent: "s2-sdk-rust".parse().unwrap(),
#[cfg(feature = "connector")]
uri_scheme: http::uri::Scheme::HTTPS,
retry_backoff_duration: Duration::from_millis(100),
Expand Down Expand Up @@ -306,7 +306,7 @@ impl ClientConfig {
}

/// Construct from an existing configuration with the new user agent.
pub fn with_user_agent(self, user_agent: impl Into<String>) -> Self {
pub fn with_user_agent(self, user_agent: impl Into<HeaderValue>) -> Self {
Self {
user_agent: user_agent.into(),
..self
Expand Down Expand Up @@ -355,33 +355,30 @@ pub struct Client {

impl Client {
/// Create the client to connect with the S2 API.
pub fn new(config: ClientConfig) -> Result<Self, ConnectionError> {
Ok(Self {
inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR)?,
})
pub fn new(config: ClientConfig) -> Self {
Self {
inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR),
}
}

#[cfg(feature = "connector")]
pub fn new_with_connector<C>(
config: ClientConfig,
connector: C,
) -> Result<Self, ConnectionError>
pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Self
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
Ok(Self {
inner: ClientInner::new_cell(config, Some(connector))?,
})
Self {
inner: ClientInner::new_cell(config, Some(connector)),
}
}

/// Get the client to interact with the S2 basin service API.
pub fn basin_client(&self, basin: impl Into<String>) -> Result<BasinClient, ConnectionError> {
Ok(BasinClient {
inner: self.inner.new_basin(basin)?,
})
pub fn basin_client(&self, basin: types::BasinName) -> BasinClient {
BasinClient {
inner: self.inner.new_basin(basin),
}
}

#[sync_docs]
Expand Down Expand Up @@ -423,7 +420,7 @@ impl Client {
#[sync_docs]
pub async fn get_basin_config(
&self,
basin: impl Into<String>,
basin: types::BasinName,
) -> Result<types::BasinConfig, ClientError> {
self.inner
.send_retryable(GetBasinConfigServiceRequest::new(
Expand Down Expand Up @@ -455,25 +452,23 @@ pub struct BasinClient {

impl BasinClient {
/// Create the client to connect with the S2 basin service API.
pub fn new(config: ClientConfig, basin: impl Into<String>) -> Result<Self, ConnectionError> {
let client = Client::new(config)?;
client.basin_client(basin)
pub fn new(config: ClientConfig, basin: types::BasinName) -> Self {
Client::new(config).basin_client(basin)
}

#[cfg(feature = "connector")]
pub fn new_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
basin: types::BasinName,
connector: C,
) -> Result<Self, ConnectionError>
) -> Self
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let client = Client::new_with_connector(config, connector)?;
client.basin_client(basin)
Client::new_with_connector(config, connector).basin_client(basin)
}

/// Get the client to interact with the S2 stream service API.
Expand Down Expand Up @@ -553,29 +548,24 @@ pub struct StreamClient {

impl StreamClient {
/// Create the client to connect with the S2 stream service API.
pub fn new(
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
) -> Result<Self, ConnectionError> {
BasinClient::new(config, basin).map(|client| client.stream_client(stream))
pub fn new(config: ClientConfig, basin: types::BasinName, stream: impl Into<String>) -> Self {
BasinClient::new(config, basin).stream_client(stream)
}

#[cfg(feature = "connector")]
pub fn new_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
basin: types::BasinName,
stream: impl Into<String>,
connector: C,
) -> Result<Self, ConnectionError>
) -> Self
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
BasinClient::new_with_connector(config, basin, connector)
.map(|client| client.stream_client(stream))
BasinClient::new_with_connector(config, basin, connector).stream_client(stream)
}

#[sync_docs]
Expand Down Expand Up @@ -654,12 +644,12 @@ impl StreamClient {
#[derive(Debug, Clone)]
struct ClientInner {
channel: Channel,
basin: Option<String>,
basin: Option<types::BasinName>,
config: ClientConfig,
}

impl ClientInner {
fn new_cell<C>(config: ClientConfig, connector: Option<C>) -> Result<Self, ConnectionError>
fn new_cell<C>(config: ClientConfig, connector: Option<C>) -> Self
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
Expand All @@ -670,26 +660,20 @@ impl ClientInner {
Self::new(config, cell_endpoint, connector)
}

fn new_basin(&self, basin: impl Into<String>) -> Result<Self, ConnectionError> {
let basin = basin.into();

fn new_basin(&self, basin: types::BasinName) -> Self {
match self.config.host_endpoints.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse()?;
let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse().unwrap();
ClientInner::new(self.config.clone(), basin_endpoint, DEFAULT_HTTP_CONNECTOR)
}
None => Ok(Self {
None => Self {
basin: Some(basin),
..self.clone()
}),
},
}
}

fn new<C>(
config: ClientConfig,
endpoint: Authority,
connector: Option<C>,
) -> Result<Self, ConnectionError>
fn new<C>(config: ClientConfig, endpoint: Authority, connector: Option<C>) -> Self
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
Expand All @@ -702,14 +686,17 @@ impl ClientInner {
let scheme = config.uri_scheme.as_str();

let endpoint = format!("{scheme}://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.parse::<Endpoint>()
.unwrap()
.user_agent(config.user_agent.clone())
.unwrap()
Comment on lines +691 to +692
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately need to unwrap since the arg is TryInto<HeaderValue> which for HeaderValue would never fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you switch as many unwrap()s as reasonable to expect() with a very brief message about why it is safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops saw this a bit late. Will update.

.http2_adaptive_window(true)
.tls_config(
ClientTlsConfig::default()
.with_webpki_roots()
.assume_http2(true),
)?
)
.unwrap()
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);

Expand All @@ -723,15 +710,15 @@ impl ClientInner {
endpoint.connect_lazy()
};

Ok(Self {
Self {
channel,
basin: None,
config,
})
}
}

async fn send<T: ServiceRequest>(&self, service_req: T) -> Result<T::Response, ClientError> {
send_request(service_req, &self.config.token, self.basin.as_deref()).await
send_request(service_req, &self.config.token, self.basin.as_ref()).await
}

async fn send_retryable_with_backoff<T: RetryableRequest>(
Expand Down Expand Up @@ -775,15 +762,6 @@ impl ClientInner {
}
}

/// Error connecting to S2 endpoint.
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error(transparent)]
TonicTransportError(#[from] tonic::transport::Error),
#[error(transparent)]
UriParseError(#[from] http::uri::InvalidUri),
}

fn read_resumption_stream(
mut request: ReadSessionServiceRequest,
mut responses: ServiceStreamingResponse<ReadSessionStreamingResponse>,
Expand Down
9 changes: 6 additions & 3 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{client::ClientError, types};
pub async fn send_request<T: ServiceRequest>(
mut service: T,
token: &SecretString,
basin: Option<&str>,
basin: Option<&types::BasinName>,
) -> Result<T::Response, ClientError> {
let req = prepare_request(&mut service, token, basin)?;
match service.send(req).await {
Expand All @@ -29,7 +29,7 @@ pub async fn send_request<T: ServiceRequest>(
fn prepare_request<T: ServiceRequest>(
service: &mut T,
token: &SecretString,
basin: Option<&str>,
basin: Option<&types::BasinName>,
) -> Result<tonic::Request<T::ApiRequest>, types::ConvertError> {
let mut req = service.prepare_request()?;
add_authorization_header(req.metadata_mut(), token)?;
Expand All @@ -51,7 +51,10 @@ fn add_authorization_header(
Ok(())
}

fn add_basin_header(meta: &mut MetadataMap, basin: &str) -> Result<(), types::ConvertError> {
fn add_basin_header(
meta: &mut MetadataMap,
basin: &types::BasinName,
) -> Result<(), types::ConvertError> {
meta.insert(
"s2-basin",
basin
Expand Down
11 changes: 4 additions & 7 deletions src/service/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,12 @@ impl RetryableRequest for DeleteBasinServiceRequest {
#[derive(Debug, Clone)]
pub struct GetBasinConfigServiceRequest {
client: AccountServiceClient<Channel>,
basin: String,
basin: types::BasinName,
}

impl GetBasinConfigServiceRequest {
pub fn new(client: AccountServiceClient<Channel>, basin: impl Into<String>) -> Self {
Self {
client,
basin: basin.into(),
}
pub fn new(client: AccountServiceClient<Channel>, basin: types::BasinName) -> Self {
Self { client, basin }
}
}

Expand All @@ -148,7 +145,7 @@ impl ServiceRequest for GetBasinConfigServiceRequest {

fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
let req = api::GetBasinConfigRequest {
basin: self.basin.clone(),
basin: self.basin.to_string(),
};
Ok(req.into_request())
}
Expand Down
Loading