From af2798f75a61bae74467cbc12522eaffdf6a50b2 Mon Sep 17 00:00:00 2001 From: Miyamo Date: Mon, 27 May 2024 14:00:40 +0800 Subject: [PATCH] Make dora cli connect to remote coordinator Signed-off-by: Gege-Wang <2891067867@qq.com> --- binaries/cli/src/check.rs | 16 ++++++-- binaries/cli/src/main.rs | 82 +++++++++++++++++++++++++++------------ binaries/cli/src/up.rs | 30 +++++++++----- 3 files changed, 90 insertions(+), 38 deletions(-) diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 9a6607b32..4e3502ea3 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,11 +1,16 @@ use crate::connect_to_coordinator; use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::topics::DORA_COORDINATOR_PORT_DEFAULT; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; -use std::io::{IsTerminal, Write}; +use std::net::Ipv4Addr; +use std::{ + io::{IsTerminal, Write}, + net::SocketAddr, +}; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment() -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { @@ -17,7 +22,12 @@ pub fn check_environment() -> eyre::Result<()> { // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - let mut session = match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| { + tracing::info!("Starting in local mode"); + let localhost = Ipv4Addr::new(127, 0, 0, 1); + (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() + }); + let mut session = match connect_to_coordinator(coordination_addr) { Ok(session) => { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8ac75485e..b565a4fa8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,6 +45,7 @@ enum Command { Check { #[clap(long)] dataflow: Option, + coordinator_addr: Option, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -67,17 +68,21 @@ enum Command { Up { #[clap(long)] config: Option, + coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, + coordinator_addr: Option, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { dataflow: PathBuf, #[clap(long)] name: Option, + #[clap(long)] + coordinator_addr: Option, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -91,9 +96,12 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, + coordinator_addr: Option, }, /// List running dataflows. - List, + List { + coordinator_addr: Option, + }, // Planned for future releases: // Dashboard, /// Show logs of a given dataflow and node. @@ -101,6 +109,7 @@ enum Command { Logs { dataflow: Option, node: String, + coordinator_addr: Option, }, // Metrics, // Stats, @@ -184,7 +193,10 @@ fn run() -> eyre::Result<()> { }; match args.command { - Command::Check { dataflow } => match dataflow { + Command::Check { + dataflow, + coordinator_addr, + } => match dataflow { Some(dataflow) => { let working_dir = dataflow .canonicalize() @@ -193,9 +205,9 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment()? + check::check_environment(coordinator_addr)? } - None => check::check_environment()?, + None => check::check_environment(coordinator_addr)?, }, Command::Graph { dataflow, @@ -211,11 +223,20 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { config } => up::up(config.as_deref())?, - - Command::Logs { dataflow, node } => { - let mut session = - connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + Command::Up { + config, + coordinator_addr, + } => { + up::up(config.as_deref(), coordinator_addr)?; + } + Command::Logs { + dataflow, + node, + coordinator_addr, + } => { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { @@ -234,6 +255,7 @@ fn run() -> eyre::Result<()> { Command::Start { dataflow, name, + coordinator_addr, attach, hot_reload, } => { @@ -248,8 +270,10 @@ fn run() -> eyre::Result<()> { dataflow_descriptor .check(&working_dir) .wrap_err("Could not validate yaml")?; - let mut session = - connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), name, @@ -267,26 +291,34 @@ fn run() -> eyre::Result<()> { )? } } - Command::List => match connect_to_coordinator() { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); + Command::List { coordinator_addr } => { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + match connect_to_coordinator(coordination_addr) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } } - }, + } Command::Stop { uuid, name, grace_duration, + coordinator_addr, } => { - let mut session = - connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, } } - Command::Destroy { config } => up::destroy(config.as_deref())?, + Command::Destroy { + config, + coordinator_addr, + } => up::destroy(config.as_deref(), coordinator_addr)?, Command::Coordinator { addr } => { let rt = Builder::new_multi_thread() .enable_all() @@ -324,10 +356,8 @@ fn run() -> eyre::Result<()> { } None => { let coordination_addr = coordinator_addr.unwrap_or_else(|| { - tracing::info!("Starting in local mode"); - let localhost = Ipv4Addr::new(127, 0, 0, 1); - (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }); + control_socket_addr() + }); Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await } } @@ -466,6 +496,8 @@ fn query_running_dataflows( Ok(ids) } -fn connect_to_coordinator() -> std::io::Result> { - TcpLayer::new().connect(control_socket_addr()) +fn connect_to_coordinator( + coordinator_addr: SocketAddr, +) -> std::io::Result> { + TcpLayer::new().connect(coordinator_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index bdb7a0b33..fe050f7c4 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,21 +1,24 @@ use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::ControlRequest; +use dora_core::topics::{control_socket_addr, ControlRequest, DORA_COORDINATOR_PORT_DEFAULT}; use eyre::Context; -use std::{fs, path::Path, process::Command, time::Duration}; - +use std::net::Ipv4Addr; +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { +pub(crate) fn up( + config_path: Option<&Path>, + coordinator_addr: Option, +) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - - let mut session = match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = match connect_to_coordinator(coordination_addr) { Ok(session) => session, Err(_) => { start_coordinator().wrap_err("failed to start dora-coordinator")?; loop { - match connect_to_coordinator() { + match connect_to_coordinator(coordination_addr) { Ok(session) => break session, Err(_) => { // sleep a bit until the coordinator accepts connections @@ -47,10 +50,17 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { Ok(()) } -pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> { +pub(crate) fn destroy( + config_path: Option<&Path>, + coordinator_addr: Option, +) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - - match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| { + tracing::info!("Starting in local mode"); + let localhost = Ipv4Addr::new(127, 0, 0, 1); + (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() + }); + match connect_to_coordinator(coordination_addr) { Ok(mut session) => { // send destroy command to dora-coordinator session