diff --git a/examples/basic.rs b/examples/basic.rs index 8d20095..b1e765f 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,8 +1,9 @@ use std::time::Duration; use futures::StreamExt; +use http::uri::Authority; use streamstore::{ - client::{Client, ClientConfig, HostCloud}, + client::{Client, ClientConfig, HostCloud, HostEndpoint}, service_error::{CreateBasinError, CreateStreamError, ServiceError}, streams::AppendRecordStream, types::{ @@ -11,12 +12,29 @@ use streamstore::{ }, }; +fn host_endpoint_from_env() -> HostEndpoint { + let cloud = HostCloud::default(); + + fn endpoint_from_env(env: &str) -> Option { + std::env::var(env).ok().and_then(|e| e.parse().ok()) + } + + let cell_endpoint = + endpoint_from_env("S2_CELL_ENDPOINT").unwrap_or_else(|| cloud.cell_endpoint()); + let basin_zone = endpoint_from_env("S2_BASIN_ZONE").or_else(|| cloud.basin_zone()); + + HostEndpoint { + cell_endpoint, + basin_zone, + } +} + #[tokio::main] async fn main() { let token = std::env::var("S2_AUTH_TOKEN").unwrap(); let config = ClientConfig::new(token) - .with_host_uri(HostCloud::Local) + .with_host_endpoint(host_endpoint_from_env()) .with_request_timeout(Duration::from_secs(10)); println!("Connecting with {config:#?}"); diff --git a/src/client.rs b/src/client.rs index 64c71b8..1123794 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ -use std::time::Duration; +use std::{fmt::Display, str::FromStr, time::Duration}; use backon::{ConstantBuilder, Retryable}; -use http::{uri::Authority, Uri}; +use http::uri::Authority; use secrecy::SecretString; use sync_docs::sync_docs; use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; @@ -46,74 +46,86 @@ use crate::{ /// ``` #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum HostCloud { - /// Localhost (to be used for testing). - #[default] - Local, /// S2 hosted on AWS. + #[default] Aws, } -impl From for HostUri { - fn from(value: HostCloud) -> Self { - match value { - HostCloud::Local => HostUri { - global: std::env::var("S2_FRONTEND_AUTHORITY") - .expect("S2_FRONTEND_AUTHORITY required") - .try_into() - .unwrap(), - cell: None, - prefix_host_with_basin: false, - }, - HostCloud::Aws => todo!("prod aws uris"), +impl HostCloud { + const AWS: &str = "aws"; + + fn as_str(&self) -> &'static str { + match self { + Self::Aws => Self::AWS, } } + + pub fn cell_endpoint(&self) -> Authority { + format!("{}.s2.dev", self.as_str()).parse().unwrap() + } + + pub fn basin_zone(&self) -> Option { + Some(format!("b.{}.s2.dev", self.as_str()).parse().unwrap()) + } } -/// URIs for the hosted S2 environment. -#[derive(Debug, Clone)] -pub struct HostUri { - /// Global URI to connect to. - pub global: Uri, - /// Cell specific URI (for basin and stream service requests). - /// - /// Client uses the same URI as the global URI if cell URI is absent. - pub cell: Option, - /// Whether the cell URI host should be prefixed with the basin name or not. - /// - /// If set to true, the cell URI `cell.aws.s2.dev` would be prefixed with - /// the basin name and set to `.cell.aws.s2.dev`. - pub prefix_host_with_basin: bool, +impl Display for HostCloud { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } } -impl Default for HostUri { - fn default() -> Self { - HostCloud::default().into() +impl FromStr for HostCloud { + type Err = InvalidHostCloudError; + + fn from_str(s: &str) -> Result { + if s.eq_ignore_ascii_case(Self::AWS) { + Ok(Self::Aws) + } else { + Err(InvalidHostCloudError) + } } } -impl HostUri { - /// Construct a new host URI with the given global URI. - pub fn new(global_uri: impl Into) -> Self { +#[derive(Debug, Clone, Copy, thiserror::Error)] +#[error("invalid host cloud")] +pub struct InvalidHostCloudError; + +impl From for HostEndpoint { + fn from(value: HostCloud) -> Self { Self { - global: global_uri.into(), - cell: None, - prefix_host_with_basin: false, + cell_endpoint: value.cell_endpoint(), + basin_zone: value.basin_zone(), } } +} + +/// Endpoints for the hosted S2 environment. +#[derive(Debug, Clone)] +pub struct HostEndpoint { + pub cell_endpoint: Authority, + pub basin_zone: Option, +} - /// Construct from an existing host URI with the given cell URI. - pub fn with_cell_uri(self, cell_uri: impl Into) -> Self { +impl Default for HostEndpoint { + fn default() -> Self { + HostCloud::default().into() + } +} + +impl HostEndpoint { + /// Construct a new host endpoint with the given cell endpoint.. + pub fn new(cell_endpoint: impl Into) -> Self { Self { - cell: Some(cell_uri.into()), - ..self + cell_endpoint: cell_endpoint.into(), + basin_zone: None, } } - /// Construct from an existing host URI with the new - /// `prefix_host_with_basin` configuration. - pub fn with_prefix_host_with_basin(self, prefix_host_with_basin: bool) -> Self { + /// Construct from an existing host endpoint with the given basin zone. + pub fn with_basin_zone(self, basin_zone: impl Into) -> Self { Self { - prefix_host_with_basin, + basin_zone: Some(basin_zone.into()), ..self } } @@ -125,7 +137,7 @@ pub struct ClientConfig { /// Auth token for the client. pub token: SecretString, /// Host URI to connect with. - pub host_uri: HostUri, + pub host_endpoint: HostEndpoint, /// Should the connection be lazy, i.e., only be made when making the very /// first request. pub connect_lazily: bool, @@ -143,7 +155,7 @@ impl ClientConfig { pub fn new(token: impl Into) -> Self { Self { token: token.into().into(), - host_uri: HostUri::default(), + host_endpoint: HostEndpoint::default(), connect_lazily: true, connection_timeout: Duration::from_secs(3), request_timeout: Duration::from_secs(5), @@ -152,9 +164,9 @@ impl ClientConfig { } /// Construct from an existing configuration with the new host URIs. - pub fn with_host_uri(self, host_uri: impl Into) -> Self { + pub fn with_host_endpoint(self, host_endpoint: impl Into) -> Self { Self { - host_uri: host_uri.into(), + host_endpoint: host_endpoint.into(), ..self } } @@ -206,7 +218,7 @@ impl Client { force_lazy_connection: bool, ) -> Result { Ok(Self { - inner: ClientInner::connect_global(config, force_lazy_connection).await?, + inner: ClientInner::connect_cell(config, force_lazy_connection).await?, }) } @@ -220,7 +232,7 @@ impl Client { Ok(BasinClient { inner: self .inner - .connect_cell(basin, /* force_lazy_connection = */ false) + .connect_basin(basin, /* force_lazy_connection = */ false) .await?, }) } @@ -307,7 +319,7 @@ impl BasinClient { // connection with the global client so we don't end up making 2 // connections for connecting with the basin client directly (given the // cell URI and global URIs are different). - let force_lazy_connection = config.host_uri.cell.is_some(); + let force_lazy_connection = config.host_endpoint.basin_zone.is_some(); let client = Client::connect_inner(config, force_lazy_connection).await?; client.basin_client(basin).await } @@ -488,39 +500,31 @@ struct ClientInner { } impl ClientInner { - async fn connect_global( + async fn connect_cell( config: ClientConfig, force_lazy_connection: bool, ) -> Result { - let uri = config.host_uri.global.clone(); - Self::connect(config, uri, force_lazy_connection).await + let cell_endpoint = config.host_endpoint.cell_endpoint.clone(); + Self::connect(config, cell_endpoint, force_lazy_connection).await } - async fn connect_cell( + async fn connect_basin( &self, basin: impl Into, force_lazy_connection: bool, ) -> Result { let basin = basin.into(); - match self.config.host_uri.cell.clone() { - Some(uri) if self.config.host_uri.prefix_host_with_basin => { - let host = uri.host().ok_or(ClientError::MissingHost)?; - let port = uri.port_u16().map_or(String::new(), |p| format!(":{}", p)); - let authority: Authority = format!("{basin}.{host}{port}").parse()?; - let mut uri_parts = uri.into_parts(); - uri_parts.authority = Some(authority); - + match self.config.host_endpoint.basin_zone.clone() { + Some(endpoint) => { + let basin_zone_endpoint: Authority = format!("{basin}.{endpoint}").parse()?; ClientInner::connect( self.config.clone(), - Uri::from_parts(uri_parts).expect("invalid uri"), + basin_zone_endpoint, force_lazy_connection, ) .await } - Some(uri) => { - ClientInner::connect(self.config.clone(), uri, force_lazy_connection).await - } None => Ok(Self { basin: Some(basin), ..self.clone() @@ -530,11 +534,11 @@ impl ClientInner { async fn connect( config: ClientConfig, - uri: Uri, + endpoint: Authority, force_lazy_connection: bool, ) -> Result { - let endpoint: Endpoint = uri.clone().into(); - let endpoint = endpoint + let endpoint = format!("https://{endpoint}") + .parse::()? .user_agent(config.user_agent.clone())? .tls_config( ClientTlsConfig::default()