Skip to content

Commit

Permalink
feat: Redo endpoint logic
Browse files Browse the repository at this point in the history
Fixes: #39

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Nov 4, 2024
1 parent 326610d commit 7381edf
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 76 deletions.
22 changes: 20 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use futures::StreamExt;
use http::uri::Authority;
use streamstore::{
client::{Client, ClientConfig, HostCloud},
client::{Client, ClientConfig, HostCloud, HostEndpoint},
service_error::{CreateBasinError, CreateStreamError, ServiceError},
streams::AppendRecordStream,
types::{
Expand All @@ -11,12 +12,29 @@ use streamstore::{
},
};

fn host_endpoint_from_env() -> HostEndpoint {
let cloud = HostCloud::default();

fn endpoint_from_env(env: &str) -> Option<Authority> {
std::env::var(env).ok().and_then(|e| e.parse().ok())
}

let cell_endpoint =
endpoint_from_env("S2_CELL_ENDPOINT").unwrap_or_else(|| cloud.cell_endpoint());
let basin_zone = endpoint_from_env("S2_BASIN_ZONE").or_else(|| cloud.basin_zone());

HostEndpoint {
cell_endpoint,
basin_zone,
}
}

#[tokio::main]
async fn main() {
let token = std::env::var("S2_AUTH_TOKEN").unwrap();

let config = ClientConfig::new(token)
.with_host_uri(HostCloud::Local)
.with_host_endpoint(host_endpoint_from_env())
.with_request_timeout(Duration::from_secs(10));

println!("Connecting with {config:#?}");
Expand Down
152 changes: 78 additions & 74 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;
use std::{fmt::Display, str::FromStr, time::Duration};

use backon::{ConstantBuilder, Retryable};
use http::{uri::Authority, Uri};
use http::uri::Authority;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
Expand Down Expand Up @@ -46,74 +46,86 @@ use crate::{
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostCloud {
/// Localhost (to be used for testing).
#[default]
Local,
/// S2 hosted on AWS.
#[default]
Aws,
}

impl From<HostCloud> for HostUri {
fn from(value: HostCloud) -> Self {
match value {
HostCloud::Local => HostUri {
global: std::env::var("S2_FRONTEND_AUTHORITY")
.expect("S2_FRONTEND_AUTHORITY required")
.try_into()
.unwrap(),
cell: None,
prefix_host_with_basin: false,
},
HostCloud::Aws => todo!("prod aws uris"),
impl HostCloud {
const AWS: &str = "aws";

fn as_str(&self) -> &'static str {
match self {
Self::Aws => Self::AWS,
}
}

pub fn cell_endpoint(&self) -> Authority {
format!("{}.s2.dev", self.as_str()).parse().unwrap()
}

pub fn basin_zone(&self) -> Option<Authority> {
Some(format!("b.{}.s2.dev", self.as_str()).parse().unwrap())
}
}

/// URIs for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostUri {
/// Global URI to connect to.
pub global: Uri,
/// Cell specific URI (for basin and stream service requests).
///
/// Client uses the same URI as the global URI if cell URI is absent.
pub cell: Option<Uri>,
/// Whether the cell URI host should be prefixed with the basin name or not.
///
/// If set to true, the cell URI `cell.aws.s2.dev` would be prefixed with
/// the basin name and set to `<basin>.cell.aws.s2.dev`.
pub prefix_host_with_basin: bool,
impl Display for HostCloud {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

impl Default for HostUri {
fn default() -> Self {
HostCloud::default().into()
impl FromStr for HostCloud {
type Err = InvalidHostCloudError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case(Self::AWS) {
Ok(Self::Aws)
} else {
Err(InvalidHostCloudError)
}
}
}

impl HostUri {
/// Construct a new host URI with the given global URI.
pub fn new(global_uri: impl Into<Uri>) -> Self {
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("invalid host cloud")]
pub struct InvalidHostCloudError;

impl From<HostCloud> for HostEndpoint {
fn from(value: HostCloud) -> Self {
Self {
global: global_uri.into(),
cell: None,
prefix_host_with_basin: false,
cell_endpoint: value.cell_endpoint(),
basin_zone: value.basin_zone(),
}
}
}

/// Endpoints for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoint {
pub cell_endpoint: Authority,
pub basin_zone: Option<Authority>,
}

/// Construct from an existing host URI with the given cell URI.
pub fn with_cell_uri(self, cell_uri: impl Into<Uri>) -> Self {
impl Default for HostEndpoint {
fn default() -> Self {
HostCloud::default().into()
}
}

impl HostEndpoint {
/// Construct a new host endpoint with the given cell endpoint..
pub fn new(cell_endpoint: impl Into<Authority>) -> Self {
Self {
cell: Some(cell_uri.into()),
..self
cell_endpoint: cell_endpoint.into(),
basin_zone: None,
}
}

/// Construct from an existing host URI with the new
/// `prefix_host_with_basin` configuration.
pub fn with_prefix_host_with_basin(self, prefix_host_with_basin: bool) -> Self {
/// Construct from an existing host endpoint with the given basin zone.
pub fn with_basin_zone(self, basin_zone: impl Into<Authority>) -> Self {
Self {
prefix_host_with_basin,
basin_zone: Some(basin_zone.into()),
..self
}
}
Expand All @@ -125,7 +137,7 @@ pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
pub host_uri: HostUri,
pub host_endpoint: HostEndpoint,
/// Should the connection be lazy, i.e., only be made when making the very
/// first request.
pub connect_lazily: bool,
Expand All @@ -143,7 +155,7 @@ impl ClientConfig {
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_uri: HostUri::default(),
host_endpoint: HostEndpoint::default(),
connect_lazily: true,
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
Expand All @@ -152,9 +164,9 @@ impl ClientConfig {
}

/// Construct from an existing configuration with the new host URIs.
pub fn with_host_uri(self, host_uri: impl Into<HostUri>) -> Self {
pub fn with_host_endpoint(self, host_endpoint: impl Into<HostEndpoint>) -> Self {
Self {
host_uri: host_uri.into(),
host_endpoint: host_endpoint.into(),
..self
}
}
Expand Down Expand Up @@ -206,7 +218,7 @@ impl Client {
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
Ok(Self {
inner: ClientInner::connect_global(config, force_lazy_connection).await?,
inner: ClientInner::connect_cell(config, force_lazy_connection).await?,
})
}

Expand All @@ -220,7 +232,7 @@ impl Client {
Ok(BasinClient {
inner: self
.inner
.connect_cell(basin, /* force_lazy_connection = */ false)
.connect_basin(basin, /* force_lazy_connection = */ false)
.await?,
})
}
Expand Down Expand Up @@ -307,7 +319,7 @@ impl BasinClient {
// connection with the global client so we don't end up making 2
// connections for connecting with the basin client directly (given the
// cell URI and global URIs are different).
let force_lazy_connection = config.host_uri.cell.is_some();
let force_lazy_connection = config.host_endpoint.basin_zone.is_some();
let client = Client::connect_inner(config, force_lazy_connection).await?;
client.basin_client(basin).await
}
Expand Down Expand Up @@ -488,39 +500,31 @@ struct ClientInner {
}

impl ClientInner {
async fn connect_global(
async fn connect_cell(
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let uri = config.host_uri.global.clone();
Self::connect(config, uri, force_lazy_connection).await
let cell_endpoint = config.host_endpoint.cell_endpoint.clone();
Self::connect(config, cell_endpoint, force_lazy_connection).await
}

async fn connect_cell(
async fn connect_basin(
&self,
basin: impl Into<String>,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let basin = basin.into();

match self.config.host_uri.cell.clone() {
Some(uri) if self.config.host_uri.prefix_host_with_basin => {
let host = uri.host().ok_or(ClientError::MissingHost)?;
let port = uri.port_u16().map_or(String::new(), |p| format!(":{}", p));
let authority: Authority = format!("{basin}.{host}{port}").parse()?;
let mut uri_parts = uri.into_parts();
uri_parts.authority = Some(authority);

match self.config.host_endpoint.basin_zone.clone() {
Some(endpoint) => {
let basin_zone_endpoint: Authority = format!("{basin}.{endpoint}").parse()?;
ClientInner::connect(
self.config.clone(),
Uri::from_parts(uri_parts).expect("invalid uri"),
basin_zone_endpoint,
force_lazy_connection,
)
.await
}
Some(uri) => {
ClientInner::connect(self.config.clone(), uri, force_lazy_connection).await
}
None => Ok(Self {
basin: Some(basin),
..self.clone()
Expand All @@ -530,11 +534,11 @@ impl ClientInner {

async fn connect(
config: ClientConfig,
uri: Uri,
endpoint: Authority,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let endpoint: Endpoint = uri.clone().into();
let endpoint = endpoint
let endpoint = format!("https://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.tls_config(
ClientTlsConfig::default()
Expand Down

0 comments on commit 7381edf

Please sign in to comment.