Skip to content

Commit

Permalink
feat: s2 ls command to list basins or streams
Browse files Browse the repository at this point in the history
Resolves: #101

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Dec 26, 2024
1 parent 35e5b60 commit e36d6e6
Showing 1 changed file with 117 additions and 62 deletions.
179 changes: 117 additions & 62 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ enum Commands {
action: ConfigActions,
},

/// List basins or streams in a basin.
///
/// List basins if basin name is not provided otherwise lists streams in
/// the basin.
Ls {
/// Name of the basin to manage or S2 URI with basin and prefix.
#[arg(value_name = "BASIN|S2_URI")]
uri: Option<S2BasinAndMaybeStreamUri>,

/// Filter to names that begin with this prefix.
#[arg(short = 'p', long)]
prefix: Option<String>,

/// Filter to names that lexicographically start after this name.
#[arg(short = 's', long)]
start_after: Option<String>,

/// Number of results, upto a maximum of 1000.
#[arg(short = 'n', long)]
limit: Option<usize>,
},

/// List basins.
ListBasins {
/// Filter to basin names that begin with this prefix.
Expand Down Expand Up @@ -122,7 +144,6 @@ enum Commands {
},

/// List streams.
#[command(alias = "ls")]
ListStreams {
/// Name of the basin to manage or S2 URI with basin and prefix.
#[arg(value_name = "BASIN|S2_URI")]
Expand Down Expand Up @@ -152,7 +173,6 @@ enum Commands {
},

/// Delete a stream.
#[command(alias = "rm")]
DeleteStream {
#[arg(value_name = "S2_URI")]
uri: S2BasinAndStreamUri,
Expand Down Expand Up @@ -426,6 +446,84 @@ async fn run() -> Result<(), S2CliError> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

async fn list_basins(
client_config: ClientConfig,
prefix: Option<String>,
start_after: Option<String>,
limit: Option<usize>,
) -> Result<(), S2CliError> {
let account_service = AccountService::new(Client::new(client_config));
let response = account_service
.list_basins(
prefix.unwrap_or_default(),
start_after.unwrap_or_default(),
limit.unwrap_or_default(),
)
.await?;
for basin_info in response.basins {
let BasinInfo { name, state, .. } = basin_info;

let state = match state {
s2::types::BasinState::Active => state.to_string().green(),
s2::types::BasinState::Deleting => state.to_string().red(),
_ => state.to_string().yellow(),
};
println!("{} {}", name, state);
}
Ok(())
}

async fn list_streams(
client_config: ClientConfig,
uri: S2BasinAndMaybeStreamUri,
prefix: Option<String>,
start_after: Option<String>,
limit: Option<usize>,
) -> Result<(), S2CliError> {
let S2BasinAndMaybeStreamUri {
basin,
stream: maybe_prefix,
} = uri;
let prefix = match (maybe_prefix, prefix) {
(Some(_), Some(_)) => {
return Err(S2CliError::InvalidArgs(miette::miette!(
help = "Make sure to provide the prefix once either using '--prefix' opt or in URI like 's2://basin-name/prefix'",
"Multiple prefixes provided"
)));
}
(Some(s), None) | (None, Some(s)) => Some(s),
(None, None) => None,
};
let basin_client = BasinClient::new(client_config, basin);
let streams = BasinService::new(basin_client)
.list_streams(
prefix.unwrap_or_default(),
start_after.unwrap_or_default(),
limit.unwrap_or_default(),
)
.await?;
for StreamInfo {
name,
created_at,
deleted_at,
} in streams
{
let date_time = |time: u32| {
humantime::format_rfc3339_seconds(UNIX_EPOCH + Duration::from_secs(time as u64))
};

println!(
"{} {} {}",
name,
date_time(created_at).to_string().green(),
deleted_at
.map(|d| date_time(d).to_string().red())
.unwrap_or_default()
);
}
Ok(())
}

match commands.command {
Commands::Config { action } => match action {
ConfigActions::Set { auth_token } => {
Expand All @@ -438,34 +536,31 @@ async fn run() -> Result<(), S2CliError> {
}
},

Commands::ListBasins {
Commands::Ls {
uri,
prefix,
start_after,
limit,
} => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let account_service = AccountService::new(Client::new(client_config));
let response = account_service
.list_basins(
prefix.unwrap_or_default(),
start_after.unwrap_or_default(),
limit.unwrap_or_default(),
)
.await?;

for basin_info in response.basins {
let BasinInfo { name, state, .. } = basin_info;

let state = match state {
s2::types::BasinState::Active => state.to_string().green(),
s2::types::BasinState::Deleting => state.to_string().red(),
_ => state.to_string().yellow(),
};
println!("{} {}", name, state);
if let Some(uri) = uri {
list_streams(client_config, uri, prefix, start_after, limit).await?;
} else {
list_basins(client_config, prefix, start_after, limit).await?;
}
}

Commands::ListBasins {
prefix,
start_after,
limit,
} => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
list_basins(client_config, prefix, start_after, limit).await?;
}

Commands::CreateBasin { basin, config } => {
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
Expand Down Expand Up @@ -534,49 +629,9 @@ async fn run() -> Result<(), S2CliError> {
start_after,
limit,
} => {
let S2BasinAndMaybeStreamUri {
basin,
stream: maybe_prefix,
} = uri;
let prefix = match (maybe_prefix, prefix) {
(Some(_), Some(_)) => {
return Err(S2CliError::InvalidArgs(miette::miette!(
help = "Make sure to provide the prefix once either using '--prefix' opt or in URI like 's2://basin-name/prefix'",
"Multiple prefixes provided"
)));
}
(Some(s), None) | (None, Some(s)) => Some(s),
(None, None) => None,
};
let cfg = config::load_config(&config_path)?;
let client_config = client_config(cfg.auth_token)?;
let basin_client = BasinClient::new(client_config, basin);
let streams = BasinService::new(basin_client)
.list_streams(
prefix.unwrap_or_default(),
start_after.unwrap_or_default(),
limit.unwrap_or_default(),
)
.await?;
for StreamInfo {
name,
created_at,
deleted_at,
} in streams
{
let date_time = |time: u32| {
humantime::format_rfc3339_seconds(UNIX_EPOCH + Duration::from_secs(time as u64))
};

println!(
"{} {} {}",
name,
date_time(created_at).to_string().green(),
deleted_at
.map(|d| date_time(d).to_string().red())
.unwrap_or_default()
);
}
list_streams(client_config, uri, prefix, start_after, limit).await?;
}

Commands::CreateStream { uri, config } => {
Expand Down

0 comments on commit e36d6e6

Please sign in to comment.