diff --git a/Cargo.lock b/Cargo.lock index cd665ba41..929fdc0d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2316,6 +2316,7 @@ dependencies = [ "inquire", "log", "notify 5.2.0", + "pyo3", "serde", "serde_json", "serde_yaml 0.9.34+deprecated", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index 73b4c8589..aac648fed 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -16,6 +16,7 @@ path = "src/main.rs" [features] default = ["tracing"] tracing = ["dep:dora-tracing"] +python = ["pyo3"] [dependencies] clap = { version = "4.0.3", features = ["derive"] } @@ -49,3 +50,13 @@ tabwriter = "1.4.0" log = { version = "0.4.21", features = ["serde"] } colored = "2.1.0" env_logger = "0.11.3" +pyo3 = { workspace = true, features = [ + "extension-module", + "abi3", +], optional = true } + + +[lib] +name = "dora_cli" +path = "src/lib.rs" +crate-type = ["lib", "cdylib"] diff --git a/binaries/cli/pyproject.toml b/binaries/cli/pyproject.toml index 6deee792f..9e580a8e6 100644 --- a/binaries/cli/pyproject.toml +++ b/binaries/cli/pyproject.toml @@ -4,3 +4,9 @@ build-backend = "maturin" [project] name = "dora-rs-cli" + +scripts = { "dora" = "dora_cli:py_main" } + + +[tool.maturin] +features = ["python", "pyo3/extension-module"] diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs new file mode 100644 index 000000000..c86c3664a --- /dev/null +++ b/binaries/cli/src/lib.rs @@ -0,0 +1,706 @@ +use attach::attach_dataflow; +use colored::Colorize; +use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; +use dora_coordinator::Event; +use dora_core::{ + descriptor::{source_is_url, Descriptor}, + topics::{ + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, + DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, + }, +}; +use dora_daemon::Daemon; +use dora_download::download_file; +use dora_message::{ + cli_to_coordinator::ControlRequest, + coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, +}; +#[cfg(feature = "tracing")] +use dora_tracing::set_up_tracing; +use dora_tracing::set_up_tracing_opts; +use duration_str::parse; +use eyre::{bail, Context}; +use formatting::FormatDataflowError; +use std::{env::current_dir, io::Write, net::SocketAddr}; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::PathBuf, + time::Duration, +}; +use tabwriter::TabWriter; +use tokio::runtime::Builder; +use uuid::Uuid; + +mod attach; +mod build; +mod check; +mod formatting; +mod graph; +mod logs; +mod template; +mod up; + +const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + +#[derive(Debug, clap::Parser)] +#[clap(version)] +pub struct Args { + #[clap(subcommand)] + command: Command, +} + +/// dora-rs cli client +#[derive(Debug, clap::Subcommand)] +enum Command { + /// Check if the coordinator and the daemon is running. + Check { + /// Path to the dataflow descriptor file (enables additional checks) + #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. + Graph { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + dataflow: PathBuf, + /// Visualize the dataflow as a Mermaid diagram (instead of HTML) + #[clap(long, action)] + mermaid: bool, + /// Open the HTML visualization in the browser + #[clap(long, action)] + open: bool, + }, + /// Run build commands provided in the given dataflow. + Build { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH")] + dataflow: String, + }, + /// Generate a new project or node. Choose the language between Rust, Python, C or C++. + New { + #[clap(flatten)] + args: CommandNew, + #[clap(hide = true, long)] + internal_create_with_path_dependencies: bool, + }, + /// Spawn coordinator and daemon in local mode (with default config) + Up { + /// Use a custom configuration + #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] + config: Option, + }, + /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. + Destroy { + /// Use a custom configuration + #[clap(long, hide = true)] + config: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// Start the given dataflow path. Attach a name to the running dataflow by using --name. + Start { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH")] + dataflow: String, + /// Assign a name to the dataflow + #[clap(long)] + name: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + /// Attach to the dataflow and wait for its completion + #[clap(long, action)] + attach: bool, + /// Run the dataflow in background + #[clap(long, action)] + detach: bool, + /// Enable hot reloading (Python only) + #[clap(long, action)] + hot_reload: bool, + }, + /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. + Stop { + /// UUID of the dataflow that should be stopped + uuid: Option, + /// Name of the dataflow that should be stopped + #[clap(long)] + name: Option, + /// Kill the dataflow if it doesn't stop after the given duration + #[clap(long, value_name = "DURATION")] + #[arg(value_parser = parse)] + grace_duration: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + /// List running dataflows. + List { + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Planned for future releases: + // Dashboard, + /// Show logs of a given dataflow and node. + #[command(allow_missing_positional = true)] + Logs { + /// Identifier of the dataflow + #[clap(value_name = "UUID_OR_NAME")] + dataflow: Option, + /// Show logs for the given node + #[clap(value_name = "NAME")] + node: String, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, + }, + // Metrics, + // Stats, + // Get, + // Upgrade, + /// Run daemon + Daemon { + /// Unique identifier for the machine (required for distributed dataflows) + #[clap(long)] + machine_id: Option, + /// The inter daemon IP address and port this daemon will bind to. + #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] + inter_daemon_addr: SocketAddr, + /// Local listen port for event such as dynamic node. + #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] + local_listen_port: u16, + /// Address and port number of the dora coordinator + #[clap(long, short, default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + coordinator_port: u16, + #[clap(long, hide = true)] + run_dataflow: Option, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, + }, + /// Run runtime + Runtime, + /// Run coordinator + Coordinator { + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, + }, +} + +#[derive(Debug, clap::Args)] +pub struct CommandNew { + /// The entity that should be created + #[clap(long, value_enum, default_value_t = Kind::Dataflow)] + kind: Kind, + /// The programming language that should be used + #[clap(long, value_enum, default_value_t = Lang::Rust)] + lang: Lang, + /// Desired name of the entity + name: String, + /// Where to create the entity + #[clap(hide = true)] + path: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Kind { + Dataflow, + CustomNode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +enum Lang { + Rust, + Python, + C, + Cxx, +} + +pub fn lib_main(args: Args) { + if let Err(err) = run(args) { + eprintln!("\n\n{}", "[ERROR]".bold().red()); + eprintln!("{err:#}"); + std::process::exit(1); + } +} + +fn run(args: Args) -> eyre::Result<()> { + #[cfg(feature = "tracing")] + match &args.command { + Command::Daemon { + quiet, machine_id, .. + } => { + let name = "dora-daemon"; + let filename = machine_id + .as_ref() + .map(|id| format!("{name}-{id}")) + .unwrap_or(name.to_string()); + set_up_tracing_opts(name, !quiet, Some(&filename)) + .context("failed to set up tracing subscriber")?; + } + Command::Runtime => { + // Do not set the runtime in the cli. + } + Command::Coordinator { quiet, .. } => { + let name = "dora-coordinator"; + set_up_tracing_opts(name, !quiet, Some(name)) + .context("failed to set up tracing subscriber")?; + } + _ => { + set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; + } + }; + + let log_level = env_logger::Builder::new() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .build() + .filter(); + + match args.command { + Command::Check { + dataflow, + coordinator_addr, + coordinator_port, + } => match dataflow { + Some(dataflow) => { + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; + check::check_environment((coordinator_addr, coordinator_port).into())? + } + None => check::check_environment((coordinator_addr, coordinator_port).into())?, + }, + Command::Graph { + dataflow, + mermaid, + open, + } => { + graph::create(dataflow, mermaid, open)?; + } + Command::Build { dataflow } => { + build::build(dataflow)?; + } + Command::New { + args, + internal_create_with_path_dependencies, + } => template::create(args, internal_create_with_path_dependencies)?, + Command::Up { config } => { + up::up(config.as_deref())?; + } + Command::Logs { + dataflow, + node, + coordinator_addr, + coordinator_port, + } => { + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) + .wrap_err("failed to connect to dora coordinator")?; + let list = query_running_dataflows(&mut *session) + .wrap_err("failed to query running dataflows")?; + if let Some(dataflow) = dataflow { + let uuid = Uuid::parse_str(&dataflow).ok(); + let name = if uuid.is_some() { None } else { Some(dataflow) }; + logs::logs(&mut *session, uuid, name, node)? + } else { + let active = list.get_active(); + let uuid = match &active[..] { + [] => bail!("No dataflows are running"), + [uuid] => uuid.clone(), + _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, + }; + logs::logs(&mut *session, Some(uuid.uuid), None, node)? + } + } + Command::Start { + dataflow, + name, + coordinator_addr, + coordinator_port, + attach, + detach, + hot_reload, + } => { + let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_descriptor = + Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let working_dir = dataflow + .canonicalize() + .context("failed to canonicalize dataflow path")? + .parent() + .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? + .to_owned(); + + let coordinator_socket = (coordinator_addr, coordinator_port).into(); + let mut session = connect_to_coordinator(coordinator_socket) + .wrap_err("failed to connect to dora coordinator")?; + let dataflow_id = start_dataflow( + dataflow_descriptor.clone(), + name, + working_dir, + &mut *session, + )?; + + let attach = match (attach, detach) { + (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), + (true, false) => true, + (false, true) => false, + (false, false) => { + println!("attaching to dataflow (use `--detach` to run in background)"); + true + } + }; + + if attach { + attach_dataflow( + dataflow_descriptor, + dataflow, + dataflow_id, + &mut *session, + hot_reload, + coordinator_socket, + log_level, + )? + } + } + Command::List { + coordinator_addr, + coordinator_port, + } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } + }, + Command::Stop { + uuid, + name, + grace_duration, + coordinator_addr, + coordinator_port, + } => { + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) + .wrap_err("could not connect to dora coordinator")?; + match (uuid, name) { + (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, + (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, + (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, + } + } + Command::Destroy { + config, + coordinator_addr, + coordinator_port, + } => up::destroy( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?, + Command::Coordinator { + interface, + port, + control_interface, + control_port, + quiet, + } => { + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = + dora_coordinator::start(bind, bind_control, futures::stream::empty::()) + .await?; + if !quiet { + println!("Listening for incoming daemon connection on {port}"); + } + task.await + }) + .context("failed to run dora-coordinator")? + } + Command::Daemon { + coordinator_addr, + coordinator_port, + inter_daemon_addr, + local_listen_port, + machine_id, + run_dataflow, + quiet: _, + } => { + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + match run_dataflow { + Some(dataflow_path) => { + tracing::info!("Starting dataflow `{}`", dataflow_path.display()); + if coordinator_addr != LOCALHOST { + tracing::info!( + "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", + coordinator_addr + ); + } + + let result = Daemon::run_dataflow(&dataflow_path).await?; + handle_dataflow_result(result, None) + } + None => { + Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await + } + } + }) + .context("failed to run dora-daemon")? + } + Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, + }; + + Ok(()) +} + +fn start_dataflow( + dataflow: Descriptor, + name: Option, + local_working_dir: PathBuf, + session: &mut TcpRequestReplyConnection, +) -> Result { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Start { + dataflow, + name, + local_working_dir, + }) + .unwrap(), + ) + .wrap_err("failed to send start dataflow message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStarted { uuid } => { + eprintln!("{uuid}"); + Ok(uuid) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } +} + +fn stop_dataflow_interactive( + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> eyre::Result<()> { + let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; + let active = list.get_active(); + if active.is_empty() { + eprintln!("No dataflows are running"); + } else { + let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; + stop_dataflow(selection.uuid, grace_duration, session)?; + } + + Ok(()) +} + +fn stop_dataflow( + uuid: Uuid, + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> Result<(), eyre::ErrReport> { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::Stop { + dataflow_uuid: uuid, + grace_duration, + }) + .unwrap(), + ) + .wrap_err("failed to send dataflow stop message")?; + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected stop dataflow reply: {other:?}"), + } +} + +fn handle_dataflow_result(result: DataflowResult, uuid: Option) -> Result<(), eyre::Error> { + if result.is_ok() { + Ok(()) + } else { + Err(match uuid { + Some(uuid) => { + eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) + } + None => { + eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) + } + }) + } +} + +fn stop_dataflow_by_name( + name: String, + grace_duration: Option, + session: &mut TcpRequestReplyConnection, +) -> Result<(), eyre::ErrReport> { + let reply_raw = session + .request( + &serde_json::to_vec(&ControlRequest::StopByName { + name, + grace_duration, + }) + .unwrap(), + ) + .wrap_err("failed to send dataflow stop_by_name message")?; + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowStopped { uuid, result } => { + handle_dataflow_result(result, Some(uuid)) + } + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected stop dataflow reply: {other:?}"), + } +} + +fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { + let list = query_running_dataflows(session)?; + + let mut tw = TabWriter::new(vec![]); + tw.write_all(b"UUID\tName\tStatus\n")?; + for entry in list.0 { + let uuid = entry.id.uuid; + let name = entry.id.name.unwrap_or_default(); + let status = match entry.status { + DataflowStatus::Running => "Running", + DataflowStatus::Finished => "Succeeded", + DataflowStatus::Failed => "Failed", + }; + tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; + } + tw.flush()?; + let formatted = String::from_utf8(tw.into_inner()?)?; + + println!("{formatted}"); + + Ok(()) +} + +fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) + .wrap_err("failed to send list message")?; + let reply: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + let ids = match reply { + ControlRequestReply::DataflowList(list) => list, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected list dataflow reply: {other:?}"), + }; + + Ok(ids) +} + +fn connect_to_coordinator( + coordinator_addr: SocketAddr, +) -> std::io::Result> { + TcpLayer::new().connect(coordinator_addr) +} + +fn resolve_dataflow(dataflow: String) -> eyre::Result { + let dataflow = if source_is_url(&dataflow) { + // try to download the shared library + let target_path = current_dir().context("Could not access the current dir")?; + let rt = Builder::new_current_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { download_file(&dataflow, &target_path).await }) + .wrap_err("failed to download dataflow yaml file")? + } else { + PathBuf::from(dataflow) + }; + Ok(dataflow) +} + +#[cfg(feature = "python")] +use clap::Parser; +#[cfg(feature = "python")] +use pyo3::{ + pyfunction, pymodule, + types::{PyModule, PyModuleMethods}, + wrap_pyfunction, Bound, PyResult, Python, +}; + +#[cfg(feature = "python")] +#[pyfunction] +fn py_main(_py: Python) -> PyResult<()> { + pyo3::prepare_freethreaded_python(); + // Skip first argument as it is a python call. + let args = std::env::args_os().skip(1).collect::>(); + + match Args::try_parse_from(args) { + Ok(args) => lib_main(args), + Err(err) => { + eprintln!("{err}"); + } + } + Ok(()) +} + +/// A Python module implemented in Rust. +#[cfg(feature = "python")] +#[pymodule] +fn dora_cli(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { + m.add_function(wrap_pyfunction!(py_main, &m)?)?; + Ok(()) +} diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f6197724e..b28aa0a22 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,676 +1,7 @@ -use attach::attach_dataflow; use clap::Parser; -use colored::Colorize; -use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; -use dora_coordinator::Event; -use dora_core::{ - descriptor::{source_is_url, Descriptor}, - topics::{ - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, - DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, - }, -}; -use dora_daemon::Daemon; -use dora_download::download_file; -use dora_message::{ - cli_to_coordinator::ControlRequest, - coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, -}; -#[cfg(feature = "tracing")] -use dora_tracing::set_up_tracing; -use dora_tracing::set_up_tracing_opts; -use duration_str::parse; -use eyre::{bail, Context}; -use formatting::FormatDataflowError; -use std::{env::current_dir, io::Write, net::SocketAddr}; -use std::{ - net::{IpAddr, Ipv4Addr}, - path::PathBuf, - time::Duration, -}; -use tabwriter::TabWriter; -use tokio::runtime::Builder; -use uuid::Uuid; +use dora_cli::Args; -mod attach; -mod build; -mod check; -mod formatting; -mod graph; -mod logs; -mod template; -mod up; - -const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); -const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); - -#[derive(Debug, clap::Parser)] -#[clap(version)] -struct Args { - #[clap(subcommand)] - command: Command, -} - -/// dora-rs cli client -#[derive(Debug, clap::Subcommand)] -enum Command { - /// Check if the coordinator and the daemon is running. - Check { - /// Path to the dataflow descriptor file (enables additional checks) - #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. - Graph { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - dataflow: PathBuf, - /// Visualize the dataflow as a Mermaid diagram (instead of HTML) - #[clap(long, action)] - mermaid: bool, - /// Open the HTML visualization in the browser - #[clap(long, action)] - open: bool, - }, - /// Run build commands provided in the given dataflow. - Build { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH")] - dataflow: String, - }, - /// Generate a new project or node. Choose the language between Rust, Python, C or C++. - New { - #[clap(flatten)] - args: CommandNew, - #[clap(hide = true, long)] - internal_create_with_path_dependencies: bool, - }, - /// Spawn coordinator and daemon in local mode (with default config) - Up { - /// Use a custom configuration - #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] - config: Option, - }, - /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. - Destroy { - /// Use a custom configuration - #[clap(long, hide = true)] - config: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// Start the given dataflow path. Attach a name to the running dataflow by using --name. - Start { - /// Path to the dataflow descriptor file - #[clap(value_name = "PATH")] - dataflow: String, - /// Assign a name to the dataflow - #[clap(long)] - name: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - /// Attach to the dataflow and wait for its completion - #[clap(long, action)] - attach: bool, - /// Run the dataflow in background - #[clap(long, action)] - detach: bool, - /// Enable hot reloading (Python only) - #[clap(long, action)] - hot_reload: bool, - }, - /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows. - Stop { - /// UUID of the dataflow that should be stopped - uuid: Option, - /// Name of the dataflow that should be stopped - #[clap(long)] - name: Option, - /// Kill the dataflow if it doesn't stop after the given duration - #[clap(long, value_name = "DURATION")] - #[arg(value_parser = parse)] - grace_duration: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - /// List running dataflows. - List { - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Planned for future releases: - // Dashboard, - /// Show logs of a given dataflow and node. - #[command(allow_missing_positional = true)] - Logs { - /// Identifier of the dataflow - #[clap(value_name = "UUID_OR_NAME")] - dataflow: Option, - /// Show logs for the given node - #[clap(value_name = "NAME")] - node: String, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, - }, - // Metrics, - // Stats, - // Get, - // Upgrade, - /// Run daemon - Daemon { - /// Unique identifier for the machine (required for distributed dataflows) - #[clap(long)] - machine_id: Option, - /// The inter daemon IP address and port this daemon will bind to. - #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] - inter_daemon_addr: SocketAddr, - /// Local listen port for event such as dynamic node. - #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] - local_listen_port: u16, - /// Address and port number of the dora coordinator - #[clap(long, short, default_value_t = LOCALHOST)] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] - coordinator_port: u16, - #[clap(long, hide = true)] - run_dataflow: Option, - /// Suppresses all log output to stdout. - #[clap(long)] - quiet: bool, - }, - /// Run runtime - Runtime, - /// Run coordinator - Coordinator { - /// Network interface to bind to for daemon communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - interface: IpAddr, - /// Port number to bind to for daemon communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] - port: u16, - /// Network interface to bind to for control communication - #[clap(long, default_value_t = LISTEN_WILDCARD)] - control_interface: IpAddr, - /// Port number to bind to for control communication - #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - control_port: u16, - /// Suppresses all log output to stdout. - #[clap(long)] - quiet: bool, - }, -} - -#[derive(Debug, clap::Args)] -pub struct CommandNew { - /// The entity that should be created - #[clap(long, value_enum, default_value_t = Kind::Dataflow)] - kind: Kind, - /// The programming language that should be used - #[clap(long, value_enum, default_value_t = Lang::Rust)] - lang: Lang, - /// Desired name of the entity - name: String, - /// Where to create the entity - #[clap(hide = true)] - path: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] -enum Kind { - Dataflow, - CustomNode, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] -enum Lang { - Rust, - Python, - C, - Cxx, -} - -fn main() { - if let Err(err) = run() { - eprintln!("\n\n{}", "[ERROR]".bold().red()); - eprintln!("{err:#}"); - std::process::exit(1); - } -} - -fn run() -> eyre::Result<()> { +fn main() -> () { let args = Args::parse(); - - #[cfg(feature = "tracing")] - match &args.command { - Command::Daemon { - quiet, machine_id, .. - } => { - let name = "dora-daemon"; - let filename = machine_id - .as_ref() - .map(|id| format!("{name}-{id}")) - .unwrap_or(name.to_string()); - set_up_tracing_opts(name, !quiet, Some(&filename)) - .context("failed to set up tracing subscriber")?; - } - Command::Runtime => { - // Do not set the runtime in the cli. - } - Command::Coordinator { quiet, .. } => { - let name = "dora-coordinator"; - set_up_tracing_opts(name, !quiet, Some(name)) - .context("failed to set up tracing subscriber")?; - } - _ => { - set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; - } - }; - - let log_level = env_logger::Builder::new() - .filter_level(log::LevelFilter::Info) - .parse_default_env() - .build() - .filter(); - - match args.command { - Command::Check { - dataflow, - coordinator_addr, - coordinator_port, - } => match dataflow { - Some(dataflow) => { - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment((coordinator_addr, coordinator_port).into())? - } - None => check::check_environment((coordinator_addr, coordinator_port).into())?, - }, - Command::Graph { - dataflow, - mermaid, - open, - } => { - graph::create(dataflow, mermaid, open)?; - } - Command::Build { dataflow } => { - build::build(dataflow)?; - } - Command::New { - args, - internal_create_with_path_dependencies, - } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { config } => { - up::up(config.as_deref())?; - } - Command::Logs { - dataflow, - node, - coordinator_addr, - coordinator_port, - } => { - let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) - .wrap_err("failed to connect to dora coordinator")?; - let list = query_running_dataflows(&mut *session) - .wrap_err("failed to query running dataflows")?; - if let Some(dataflow) = dataflow { - let uuid = Uuid::parse_str(&dataflow).ok(); - let name = if uuid.is_some() { None } else { Some(dataflow) }; - logs::logs(&mut *session, uuid, name, node)? - } else { - let active = list.get_active(); - let uuid = match &active[..] { - [] => bail!("No dataflows are running"), - [uuid] => uuid.clone(), - _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, - }; - logs::logs(&mut *session, Some(uuid.uuid), None, node)? - } - } - Command::Start { - dataflow, - name, - coordinator_addr, - coordinator_port, - attach, - detach, - hot_reload, - } => { - let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_descriptor = - Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - - let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let mut session = connect_to_coordinator(coordinator_socket) - .wrap_err("failed to connect to dora coordinator")?; - let dataflow_id = start_dataflow( - dataflow_descriptor.clone(), - name, - working_dir, - &mut *session, - )?; - - let attach = match (attach, detach) { - (true, true) => eyre::bail!("both `--attach` and `--detach` are given"), - (true, false) => true, - (false, true) => false, - (false, false) => { - println!("attaching to dataflow (use `--detach` to run in background)"); - true - } - }; - - if attach { - attach_dataflow( - dataflow_descriptor, - dataflow, - dataflow_id, - &mut *session, - hot_reload, - coordinator_socket, - log_level, - )? - } - } - Command::List { - coordinator_addr, - coordinator_port, - } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); - } - }, - Command::Stop { - uuid, - name, - grace_duration, - coordinator_addr, - coordinator_port, - } => { - let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) - .wrap_err("could not connect to dora coordinator")?; - match (uuid, name) { - (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, - (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, - (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, - } - } - Command::Destroy { - config, - coordinator_addr, - coordinator_port, - } => up::destroy( - config.as_deref(), - (coordinator_addr, coordinator_port).into(), - )?, - Command::Coordinator { - interface, - port, - control_interface, - control_port, - quiet, - } => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - let bind = SocketAddr::new(interface, port); - let bind_control = SocketAddr::new(control_interface, control_port); - let (port, task) = - dora_coordinator::start(bind, bind_control, futures::stream::empty::()) - .await?; - if !quiet { - println!("Listening for incoming daemon connection on {port}"); - } - task.await - }) - .context("failed to run dora-coordinator")? - } - Command::Daemon { - coordinator_addr, - coordinator_port, - inter_daemon_addr, - local_listen_port, - machine_id, - run_dataflow, - quiet: _, - } => { - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { - match run_dataflow { - Some(dataflow_path) => { - tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if coordinator_addr != LOCALHOST { - tracing::info!( - "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", - coordinator_addr - ); - } - - let result = Daemon::run_dataflow(&dataflow_path).await?; - handle_dataflow_result(result, None) - } - None => { - Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await - } - } - }) - .context("failed to run dora-daemon")? - } - Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, - }; - - Ok(()) -} - -fn start_dataflow( - dataflow: Descriptor, - name: Option, - local_working_dir: PathBuf, - session: &mut TcpRequestReplyConnection, -) -> Result { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Start { - dataflow, - name, - local_working_dir, - }) - .unwrap(), - ) - .wrap_err("failed to send start dataflow message")?; - - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStarted { uuid } => { - eprintln!("{uuid}"); - Ok(uuid) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } -} - -fn stop_dataflow_interactive( - grace_duration: Option, - session: &mut TcpRequestReplyConnection, -) -> eyre::Result<()> { - let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?; - let active = list.get_active(); - if active.is_empty() { - eprintln!("No dataflows are running"); - } else { - let selection = inquire::Select::new("Choose dataflow to stop:", active).prompt()?; - stop_dataflow(selection.uuid, grace_duration, session)?; - } - - Ok(()) -} - -fn stop_dataflow( - uuid: Uuid, - grace_duration: Option, - session: &mut TcpRequestReplyConnection, -) -> Result<(), eyre::ErrReport> { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::Stop { - dataflow_uuid: uuid, - grace_duration, - }) - .unwrap(), - ) - .wrap_err("failed to send dataflow stop message")?; - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStopped { uuid, result } => { - handle_dataflow_result(result, Some(uuid)) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected stop dataflow reply: {other:?}"), - } -} - -fn handle_dataflow_result(result: DataflowResult, uuid: Option) -> Result<(), eyre::Error> { - if result.is_ok() { - Ok(()) - } else { - Err(match uuid { - Some(uuid) => { - eyre::eyre!("Dataflow {uuid} failed:\n{}", FormatDataflowError(&result)) - } - None => { - eyre::eyre!("Dataflow failed:\n{}", FormatDataflowError(&result)) - } - }) - } -} - -fn stop_dataflow_by_name( - name: String, - grace_duration: Option, - session: &mut TcpRequestReplyConnection, -) -> Result<(), eyre::ErrReport> { - let reply_raw = session - .request( - &serde_json::to_vec(&ControlRequest::StopByName { - name, - grace_duration, - }) - .unwrap(), - ) - .wrap_err("failed to send dataflow stop_by_name message")?; - let result: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - match result { - ControlRequestReply::DataflowStopped { uuid, result } => { - handle_dataflow_result(result, Some(uuid)) - } - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected stop dataflow reply: {other:?}"), - } -} - -fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { - let list = query_running_dataflows(session)?; - - let mut tw = TabWriter::new(vec![]); - tw.write_all(b"UUID\tName\tStatus\n")?; - for entry in list.0 { - let uuid = entry.id.uuid; - let name = entry.id.name.unwrap_or_default(); - let status = match entry.status { - DataflowStatus::Running => "Running", - DataflowStatus::Finished => "Succeeded", - DataflowStatus::Failed => "Failed", - }; - tw.write_all(format!("{uuid}\t{name}\t{status}\n").as_bytes())?; - } - tw.flush()?; - let formatted = String::from_utf8(tw.into_inner()?)?; - - println!("{formatted}"); - - Ok(()) -} - -fn query_running_dataflows(session: &mut TcpRequestReplyConnection) -> eyre::Result { - let reply_raw = session - .request(&serde_json::to_vec(&ControlRequest::List).unwrap()) - .wrap_err("failed to send list message")?; - let reply: ControlRequestReply = - serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; - let ids = match reply { - ControlRequestReply::DataflowList(list) => list, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected list dataflow reply: {other:?}"), - }; - - Ok(ids) -} - -fn connect_to_coordinator( - coordinator_addr: SocketAddr, -) -> std::io::Result> { - TcpLayer::new().connect(coordinator_addr) -} - -fn resolve_dataflow(dataflow: String) -> eyre::Result { - let dataflow = if source_is_url(&dataflow) { - // try to download the shared library - let target_path = current_dir().context("Could not access the current dir")?; - let rt = Builder::new_current_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - rt.block_on(async { download_file(&dataflow, &target_path).await }) - .wrap_err("failed to download dataflow yaml file")? - } else { - PathBuf::from(dataflow) - }; - Ok(dataflow) + dora_cli::lib_main(args); } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index b6345794b..16f1a4c1f 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,7 +1,7 @@ use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; -use eyre::{bail, Context}; +use eyre::{bail, Context, ContextCompat}; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} @@ -95,8 +95,16 @@ fn parse_dora_config(config_path: Option<&Path>) -> Result eyre::Result<()> { - let mut cmd = - Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); + let path = if cfg!(feature = "python") { + std::env::args_os() + .nth(1) + .context("Could not get first argument correspond to dora with python installation")? + } else { + std::env::args_os() + .next() + .context("Could not get dora path")? + }; + let mut cmd = Command::new(path); cmd.arg("coordinator"); cmd.arg("--quiet"); cmd.spawn().wrap_err("failed to run `dora coordinator`")?; @@ -107,8 +115,16 @@ fn start_coordinator() -> eyre::Result<()> { } fn start_daemon() -> eyre::Result<()> { - let mut cmd = - Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); + let path = if cfg!(feature = "python") { + std::env::args_os() + .nth(1) + .context("Could not get first argument correspond to dora with python installation")? + } else { + std::env::args_os() + .next() + .context("Could not get dora path")? + }; + let mut cmd = Command::new(path); cmd.arg("daemon"); cmd.arg("--quiet"); cmd.spawn().wrap_err("failed to run `dora daemon`")?;