diff --git a/src/main.rs b/src/main.rs index 7a508f4..12e8130 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,47 +66,6 @@ enum Commands { action: ConfigActions, }, - /// Operate on an S2 account. - Account { - #[command(subcommand)] - action: AccountActions, - }, - - /// Operate on an S2 basin. - Basin { - /// Name of the basin to manage. - basin: String, - - #[command(subcommand)] - action: BasinActions, - }, - - /// Operate on an S2 stream. - Stream { - /// Name of the basin. - basin: String, - - /// Name of the stream. - stream: String, - - #[command(subcommand)] - action: StreamActions, - }, -} - -#[derive(Subcommand, Debug)] -enum ConfigActions { - /// Set the authentication token to be reused in subsequent commands. - /// Alternatively, use the S2_AUTH_TOKEN environment variable. - Set { - #[arg(short, long)] - auth_token: String, - }, -} - -#[deny(missing_docs)] -#[derive(Subcommand, Debug)] -enum AccountActions { /// List basins. ListBasins { /// Filter to basin names that begin with this prefix. @@ -125,7 +84,7 @@ enum AccountActions { /// Create a basin. CreateBasin { /// Name of the basin to create. - basin: String, + basin: BasinName, #[command(flatten)] config: BasinConfig, @@ -134,30 +93,30 @@ enum AccountActions { /// Delete a basin. DeleteBasin { /// Name of the basin to delete. - basin: String, + basin: BasinName, }, /// Get basin config. GetBasinConfig { /// Basin name to get config for. - basin: String, + basin: BasinName, }, /// Reconfigure a basin. ReconfigureBasin { /// Name of the basin to reconfigure. - basin: String, + basin: BasinName, /// Configuration to apply. #[command(flatten)] config: BasinConfig, }, -} -#[derive(Subcommand, Debug)] -enum BasinActions { /// List streams. ListStreams { + /// Name of the basin to manage. + basin: BasinName, + /// Filter to stream names that begin with this prefix. #[arg(short, long)] prefix: Option, @@ -173,6 +132,9 @@ enum BasinActions { /// Create a stream. CreateStream { + /// Name of the basin to manage. + basin: BasinName, + /// Name of the stream to create. stream: String, @@ -183,18 +145,27 @@ enum BasinActions { /// Delete a stream. DeleteStream { + /// Name of the basin to manage. + basin: BasinName, + /// Name of the stream to delete. stream: String, }, /// Get stream config. GetStreamConfig { + /// Name of the basin to manage. + basin: BasinName, + /// Name of the stream to get config for. stream: String, }, /// Reconfigure a stream. ReconfigureStream { + /// Name of the basin to manage. + basin: BasinName, + /// Name of the stream to reconfigure. stream: String, @@ -202,17 +173,27 @@ enum BasinActions { #[command(flatten)] config: StreamConfig, }, -} -#[derive(Subcommand, Debug)] -enum StreamActions { /// Get the next sequence number that will be assigned by a stream. - CheckTail, + CheckTail { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + }, /// Set the trim point for the stream. + /// /// Trimming is eventually consistent, and trimmed records may be visible /// for a brief period. Trim { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + /// Trim point. /// This sequence number is only allowed to advance, and any regression /// will be ignored. @@ -220,16 +201,31 @@ enum StreamActions { }, /// Set the fencing token for the stream. + /// /// Fencing is strongly consistent, and subsequent appends that specify a /// fencing token will be rejected if it does not match. Fence { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + /// Payload upto 16 bytes to set as the fencing token. /// An empty payload clears the token. fencing_token: Option, }, - /// Append records to a stream. Currently, only newline delimited records are supported. + /// Append records to a stream. + /// + /// Currently, only newline delimited records are supported. Append { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + /// Enforce a fencing token which must have been previously set by a /// `fence` command record. #[arg(short = 'f', long)] @@ -247,9 +243,16 @@ enum StreamActions { }, /// Read records from a stream. + /// /// If a limit if specified, reading will stop when the limit is reached or there are no more records on the stream. /// If a limit is not specified, the reader will keep tailing and wait for new records. Read { + /// Name of the basin. + basin: BasinName, + + /// Name of the stream. + stream: String, + /// Starting sequence number (inclusive). If not specified, the latest record. #[arg(short = 's', long)] start_seq_num: Option, @@ -269,6 +272,16 @@ enum StreamActions { }, } +#[derive(Subcommand, Debug)] +enum ConfigActions { + /// Set the authentication token to be reused in subsequent commands. + /// Alternatively, use the S2_AUTH_TOKEN environment variable. + Set { + #[arg(short, long)] + auth_token: String, + }, +} + #[derive(Debug, Clone)] pub enum RecordsIn { File(PathBuf), @@ -369,371 +382,405 @@ async fn run() -> Result<(), S2CliError> { } }, - Commands::Account { action } => { + Commands::ListBasins { + prefix, + start_after, + limit, + } => { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let account_service = AccountService::new(Client::new(client_config)); - match action { - AccountActions::ListBasins { - prefix, - start_after, - limit, - } => { - let response = account_service - .list_basins( - prefix.unwrap_or_default(), - start_after.unwrap_or_default(), - limit.unwrap_or_default(), - ) - .await?; - - for basin_info in response.basins { - let BasinInfo { name, state, .. } = basin_info; - - let state = match state { - streamstore::types::BasinState::Active => state.to_string().green(), - streamstore::types::BasinState::Deleting => state.to_string().red(), - _ => state.to_string().yellow(), - }; - println!("{} {}", name, state); - } + let response = account_service + .list_basins( + prefix.unwrap_or_default(), + start_after.unwrap_or_default(), + limit.unwrap_or_default(), + ) + .await?; + + for basin_info in response.basins { + let BasinInfo { name, state, .. } = basin_info; + + let state = match state { + streamstore::types::BasinState::Active => state.to_string().green(), + streamstore::types::BasinState::Deleting => state.to_string().red(), + _ => state.to_string().yellow(), + }; + println!("{} {}", name, state); + } + } + + Commands::CreateBasin { basin, config } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let account_service = AccountService::new(Client::new(client_config)); + let (storage_class, retention_policy) = match &config.default_stream_config { + Some(config) => { + let storage_class = config.storage_class.clone(); + let retention_policy = config.retention_policy.clone(); + (storage_class, retention_policy) } + None => (None, None), + }; + account_service + .create_basin(basin, storage_class, retention_policy) + .await?; - AccountActions::CreateBasin { basin, config } => { - let (storage_class, retention_policy) = match &config.default_stream_config { - Some(config) => { - let storage_class = config.storage_class.clone(); - let retention_policy = config.retention_policy.clone(); - (storage_class, retention_policy) - } - None => (None, None), - }; - account_service - .create_basin(basin.try_into()?, storage_class, retention_policy) - .await?; + eprintln!("{}", "✓ Basin created".green().bold()); + } - eprintln!("{}", "✓ Basin created".green().bold()); - } + Commands::DeleteBasin { basin } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let account_service = AccountService::new(Client::new(client_config)); + account_service.delete_basin(basin).await?; + eprintln!("{}", "✓ Basin deletion requested".green().bold()); + } - AccountActions::DeleteBasin { basin } => { - account_service.delete_basin(basin.try_into()?).await?; - eprintln!("{}", "✓ Basin deletion requested".green().bold()); - } + Commands::GetBasinConfig { basin } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let account_service = AccountService::new(Client::new(client_config)); + let basin_config = account_service.get_basin_config(basin).await?; + let basin_config: BasinConfig = basin_config.into(); + println!("{}", serde_json::to_string_pretty(&basin_config)?); + } - AccountActions::GetBasinConfig { basin } => { - let basin_config = account_service.get_basin_config(basin.try_into()?).await?; - let basin_config: BasinConfig = basin_config.into(); - println!("{}", serde_json::to_string_pretty(&basin_config)?); + Commands::ReconfigureBasin { basin, config } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let account_service = AccountService::new(Client::new(client_config)); + let mut mask = Vec::new(); + if let Some(config) = &config.default_stream_config { + if config.storage_class.is_some() { + mask.push(STORAGE_CLASS_PATH.to_string()); + } + if config.retention_policy.is_some() { + mask.push(RETENTION_POLICY_PATH.to_string()); } + } - AccountActions::ReconfigureBasin { basin, config } => { - let mut mask = Vec::new(); - if let Some(config) = &config.default_stream_config { - if config.storage_class.is_some() { - mask.push(STORAGE_CLASS_PATH.to_string()); - } - if config.retention_policy.is_some() { - mask.push(RETENTION_POLICY_PATH.to_string()); - } - } + account_service + .reconfigure_basin(basin, config.into(), mask) + .await?; + } - account_service - .reconfigure_basin(basin.try_into()?, config.into(), mask) - .await?; - } + Commands::ListStreams { + basin, + prefix, + start_after, + limit, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let basin_client = BasinClient::new(client_config, basin); + let streams = BasinService::new(basin_client) + .list_streams( + prefix.unwrap_or_default(), + start_after.unwrap_or_default(), + limit.unwrap_or_default(), + ) + .await?; + for StreamInfo { + name, + created_at, + deleted_at, + } in streams + { + let date_time = |time: u32| { + humantime::format_rfc3339_seconds(UNIX_EPOCH + Duration::from_secs(time as u64)) + }; + + println!( + "{} {} {}", + name, + date_time(created_at).to_string().green(), + deleted_at + .map(|d| date_time(d).to_string().red()) + .unwrap_or_default() + ); } } - Commands::Basin { basin, action } => { - let basin = BasinName::try_from(basin)?; + Commands::CreateStream { + basin, + stream, + config, + } => { let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; - match action { - BasinActions::ListStreams { - prefix, - start_after, - limit, - } => { - let basin_client = BasinClient::new(client_config, basin); - let streams = BasinService::new(basin_client) - .list_streams( - prefix.unwrap_or_default(), - start_after.unwrap_or_default(), - limit.unwrap_or_default(), - ) - .await?; - for StreamInfo { - name, - created_at, - deleted_at, - } in streams - { - let date_time = |time: u32| { - humantime::format_rfc3339_seconds( - UNIX_EPOCH + Duration::from_secs(time as u64), - ) - }; - - println!( - "{} {} {}", - name, - date_time(created_at).to_string().green(), - deleted_at - .map(|d| date_time(d).to_string().red()) - .unwrap_or_default() - ); - } - } + let basin_client = BasinClient::new(client_config, basin); + BasinService::new(basin_client) + .create_stream(stream, config.map(Into::into)) + .await?; + eprintln!("{}", "✓ Stream created".green().bold()); + } - BasinActions::CreateStream { stream, config } => { - let basin_client = BasinClient::new(client_config, basin); - BasinService::new(basin_client) - .create_stream(stream, config.map(Into::into)) - .await?; - eprintln!("{}", "✓ Stream created".green().bold()); - } + Commands::DeleteStream { basin, stream } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let basin_client = BasinClient::new(client_config, basin); + BasinService::new(basin_client) + .delete_stream(stream) + .await?; + eprintln!("{}", "✓ Stream deletion requested".green().bold()); + } - BasinActions::DeleteStream { stream } => { - let basin_client = BasinClient::new(client_config, basin); - BasinService::new(basin_client) - .delete_stream(stream) - .await?; - eprintln!("{}", "✓ Stream deleted".green().bold()); - } + Commands::GetStreamConfig { basin, stream } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let basin_client = BasinClient::new(client_config, basin); + let config: StreamConfig = BasinService::new(basin_client) + .get_stream_config(stream) + .await? + .into(); + println!("{}", serde_json::to_string_pretty(&config)?); + } - BasinActions::GetStreamConfig { stream } => { - let basin_client = BasinClient::new(client_config, basin); - let config: StreamConfig = BasinService::new(basin_client) - .get_stream_config(stream) - .await? - .into(); - println!("{}", serde_json::to_string_pretty(&config)?); - } + Commands::ReconfigureStream { + basin, + stream, + config, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let basin_client = BasinClient::new(client_config, basin); + let mut mask = Vec::new(); - BasinActions::ReconfigureStream { stream, config } => { - let basin_client = BasinClient::new(client_config, basin); - let mut mask = Vec::new(); + if config.storage_class.is_some() { + mask.push("storage_class".to_string()); + }; - if config.storage_class.is_some() { - mask.push("storage_class".to_string()); - }; + if config.retention_policy.is_some() { + mask.push("retention_policy".to_string()); + }; - if config.retention_policy.is_some() { - mask.push("retention_policy".to_string()); - }; + let config: StreamConfig = BasinService::new(basin_client) + .reconfigure_stream(stream, config.into(), mask) + .await? + .into(); - let config: StreamConfig = BasinService::new(basin_client) - .reconfigure_stream(stream, config.into(), mask) - .await? - .into(); + eprintln!("{}", "✓ Stream reconfigured".green().bold()); + println!("{}", serde_json::to_string_pretty(&config)?); + } - eprintln!("{}", "✓ Stream reconfigured".green().bold()); - println!("{}", serde_json::to_string_pretty(&config)?); - } - } + Commands::CheckTail { basin, stream } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let stream_client = StreamClient::new(client_config, basin, stream); + let next_seq_num = StreamService::new(stream_client).check_tail().await?; + println!("{}", next_seq_num); } - Commands::Stream { + + Commands::Trim { basin, stream, - action, + trim_point, } => { - let basin = BasinName::try_from(basin)?; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; - match action { - StreamActions::CheckTail => { - let stream_client = StreamClient::new(client_config, basin, stream); - let next_seq_num = StreamService::new(stream_client).check_tail().await?; - println!("{}", next_seq_num); - } - StreamActions::Trim { trim_point } => { - let stream_client = StreamClient::new(client_config, basin, stream); - StreamService::new(stream_client) - .append_command_record(CommandRecord::trim(trim_point)) - .await?; - eprintln!("{}", "✓ Trim requested".green().bold()); - } - StreamActions::Fence { fencing_token } => { - let stream_client = StreamClient::new(client_config, basin, stream); - StreamService::new(stream_client) - .append_command_record(CommandRecord::fence(fencing_token)) - .await?; - eprintln!("{}", "✓ Fencing token set".green().bold()); - } - StreamActions::Append { - input, - fencing_token, - match_seq_num, - } => { - let stream_client = StreamClient::new(client_config, basin, stream); - let append_input_stream = RecordStream::new( - input - .into_reader() - .await - .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))? - .lines(), - ); - - let mut append_output_stream = StreamService::new(stream_client) - .append_session(append_input_stream, fencing_token, match_seq_num) - .await?; - loop { - select! { - maybe_append_result = append_output_stream.next() => { - match maybe_append_result { - Some(append_result) => { - match append_result { - Ok(append_result) => { - eprintln!( - "{}", - format!( - "✓ [APPENDED] start: {}, end: {}, next: {}", - append_result.start_seq_num, - append_result.end_seq_num, - append_result.next_seq_num - ) - .green() - .bold() - ); - }, - Err(e) => { - return Err(ServiceError::new(ServiceErrorContext::AppendSession, e).into()); - } - } + let stream_client = StreamClient::new(client_config, basin, stream); + StreamService::new(stream_client) + .append_command_record(CommandRecord::trim(trim_point)) + .await?; + eprintln!("{}", "✓ Trim requested".green().bold()); + } + + Commands::Fence { + basin, + stream, + fencing_token, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let stream_client = StreamClient::new(client_config, basin, stream); + StreamService::new(stream_client) + .append_command_record(CommandRecord::fence(fencing_token)) + .await?; + eprintln!("{}", "✓ Fencing token set".green().bold()); + } + + Commands::Append { + basin, + stream, + input, + fencing_token, + match_seq_num, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let stream_client = StreamClient::new(client_config, basin, stream); + let append_input_stream = RecordStream::new( + input + .into_reader() + .await + .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))? + .lines(), + ); + + let mut append_output_stream = StreamService::new(stream_client) + .append_session(append_input_stream, fencing_token, match_seq_num) + .await?; + loop { + select! { + maybe_append_result = append_output_stream.next() => { + match maybe_append_result { + Some(append_result) => { + match append_result { + Ok(append_result) => { + eprintln!( + "{}", + format!( + "✓ [APPENDED] start: {}, end: {}, next: {}", + append_result.start_seq_num, + append_result.end_seq_num, + append_result.next_seq_num + ) + .green() + .bold() + ); + }, + Err(e) => { + return Err(ServiceError::new(ServiceErrorContext::AppendSession, e).into()); } - None => break, } } - - _ = signal::ctrl_c() => { - drop(append_output_stream); - eprintln!("{}", "■ [ABORTED]".red().bold()); - break; - } + None => break, } } + + _ = signal::ctrl_c() => { + drop(append_output_stream); + eprintln!("{}", "■ [ABORTED]".red().bold()); + break; + } } - StreamActions::Read { - start_seq_num, - output, - limit_count, - limit_bytes, - } => { - let stream_client = StreamClient::new(client_config, basin, stream); - let mut read_output_stream = StreamService::new(stream_client) - .read_session(start_seq_num, limit_count, limit_bytes) - .await?; - let mut writer = output.into_writer().await.unwrap(); - - let mut start = None; - let mut total_data_len = 0; - - loop { - select! { - maybe_read_result = read_output_stream.next() => { - match maybe_read_result { - Some(read_result) => { - if start.is_none() { - start = Some(Instant::now()); - } - match read_result { - Ok(ReadOutput::Batch(sequenced_record_batch)) => { - let num_records = sequenced_record_batch.records.len(); - let mut batch_len = 0; - - let seq_range = match ( - sequenced_record_batch.records.first(), - sequenced_record_batch.records.last(), - ) { - (Some(first), Some(last)) => first.seq_num..=last.seq_num, - _ => panic!("empty batch"), + } + } + + Commands::Read { + basin, + stream, + start_seq_num, + output, + limit_count, + limit_bytes, + } => { + let cfg = config::load_config(&config_path)?; + let client_config = client_config(cfg.auth_token)?; + let stream_client = StreamClient::new(client_config, basin, stream); + let mut read_output_stream = StreamService::new(stream_client) + .read_session(start_seq_num, limit_count, limit_bytes) + .await?; + let mut writer = output.into_writer().await.unwrap(); + + let mut start = None; + let mut total_data_len = 0; + + loop { + select! { + maybe_read_result = read_output_stream.next() => { + match maybe_read_result { + Some(read_result) => { + if start.is_none() { + start = Some(Instant::now()); + } + match read_result { + Ok(ReadOutput::Batch(sequenced_record_batch)) => { + let num_records = sequenced_record_batch.records.len(); + let mut batch_len = 0; + + let seq_range = match ( + sequenced_record_batch.records.first(), + sequenced_record_batch.records.last(), + ) { + (Some(first), Some(last)) => first.seq_num..=last.seq_num, + _ => panic!("empty batch"), + }; + for sequenced_record in sequenced_record_batch.records { + batch_len += sequenced_record.metered_bytes(); + + if let Some(command_record) = sequenced_record.as_command_record() { + let (cmd, description) = match command_record { + CommandRecord::Fence { fencing_token } => ( + "fence", + format!("{fencing_token:?}"), + ), + CommandRecord::Trim { seq_num } => ( + "trim", + format!("TrimPoint({seq_num})"), + ), }; - for sequenced_record in sequenced_record_batch.records { - batch_len += sequenced_record.metered_bytes(); - - if let Some(command_record) = sequenced_record.as_command_record() { - let (cmd, description) = match command_record { - CommandRecord::Fence { fencing_token } => ( - "fence", - format!("{fencing_token:?}"), - ), - CommandRecord::Trim { seq_num } => ( - "trim", - format!("TrimPoint({seq_num})"), - ), - }; - eprintln!("{} with {}", cmd.bold(), description.green().bold()); - } else { - let data = &sequenced_record.body; - writer - .write_all(data) - .await - .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; - writer - .write_all(b"\n") - .await - .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; - } - } - total_data_len += batch_len; - - let throughput_mibps = (total_data_len as f64 - / start.unwrap().elapsed().as_secs_f64()) - / 1024.0 - / 1024.0; - - eprintln!( - "{}", - format!( - "⦿ {throughput_mibps:.2} MiB/s \ - ({num_records} records in range {seq_range:?})", - ) - .blue() - .bold() - ); + eprintln!("{} with {}", cmd.bold(), description.green().bold()); + } else { + let data = &sequenced_record.body; + writer + .write_all(data) + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + writer + .write_all(b"\n") + .await + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; } + } + total_data_len += batch_len; + + let throughput_mibps = (total_data_len as f64 + / start.unwrap().elapsed().as_secs_f64()) + / 1024.0 + / 1024.0; + + eprintln!( + "{}", + format!( + "⦿ {throughput_mibps:.2} MiB/s \ + ({num_records} records in range {seq_range:?})", + ) + .blue() + .bold() + ); + } - Ok(ReadOutput::FirstSeqNum(seq_num)) => { - eprintln!("{}", format!("first_seq_num: {seq_num}").blue().bold()); - } + Ok(ReadOutput::FirstSeqNum(seq_num)) => { + eprintln!("{}", format!("first_seq_num: {seq_num}").blue().bold()); + } - Ok(ReadOutput::NextSeqNum(seq_num)) => { - eprintln!("{}", format!("next_seq_num: {seq_num}").blue().bold()); - } + Ok(ReadOutput::NextSeqNum(seq_num)) => { + eprintln!("{}", format!("next_seq_num: {seq_num}").blue().bold()); + } - Err(e) => { - return Err(ServiceError::new(ServiceErrorContext::ReadSession, e).into()); - } - } + Err(e) => { + return Err(ServiceError::new(ServiceErrorContext::ReadSession, e).into()); } - None => break, } - }, - _ = signal::ctrl_c() => { - drop(read_output_stream); - eprintln!("{}", "■ [ABORTED]".red().bold()); - break; } + None => break, } - let total_elapsed_time = start.unwrap().elapsed().as_secs_f64(); + }, + _ = signal::ctrl_c() => { + drop(read_output_stream); + eprintln!("{}", "■ [ABORTED]".red().bold()); + break; + } + } + let total_elapsed_time = start.unwrap().elapsed().as_secs_f64(); - let total_throughput_mibps = - (total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0; + let total_throughput_mibps = + (total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0; - eprintln!( - "{}", - format!( - "{total_data_len} metered bytes in \ + eprintln!( + "{}", + format!( + "{total_data_len} metered bytes in \ {total_elapsed_time} seconds \ at {total_throughput_mibps:.2} MiB/s" - ) - .yellow() - .bold() - ); + ) + .yellow() + .bold() + ); - writer.flush().await.expect("writer flush"); - } - } + writer.flush().await.expect("writer flush"); } } }