Skip to content

Commit

Permalink
feat: Only accept URIs in basin+stream args (#100)
Browse files Browse the repository at this point in the history
Valid basin: `<basin-name>` or `s2://<basin-name>`

Valid basin+stream: `s2://<basin-name>/<stream-name>`

The `list-streams` command still accepts both `--prefix` opt and URI:
`s2://<basin-name>/<prefix>`

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Dec 26, 2024
1 parent adc26fb commit 9c7f28a
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 168 deletions.
33 changes: 25 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,30 @@ impl ServiceError {
}
}

#[derive(Debug, Error, Diagnostic)]
pub enum BasinNameOrUriParseError {
#[error(transparent)]
#[diagnostic(help("Are you trying to operate on an invalid basin?"))]
BasinName(#[from] ConvertError),
#[derive(Debug, Error)]
pub enum S2UriParseError {
#[error("S2 URI must begin with `s2://`")]
MissingUriScheme,
#[error("Invalid S2 URI scheme `{0}://`. Must be `s2://`")]
InvalidUriScheme(String),
#[error("{0}")]
InvalidBasinName(ConvertError),
#[error("Only basin name expected but found both basin and stream names")]
UnexpectedStreamName,
#[error("Missing stream name in S2 URI")]
MissingStreamName,
}

#[error("Invalid S2 URI: {0}")]
#[diagnostic(transparent)]
InvalidUri(miette::Report),
#[cfg(test)]
impl PartialEq for S2UriParseError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::MissingUriScheme, Self::MissingUriScheme) => true,
(Self::InvalidUriScheme(s), Self::InvalidUriScheme(o)) if s.eq(o) => true,
(Self::InvalidBasinName(_), Self::InvalidBasinName(_)) => true,
(Self::MissingStreamName, Self::MissingStreamName) => true,
(Self::UnexpectedStreamName, Self::UnexpectedStreamName) => true,
_ => false,
}
}
}
100 changes: 50 additions & 50 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use tokio_stream::{
use tracing::trace;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{
BasinConfig, BasinNameAndMaybeStreamUri, BasinNameAndStreamArgs, BasinNameOnlyUri,
StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH,
BasinConfig, S2BasinAndMaybeStreamUri, S2BasinAndStreamUri, S2BasinUri, StreamConfig,
RETENTION_POLICY_PATH, STORAGE_CLASS_PATH,
};

mod account;
Expand Down Expand Up @@ -96,7 +96,7 @@ enum Commands {
/// Create a basin.
CreateBasin {
/// Name of the basin to create.
basin: BasinNameOnlyUri,
basin: S2BasinUri,

#[command(flatten)]
config: BasinConfig,
Expand All @@ -105,19 +105,19 @@ enum Commands {
/// Delete a basin.
DeleteBasin {
/// Name of the basin to delete.
basin: BasinNameOnlyUri,
basin: S2BasinUri,
},

/// Get basin config.
GetBasinConfig {
/// Basin name to get config for.
basin: BasinNameOnlyUri,
basin: S2BasinUri,
},

/// Reconfigure a basin.
ReconfigureBasin {
/// Name of the basin to reconfigure.
basin: BasinNameOnlyUri,
basin: S2BasinUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -129,7 +129,7 @@ enum Commands {
ListStreams {
/// Name of the basin to manage or S2 URI with basin and prefix.
#[arg(value_name = "BASIN|S2_URI")]
basin: BasinNameAndMaybeStreamUri,
uri: S2BasinAndMaybeStreamUri,

/// Filter to stream names that begin with this prefix.
#[arg(short = 'p', long)]
Expand All @@ -146,8 +146,8 @@ enum Commands {

/// Create a stream.
CreateStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -157,20 +157,20 @@ enum Commands {
/// Delete a stream.
#[command(alias = "rm")]
DeleteStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Get stream config.
GetStreamConfig {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Reconfigure a stream.
ReconfigureStream {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Configuration to apply.
#[command(flatten)]
Expand All @@ -179,17 +179,17 @@ enum Commands {

/// Get the next sequence number that will be assigned by a stream.
CheckTail {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
},

/// Set the trim point for the stream.
///
/// Trimming is eventually consistent, and trimmed records may be visible
/// for a brief period.
Trim {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Earliest sequence number that should be retained.
/// This sequence number is only allowed to advance,
Expand All @@ -213,8 +213,8 @@ enum Commands {
/// Note that fencing is a cooperative mechanism,
/// and it is only enforced when a token is provided.
Fence {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// New fencing token specified in hex.
/// It may be upto 16 bytes, and can be empty.
Expand All @@ -234,8 +234,8 @@ enum Commands {
///
/// Currently, only newline delimited records are supported.
Append {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Enforce fencing token specified in hex.
#[arg(short = 'f', long, value_parser = parse_fencing_token)]
Expand All @@ -257,8 +257,8 @@ enum Commands {
/// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream.
/// If a limit is not specified, the reader will keep tailing and wait for new records.
Read {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Starting sequence number (inclusive).
#[arg(short = 's', long, default_value_t = 0)]
Expand All @@ -280,8 +280,8 @@ enum Commands {

/// Ping the stream to get append acknowledgement and end-to-end latencies.
Ping {
#[command(flatten)]
args: BasinNameAndStreamArgs,
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,

/// Send a batch after this interval.
///
Expand Down Expand Up @@ -532,15 +532,15 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::ListStreams {
basin,
uri,
prefix,
start_after,
limit,
} => {
let BasinNameAndMaybeStreamUri {
let S2BasinAndMaybeStreamUri {
basin,
stream: maybe_prefix,
} = basin;
} = uri;
let prefix = match (maybe_prefix, prefix) {
(Some(_), Some(_)) => {
return Err(S2CliError::InvalidArgs(miette::miette!(
Expand Down Expand Up @@ -582,8 +582,8 @@ async fn run() -> Result<(), S2CliError> {
}
}

Commands::CreateStream { args, config } => {
let (basin, stream) = args.try_into_parts()?;
Commands::CreateStream { uri, config } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -593,8 +593,8 @@ async fn run() -> Result<(), S2CliError> {
eprintln!("{}", "✓ Stream created".green().bold());
}

Commands::DeleteStream { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::DeleteStream { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -604,8 +604,8 @@ async fn run() -> Result<(), S2CliError> {
eprintln!("{}", "✓ Stream deletion requested".green().bold());
}

Commands::GetStreamConfig { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::GetStreamConfig { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -616,8 +616,8 @@ async fn run() -> Result<(), S2CliError> {
println!("{}", serde_json::to_string_pretty(&config)?);
}

Commands::ReconfigureStream { args, config } => {
let (basin, stream) = args.try_into_parts()?;
Commands::ReconfigureStream { uri, config } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
Expand All @@ -640,8 +640,8 @@ async fn run() -> Result<(), S2CliError> {
println!("{}", serde_json::to_string_pretty(&config)?);
}

Commands::CheckTail { args } => {
let (basin, stream) = args.try_into_parts()?;
Commands::CheckTail { uri } => {
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -650,12 +650,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Trim {
args,
uri,
trim_point,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -675,12 +675,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Fence {
args,
uri,
new_fencing_token,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand All @@ -700,12 +700,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Append {
args,
uri,
input,
fencing_token,
match_seq_num,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand Down Expand Up @@ -763,13 +763,13 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Read {
args,
uri,
start_seq_num,
output,
limit_count,
limit_bytes,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamClient::new(client_config, basin, stream);
Expand Down Expand Up @@ -889,12 +889,12 @@ async fn run() -> Result<(), S2CliError> {
}

Commands::Ping {
args,
uri,
interval,
batch_bytes,
num_batches,
} => {
let (basin, stream) = args.try_into_parts()?;
let S2BasinAndStreamUri { basin, stream } = uri;
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let stream_client = StreamService::new(StreamClient::new(client_config, basin, stream));
Expand Down
Loading

0 comments on commit 9c7f28a

Please sign in to comment.