Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Nov 12, 2024
1 parent c2aab4f commit df856eb
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 84 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ tokio = { version = "*", features = ["full"] }
[features]
serde = ["dep:serde"]
connector = ["dep:hyper", "dep:hyper-util", "dep:tower-service"]

195 changes: 115 additions & 80 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,9 @@ impl HostEndpoints {
}
}

#[cfg(not(feature = "connector"))]
#[doc(hidden)]
macro_rules! generic {
($b:tt, $t:ty) => {
$b
};
}

#[cfg(feature = "connector")]
#[doc(hidden)]
macro_rules! generic {
($b:tt, $t:ty) => {
$b<$t>
};
}

/// Client configuration to be used to connect with the host.
#[derive(Debug, Clone)]
pub struct ClientConfig<#[cfg(feature = "connector")] U> {
pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
Expand All @@ -177,33 +161,19 @@ pub struct ClientConfig<#[cfg(feature = "connector")] U> {
pub request_timeout: Duration,
/// User agent to be used for the client.
pub user_agent: String,
#[cfg(feature = "connector")]
/// Connect with a custom connector.
pub connector: Option<U>,
}

impl<
#[cfg(feature = "connector")] U: tower_service::Service<
http::Uri,
Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Future: Send + 'static,
> + Send
+ 'static,
> generic!(ClientConfig, U)
{
impl ClientConfig {
/// Construct a new client configuration with given auth token and other
/// defaults.
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_endpoint: HostEndpoints::default(),
connect_lazily: true,
connection_timeout: Duration::from_secs(3),
connection_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".to_string(),
#[cfg(feature = "connector")]
connector: None,
}
}

Expand Down Expand Up @@ -248,29 +218,17 @@ impl<
..self
}
}

#[cfg(feature = "connector")]
pub fn with_connector(self, connector: U) -> Self {
Self {
connector: Some(connector),
..self
}
}
}

/// The S2 client to interact with the API.
#[derive(Debug, Clone)]
pub struct Client<#[cfg(feature = "connector")] U: Clone> {
#[cfg(feature = "connector")]
inner: ClientInner<U>,
#[cfg(not(feature = "connector"))]
pub struct Client {
inner: ClientInner,
}

impl<#[cfg(feature = "connector")] U: Clone> generic!(Client, U) {
impl Client {
async fn connect_inner(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
Ok(Self {
Expand All @@ -279,18 +237,31 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(Client, U) {
}

/// Connect the client with the S2 API.
pub async fn connect(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
) -> Result<Self, ConnectError> {
pub async fn connect(config: ClientConfig) -> Result<Self, ConnectError> {
Self::connect_inner(config, /* force_lazy_connection = */ false).await
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<U>(
config: ClientConfig,
connector: U,
) -> Result<Self, ConnectError>
where
U: tower_service::Service<http::Uri> + Send + 'static,
U::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
U::Future: Send,
U::Error: std::error::Error + Send + Sync + 'static,
{
Ok(Self {
inner: ClientInner::connect_cell_with_connector(config, connector).await?,
})
}

/// Get the client to interact with the S2 basin service API.
pub async fn basin_client(
&self,
basin: impl Into<String>,
) -> Result<generic!(BasinClient, U), ConnectError> {
) -> Result<BasinClient, ConnectError> {
Ok(BasinClient {
inner: self
.inner
Expand Down Expand Up @@ -367,18 +338,14 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(Client, U) {

/// Client to interact with the S2 basin service API.
#[derive(Debug, Clone)]
pub struct BasinClient<#[cfg(feature = "connector")] U: Clone> {
#[cfg(feature = "connector")]
inner: ClientInner<U>,
#[cfg(not(feature = "connector"))]
pub struct BasinClient {
inner: ClientInner,
}

impl<#[cfg(feature = "connector")] U: Clone> generic!(BasinClient, U) {
impl BasinClient {
/// Connect the client with the S2 basin service API.
pub async fn connect(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
config: ClientConfig,
basin: impl Into<String>,
) -> Result<Self, ConnectError> {
// Since we're directly trying to connect to the basin, force lazy
Expand All @@ -390,8 +357,24 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(BasinClient, U) {
client.basin_client(basin).await
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<U>(
config: ClientConfig,
basin: impl Into<String>,
connector: U,
) -> Result<Self, ConnectError>
where
U: tower_service::Service<http::Uri> + Send + 'static,
U::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
U::Future: Send,
U::Error: std::error::Error + Send + Sync + 'static,
{
let client = Client::connect_with_connector(config, connector).await?;
client.basin_client(basin).await
}

/// Get the client to interact with the S2 stream service API.
pub fn stream_client(&self, stream: impl Into<String>) -> generic!(StreamClient, U) {
pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
StreamClient {
inner: self.inner.clone(),
stream: stream.into(),
Expand Down Expand Up @@ -466,19 +449,15 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(BasinClient, U) {

/// Client to interact with the S2 stream service API.
#[derive(Debug, Clone)]
pub struct StreamClient<#[cfg(feature = "connector")] U: Clone> {
#[cfg(feature = "connector")]
inner: ClientInner<U>,
#[cfg(not(feature = "connector"))]
pub struct StreamClient {
inner: ClientInner,
stream: String,
}

impl<#[cfg(feature = "connector")] U: Clone> generic!(StreamClient, U) {
impl StreamClient {
/// Connect the client with the S2 stream service API.
pub async fn connect(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
) -> Result<Self, ConnectError> {
Expand All @@ -487,6 +466,24 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(StreamClient, U) {
.map(|client| client.stream_client(stream))
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<U>(
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
connector: U,
) -> Result<Self, ConnectError>
where
U: tower_service::Service<http::Uri> + Send + 'static,
U::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
U::Future: Send,
U::Error: std::error::Error + Send + Sync + 'static,
{
BasinClient::connect_with_connector(config, basin, connector)
.await
.map(|client| client.stream_client(stream))
}

#[sync_docs]
pub async fn check_tail(&self) -> Result<u64, ServiceError<CheckTailError>> {
self.inner
Expand Down Expand Up @@ -563,26 +560,36 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(StreamClient, U) {
}

#[derive(Debug, Clone)]
struct ClientInner<#[cfg(feature = "connector")] U: Clone> {
struct ClientInner {
channel: Channel,
basin: Option<String>,

#[cfg(feature = "connector")]
config: ClientConfig<U>,
#[cfg(not(feature = "connector"))]
config: ClientConfig,
}

impl<#[cfg(feature = "connector")] U: Clone> generic!(ClientInner, U) {
impl ClientInner {
async fn connect_cell(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect(config, cell_endpoint, force_lazy_connection).await
}

#[cfg(feature = "connector")]
async fn connect_cell_with_connector<U>(
config: ClientConfig,
connector: U,
) -> Result<Self, ConnectError>
where
U: tower_service::Service<http::Uri> + Send + 'static,
U::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
U::Future: Send,
U::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect_with_connector(config, cell_endpoint, connector).await
}

async fn connect_basin(
&self,
basin: impl Into<String>,
Expand All @@ -604,8 +611,7 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(ClientInner, U) {
}

async fn connect(
#[cfg(feature = "connector")] config: ClientConfig<U>,
#[cfg(not(feature = "connector"))] config: ClientConfig,
config: ClientConfig,
endpoint: Authority,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
Expand All @@ -632,14 +638,43 @@ impl<#[cfg(feature = "connector")] U: Clone> generic!(ClientInner, U) {
})
}

async fn send<T: ServiceRequest>(
#[cfg(feature = "connector")]
async fn connect_with_connector<U>(
config: ClientConfig,
endpoint: Authority,
connector: U,
) -> Result<Self, ConnectError>
where
U: tower_service::Service<http::Uri> + Send + 'static,
U::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
U::Future: Send,
U::Error: std::error::Error + Send + Sync + 'static,
{
let endpoint = format!("http://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.http2_adaptive_window(true)
.keep_alive_timeout(Duration::from_secs(5))
.http2_keep_alive_interval(Duration::from_secs(5))
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);

let channel = endpoint.connect_with_connector(connector).await.unwrap();
Ok(Self {
channel,
basin: None,
config,
})
}

async fn send<T: ServiceRequest + std::fmt::Debug>(
&self,
service_req: T,
) -> Result<T::Response, ServiceError<T::Error>> {
send_request(service_req, &self.config.token, self.basin.as_deref()).await
}

async fn send_retryable<T: RetryableRequest>(
async fn send_retryable<T: RetryableRequest + std::fmt::Debug>(
&self,
service_req: T,
) -> Result<T::Response, ServiceError<T::Error>> {
Expand Down
11 changes: 8 additions & 3 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,24 @@ pub enum ServiceError<T: std::error::Error> {
NotSupported(String),
#[error("User not authenticated: {0}")]
Unauthenticated(String),
#[error("Unavailable: {0}")]
#[error("Deadline exceeded: {0}")]
Unavailable(String),
#[error("Aborted: {0}")]
Aborted(String),
#[error("Cancelled: {0}")]
Cancelled(String),
#[error("{0}")]
Unknown(String),
#[error(transparent)]
Remote(T),
}

pub async fn send_request<T: ServiceRequest>(
pub async fn send_request<T: ServiceRequest + std::fmt::Debug>(
mut service: T,
token: &SecretString,
basin: Option<&str>,
) -> Result<T::Response, ServiceError<T::Error>> {
let req = prepare_request(&mut service, token, basin).map_err(ServiceError::Convert)?;

match service.send(req).await {
Ok(resp) => service.parse_response(resp).map_err(ServiceError::Convert),
Err(status) => match status.code() {
Expand All @@ -51,6 +54,8 @@ pub async fn send_request<T: ServiceRequest>(
tonic::Code::Unavailable => {
Err(ServiceError::Unavailable(status.message().to_string()))
}
tonic::Code::Aborted => Err(ServiceError::Aborted(status.message().to_string())),
tonic::Code::Cancelled => Err(ServiceError::Cancelled(status.message().to_string())),
_ => match service.parse_status(&status) {
Ok(resp) => Ok(resp),
Err(None) => Err(ServiceError::Unknown(status.message().to_string())),
Expand Down
Loading

0 comments on commit df856eb

Please sign in to comment.