Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Redo endpoint logic (#39) #40

Merged
merged 8 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use futures::StreamExt;
use streamstore::{
client::{Client, ClientConfig, HostCloud},
client::{Client, ClientConfig, HostEndpoints},
service_error::{CreateBasinError, CreateStreamError, ServiceError},
streams::AppendRecordStream,
types::{
Expand All @@ -16,7 +16,7 @@ async fn main() {
let token = std::env::var("S2_AUTH_TOKEN").unwrap();

let config = ClientConfig::new(token)
.with_host_uri(HostCloud::Local)
.with_host_endpoint(HostEndpoints::from_env())
.with_request_timeout(Duration::from_secs(10));

println!("Connecting with {config:#?}");
Expand Down
166 changes: 83 additions & 83 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;
use std::{fmt::Display, str::FromStr, time::Duration};

use backon::{ConstantBuilder, Retryable};
use http::{uri::Authority, Uri};
use http::uri::Authority;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
Expand Down Expand Up @@ -46,76 +46,88 @@ use crate::{
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HostCloud {
/// Localhost (to be used for testing).
#[default]
Local,
/// S2 hosted on AWS.
#[default]
Aws,
}

impl From<HostCloud> for HostUri {
fn from(value: HostCloud) -> Self {
match value {
HostCloud::Local => HostUri {
global: std::env::var("S2_FRONTEND_AUTHORITY")
.expect("S2_FRONTEND_AUTHORITY required")
.try_into()
.unwrap(),
cell: None,
prefix_host_with_basin: false,
},
HostCloud::Aws => todo!("prod aws uris"),
impl HostCloud {
const AWS: &str = "aws";

fn as_str(&self) -> &'static str {
match self {
Self::Aws => Self::AWS,
}
}
}

/// URIs for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostUri {
/// Global URI to connect to.
pub global: Uri,
/// Cell specific URI (for basin and stream service requests).
///
/// Client uses the same URI as the global URI if cell URI is absent.
pub cell: Option<Uri>,
/// Whether the cell URI host should be prefixed with the basin name or not.
///
/// If set to true, the cell URI `cell.aws.s2.dev` would be prefixed with
/// the basin name and set to `<basin>.cell.aws.s2.dev`.
pub prefix_host_with_basin: bool,
pub fn cell_endpoint(&self) -> Authority {
format!("{}.s2.dev", self.as_str()).parse().unwrap()
}

pub fn basin_endpoint(&self) -> Option<Authority> {
Some(format!("b.{}.s2.dev", self.as_str()).parse().unwrap())
}
}

impl Default for HostUri {
fn default() -> Self {
HostCloud::default().into()
impl Display for HostCloud {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}

impl HostUri {
/// Construct a new host URI with the given global URI.
pub fn new(global_uri: impl Into<Uri>) -> Self {
Self {
global: global_uri.into(),
cell: None,
prefix_host_with_basin: false,
impl FromStr for HostCloud {
type Err = InvalidHostCloudError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case(Self::AWS) {
Ok(Self::Aws)
} else {
Err(InvalidHostCloudError)
}
}
}

#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("invalid host cloud")]
pub struct InvalidHostCloudError;

/// Construct from an existing host URI with the given cell URI.
pub fn with_cell_uri(self, cell_uri: impl Into<Uri>) -> Self {
impl From<HostCloud> for HostEndpoints {
fn from(value: HostCloud) -> Self {
Self {
cell: Some(cell_uri.into()),
..self
cell: value.cell_endpoint(),
basin_zone: value.basin_endpoint(),
}
}
}

/// Construct from an existing host URI with the new
/// `prefix_host_with_basin` configuration.
pub fn with_prefix_host_with_basin(self, prefix_host_with_basin: bool) -> Self {
Self {
prefix_host_with_basin,
..self
/// Endpoints for the hosted S2 environment.
#[derive(Debug, Clone)]
pub struct HostEndpoints {
pub cell: Authority,
pub basin_zone: Option<Authority>,
}

impl Default for HostEndpoints {
fn default() -> Self {
HostCloud::default().into()
}
}

impl HostEndpoints {
pub fn from_env() -> Self {
let cloud = std::env::var("S2_CLOUD")
.ok()
.and_then(|c| c.parse::<HostCloud>().ok())
shikhar marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or_default();

fn endpoint_from_env(env: &str) -> Option<Authority> {
std::env::var(env).ok().and_then(|e| e.parse().ok())
shikhar marked this conversation as resolved.
Show resolved Hide resolved
}

let cell = endpoint_from_env("S2_CELL").unwrap_or_else(|| cloud.cell_endpoint());
let basin_zone = endpoint_from_env("S2_BASIN_ZONE").or_else(|| cloud.basin_endpoint());
shikhar marked this conversation as resolved.
Show resolved Hide resolved

Self { cell, basin_zone }
}
}

Expand All @@ -125,7 +137,7 @@ pub struct ClientConfig {
/// Auth token for the client.
pub token: SecretString,
/// Host URI to connect with.
pub host_uri: HostUri,
pub host_endpoint: HostEndpoints,
/// Should the connection be lazy, i.e., only be made when making the very
/// first request.
pub connect_lazily: bool,
Expand All @@ -143,7 +155,7 @@ impl ClientConfig {
pub fn new(token: impl Into<String>) -> Self {
Self {
token: token.into().into(),
host_uri: HostUri::default(),
host_endpoint: HostEndpoints::default(),
connect_lazily: true,
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
Expand All @@ -152,9 +164,9 @@ impl ClientConfig {
}

/// Construct from an existing configuration with the new host URIs.
pub fn with_host_uri(self, host_uri: impl Into<HostUri>) -> Self {
pub fn with_host_endpoint(self, host_endpoint: impl Into<HostEndpoints>) -> Self {
Self {
host_uri: host_uri.into(),
host_endpoint: host_endpoint.into(),
..self
}
}
Expand Down Expand Up @@ -206,7 +218,7 @@ impl Client {
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
Ok(Self {
inner: ClientInner::connect_global(config, force_lazy_connection).await?,
inner: ClientInner::connect_cell(config, force_lazy_connection).await?,
})
}

Expand All @@ -220,7 +232,7 @@ impl Client {
Ok(BasinClient {
inner: self
.inner
.connect_cell(basin, /* force_lazy_connection = */ false)
.connect_basin(basin, /* force_lazy_connection = */ false)
.await?,
})
}
Expand Down Expand Up @@ -307,7 +319,7 @@ impl BasinClient {
// connection with the global client so we don't end up making 2
// connections for connecting with the basin client directly (given the
// cell URI and global URIs are different).
let force_lazy_connection = config.host_uri.cell.is_some();
let force_lazy_connection = config.host_endpoint.basin_zone.is_some();
let client = Client::connect_inner(config, force_lazy_connection).await?;
client.basin_client(basin).await
}
Expand Down Expand Up @@ -488,38 +500,26 @@ struct ClientInner {
}

impl ClientInner {
async fn connect_global(
async fn connect_cell(
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let uri = config.host_uri.global.clone();
Self::connect(config, uri, force_lazy_connection).await
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect(config, cell_endpoint, force_lazy_connection).await
}

async fn connect_cell(
async fn connect_basin(
&self,
basin: impl Into<String>,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let basin = basin.into();

match self.config.host_uri.cell.clone() {
Some(uri) if self.config.host_uri.prefix_host_with_basin => {
let host = uri.host().ok_or(ClientError::MissingHost)?;
let port = uri.port_u16().map_or(String::new(), |p| format!(":{}", p));
let authority: Authority = format!("{basin}.{host}{port}").parse()?;
let mut uri_parts = uri.into_parts();
uri_parts.authority = Some(authority);

ClientInner::connect(
self.config.clone(),
Uri::from_parts(uri_parts).expect("invalid uri"),
force_lazy_connection,
)
.await
}
Some(uri) => {
ClientInner::connect(self.config.clone(), uri, force_lazy_connection).await
match self.config.host_endpoint.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse()?;
ClientInner::connect(self.config.clone(), basin_endpoint, force_lazy_connection)
.await
}
None => Ok(Self {
basin: Some(basin),
Expand All @@ -530,11 +530,11 @@ impl ClientInner {

async fn connect(
config: ClientConfig,
uri: Uri,
endpoint: Authority,
force_lazy_connection: bool,
) -> Result<Self, ClientError> {
let endpoint: Endpoint = uri.clone().into();
let endpoint = endpoint
let endpoint = format!("https://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.http2_adaptive_window(true)
.tls_config(
Expand Down