Skip to content

Commit

Permalink
Make dora cli connect to remote coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Gege-Wang <[email protected]>

fix: localhost to control_socket_addr()
  • Loading branch information
Miyamo authored and Gege-Wang committed May 28, 2024
1 parent 20cb68c commit c759282
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 35 deletions.
11 changes: 8 additions & 3 deletions binaries/cli/src/check.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::control_socket_addr;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::io::{IsTerminal, Write};
use std::{
io::{IsTerminal, Write},
net::SocketAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

pub fn check_environment() -> eyre::Result<()> {
pub fn check_environment(coordinator_addr: Option<SocketAddr>) -> eyre::Result<()> {
let mut error_occured = false;

let color_choice = if std::io::stdout().is_terminal() {
Expand All @@ -17,7 +21,8 @@ pub fn check_environment() -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
let mut session = match connect_to_coordinator() {
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 24 in binaries/cli/src/check.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
let mut session = match connect_to_coordinator(coordination_addr) {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;
Expand Down
78 changes: 56 additions & 22 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum Command {
Check {
#[clap(long)]
dataflow: Option<PathBuf>,
coordinator_addr: Option<SocketAddr>,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
Expand All @@ -67,17 +68,21 @@ enum Command {
Up {
#[clap(long)]
config: Option<PathBuf>,
coordinator_addr: Option<SocketAddr>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
#[clap(long)]
config: Option<PathBuf>,
coordinator_addr: Option<SocketAddr>,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
dataflow: PathBuf,
#[clap(long)]
name: Option<String>,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,
#[clap(long, action)]
attach: bool,
#[clap(long, action)]
Expand All @@ -91,16 +96,20 @@ enum Command {
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
coordinator_addr: Option<SocketAddr>,
},
/// List running dataflows.
List,
List {
coordinator_addr: Option<SocketAddr>,
},
// Planned for future releases:
// Dashboard,
/// Show logs of a given dataflow and node.
#[command(allow_missing_positional = true)]
Logs {
dataflow: Option<String>,
node: String,
coordinator_addr: Option<SocketAddr>,
},
// Metrics,
// Stats,
Expand Down Expand Up @@ -184,7 +193,10 @@ fn run() -> eyre::Result<()> {
};

match args.command {
Command::Check { dataflow } => match dataflow {
Command::Check {
dataflow,
coordinator_addr,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
.canonicalize()
Expand All @@ -193,9 +205,9 @@ fn run() -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment()?
check::check_environment(coordinator_addr)?
}
None => check::check_environment()?,
None => check::check_environment(coordinator_addr)?,
},
Command::Graph {
dataflow,
Expand All @@ -211,11 +223,20 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => up::up(config.as_deref())?,

Command::Logs { dataflow, node } => {
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
Command::Up {
config,
coordinator_addr,
} => {
up::up(config.as_deref(), coordinator_addr)?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
} => {
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 237 in binaries/cli/src/main.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
let mut session = connect_to_coordinator(coordination_addr)
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
Expand All @@ -234,6 +255,7 @@ fn run() -> eyre::Result<()> {
Command::Start {
dataflow,
name,
coordinator_addr,
attach,
hot_reload,
} => {
Expand All @@ -248,8 +270,10 @@ fn run() -> eyre::Result<()> {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;

let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 274 in binaries/cli/src/main.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
let mut session = connect_to_coordinator(coordination_addr)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
Expand All @@ -267,26 +291,34 @@ fn run() -> eyre::Result<()> {
)?
}
}
Command::List => match connect_to_coordinator() {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
Command::List { coordinator_addr } => {
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 295 in binaries/cli/src/main.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
match connect_to_coordinator(coordination_addr) {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
}
},
}
Command::Stop {
uuid,
name,
grace_duration,
coordinator_addr,
} => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 309 in binaries/cli/src/main.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
let mut session = connect_to_coordinator(coordination_addr)
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Command::Destroy {
config,
coordinator_addr,
} => up::destroy(config.as_deref(), coordinator_addr)?,
Command::Coordinator { addr } => {
let rt = Builder::new_multi_thread()
.enable_all()
Expand Down Expand Up @@ -327,7 +359,7 @@ fn run() -> eyre::Result<()> {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
});
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
}
Expand Down Expand Up @@ -466,6 +498,8 @@ fn query_running_dataflows(
Ok(ids)
}

fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
fn connect_to_coordinator(
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(coordinator_addr)
}
25 changes: 15 additions & 10 deletions binaries/cli/src/up.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use dora_core::topics::{control_socket_addr, ControlRequest};
use eyre::Context;
use std::{fs, path::Path, process::Command, time::Duration};

use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
pub(crate) fn up(
config_path: Option<&Path>,
coordinator_addr: Option<SocketAddr>,
) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;

let mut session = match connect_to_coordinator() {
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 13 in binaries/cli/src/up.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
let mut session = match connect_to_coordinator(coordination_addr) {
Ok(session) => session,
Err(_) => {
start_coordinator().wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator() {
match connect_to_coordinator(coordination_addr) {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
Expand Down Expand Up @@ -47,10 +49,13 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
Ok(())
}

pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> {
pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: Option<SocketAddr>,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;

match connect_to_coordinator() {
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr());

Check warning on line 57 in binaries/cli/src/up.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant closure
match connect_to_coordinator(coordination_addr) {
Ok(mut session) => {
// send destroy command to dora-coordinator
session
Expand Down

0 comments on commit c759282

Please sign in to comment.