Skip to content

Commit

Permalink
fix: Update HostEndpoints::from_env with new spec (#58)
Browse files Browse the repository at this point in the history
Resolves: #46

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 15, 2024
1 parent 0d142ee commit f7a6ca2
Showing 1 changed file with 140 additions and 54 deletions.
194 changes: 140 additions & 54 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ use crate::{
const DEFAULT_HTTP_CONNECTOR: Option<HttpConnector> = None;

/// Cloud deployment to be used to connect the client with.
///
/// Can be used to create the client with default hosted URIs:
///
/// ```
/// # use streamstore::client::{ClientConfig, HostCloud};
/// let client_config = ClientConfig::new("<token>")
/// .with_host_uri(HostCloud::Aws);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostCloud {
/// S2 hosted on AWS.
Expand All @@ -57,14 +49,6 @@ impl HostCloud {
Self::Aws => Self::AWS,
}
}

pub fn cell_endpoint(&self) -> Authority {
format!("{}.s2.dev", self.as_str()).parse().unwrap()
}

pub fn basin_zone(&self) -> Option<Authority> {
Some(format!("b.{}.s2.dev", self.as_str()).parse().unwrap())
}
}

impl Display for HostCloud {
Expand All @@ -74,30 +58,109 @@ impl Display for HostCloud {
}

impl FromStr for HostCloud {
type Err = InvalidHostError;
type Err = ParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case(Self::AWS) {
Ok(Self::Aws)
} else {
Err(InvalidHostError(s.to_string()))
Err(ParseError::new("host", s))
}
}
}

#[derive(Debug, Clone, thiserror::Error)]
#[error("Invalid host: {0}")]
pub struct InvalidHostError(pub String);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostEnv {
#[default]
Prod,
Staging,
Sandbox,
}

impl From<HostCloud> for HostEndpoints {
fn from(value: HostCloud) -> Self {
Self {
cell: value.cell_endpoint(),
basin_zone: value.basin_zone(),
impl HostEnv {
const PROD: &'static str = "prod";
const STAGING: &'static str = "staging";
const SANDBOX: &'static str = "sandbox";

fn as_str(&self) -> &'static str {
match self {
Self::Prod => Self::PROD,
Self::Staging => Self::STAGING,
Self::Sandbox => Self::SANDBOX,
}
}
}

impl Display for HostEnv {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

impl FromStr for HostEnv {
type Err = ParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case(Self::PROD) {
Ok(Self::Prod)
} else if s.eq_ignore_ascii_case(Self::STAGING) {
Ok(Self::Staging)
} else if s.eq_ignore_ascii_case(Self::SANDBOX) {
Ok(Self::Sandbox)
} else {
Err(ParseError::new("env", s))
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BasinZone {
CloudEnv,
Static,
}

impl BasinZone {
const CLOUD_ENV: &'static str = "cloud-env";
const STATIC: &'static str = "static";

fn as_str(&self) -> &'static str {
match self {
Self::CloudEnv => Self::CLOUD_ENV,
Self::Static => Self::STATIC,
}
}
}

impl Display for BasinZone {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

impl FromStr for BasinZone {
type Err = ParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case(Self::CLOUD_ENV) {
Ok(Self::CloudEnv)
} else if s.eq_ignore_ascii_case(Self::STATIC) {
Ok(Self::Static)
} else {
Err(ParseError::new("basin zone", s))
}
}
}

#[derive(Debug, Clone, thiserror::Error)]
#[error("Invalid {0}: {1}")]
pub struct ParseError(String, String);

impl ParseError {
fn new(what: impl Into<String>, details: impl Display) -> Self {
Self(what.into(), details.to_string())
}
}

/// Endpoints for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoints {
Expand All @@ -107,39 +170,62 @@ pub struct HostEndpoints {

impl Default for HostEndpoints {
fn default() -> Self {
HostCloud::default().into()
Self::from_parts(HostCloud::default(), HostEnv::default(), None, None)
}
}

impl HostEndpoints {
pub fn from_env() -> Result<Self, InvalidHostError> {
pub fn from_env() -> Result<Self, ParseError> {
fn env_var<T>(
name: &str,
parse: impl FnOnce(&str) -> Result<T, InvalidHostError>,
) -> Result<Option<T>, InvalidHostError> {
parse: impl FnOnce(String) -> Result<T, ParseError>,
) -> Result<Option<T>, ParseError> {
match std::env::var(name) {
Ok(value) => Ok(Some(parse(&value)?)),
Ok(value) => Ok(Some(parse(value)?)),
Err(std::env::VarError::NotPresent) => Ok(None),
Err(std::env::VarError::NotUnicode(value)) => {
Err(InvalidHostError(value.to_string_lossy().to_string()))
}
Err(std::env::VarError::NotUnicode(value)) => Err(ParseError::new(
format!("{name} env var"),
value.to_string_lossy(),
)),
}
}
fn parse_authority(v: &str) -> Result<Authority, InvalidHostError> {
v.parse().map_err(|_| InvalidHostError(v.to_owned()))
}
let cloud = env_var("S2_CLOUD", HostCloud::from_str)?.unwrap_or(HostCloud::default());
let cell = env_var("S2_CELL", parse_authority)?;
let basin_zone = env_var("S2_BASIN_ZONE", parse_authority)?;
let endpoints = match (cell, basin_zone, cloud) {
(None, None, cloud) => cloud.into(),
(Some(cell), basin_zone, _) => Self { cell, basin_zone },
(None, Some(basin_zone), cloud) => Self {
cell: cloud.cell_endpoint(),
basin_zone: Some(basin_zone),
},

let cloud = env_var("S2_CLOUD", |s| HostCloud::from_str(&s))?.unwrap_or_default();
let env = env_var("S2_ENV", |s| HostEnv::from_str(&s))?.unwrap_or_default();
let basin_zone = env_var("S2_BASIN_ZONE", |s| BasinZone::from_str(&s))?;
let cell_id = env_var("S2_CELL_ID", Ok)?;

Ok(Self::from_parts(cloud, env, basin_zone, cell_id.as_deref()))
}

pub fn from_parts(
cloud: HostCloud,
env: HostEnv,
basin_zone: Option<BasinZone>,
cell_id: Option<&str>,
) -> Self {
let env_suffix = match env {
HostEnv::Prod => String::new(),
env => format!("-{env}"),
};
Ok(endpoints)

let (cell_endpoint, default_basin_zone) = match cell_id {
None => (format!("{cloud}.s2.dev"), BasinZone::CloudEnv),
Some(cell_id) => (
format!("{cell_id}.o{env_suffix}.{cloud}.s2.dev"),
BasinZone::Static,
),
};

let basin_endpoint = match basin_zone.unwrap_or(default_basin_zone) {
BasinZone::CloudEnv => Some(format!("b{env_suffix}.{cloud}.s2.dev")),
BasinZone::Static => None,
};

Self {
cell: cell_endpoint.parse().unwrap(),
basin_zone: basin_endpoint.map(|b| b.parse().unwrap()),
}
}
}

Expand All @@ -149,7 +235,7 @@ pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
pub host_endpoint: HostEndpoints,
pub host_endpoints: HostEndpoints,
/// Timeout for connecting/reconnecting.
pub connection_timeout: Duration,
/// Timeout for a particular request.
Expand All @@ -167,7 +253,7 @@ impl ClientConfig {
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_endpoint: HostEndpoints::default(),
host_endpoints: HostEndpoints::default(),
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".to_string(),
Expand All @@ -179,7 +265,7 @@ impl ClientConfig {
/// Construct from an existing configuration with the new host URIs.
pub fn with_host_endpoint(self, host_endpoint: impl Into<HostEndpoints>) -> Self {
Self {
host_endpoint: host_endpoint.into(),
host_endpoints: host_endpoint.into(),
..self
}
}
Expand Down Expand Up @@ -542,14 +628,14 @@ impl ClientInner {
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoint.cell.clone();
let cell_endpoint = config.host_endpoints.cell.clone();
Self::new(config, cell_endpoint, connector)
}

fn new_basin(&self, basin: impl Into<String>) -> Result<Self, ConnectionError> {
let basin = basin.into();

match self.config.host_endpoint.basin_zone.clone() {
match self.config.host_endpoints.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse()?;
ClientInner::new(self.config.clone(), basin_endpoint, DEFAULT_HTTP_CONNECTOR)
Expand Down Expand Up @@ -591,7 +677,7 @@ impl ClientInner {

let channel = if let Some(connector) = connector {
assert!(
config.host_endpoint.basin_zone.is_none(),
config.host_endpoints.basin_zone.is_none(),
"cannot connect with connector if basin zone is provided"
);
endpoint.connect_with_connector_lazy(connector)
Expand Down

0 comments on commit f7a6ca2

Please sign in to comment.