diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4f2c1623e..f02482fd6 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -119,7 +119,12 @@ enum Command { /// Run runtime Runtime, /// Run coordinator - Coordinator { port: Option }, + Coordinator { + #[clap(long, default_value_t = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) + )] + bind: SocketAddr, + }, } #[derive(Debug, clap::Args)] @@ -273,14 +278,14 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - Command::Coordinator { port } => { + Command::Coordinator { bind } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { let (_port, task) = - dora_coordinator::start(port, futures::stream::empty::()).await?; + dora_coordinator::start(bind, futures::stream::empty::()).await?; task.await }) .context("failed to run dora-coordinator")? diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 3d6be47d7..1014ea152 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,10 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_DEFAULT, - }, + topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -39,11 +36,10 @@ mod run; mod tcp_utils; pub async fn start( - port: Option, + bind: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { - let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); - let listener = listener::create_listener(port).await?; + let listener = listener::create_listener(bind).await?; let port = listener .local_addr() .wrap_err("failed to get local addr of listener")? diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index ee78755fd..824d1168c 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,15 +1,14 @@ use crate::{tcp_utils::tcp_receive, DaemonEvent, DataflowEvent, Event}; use dora_core::{coordinator_messages, daemon_messages::Timestamped, message::uhlc::HLC}; use eyre::{eyre, Context}; -use std::{io::ErrorKind, net::Ipv4Addr, sync::Arc}; +use std::{io::ErrorKind, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, sync::mpsc, }; -pub async fn create_listener(port: u16) -> eyre::Result { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, port)).await { +pub async fn create_listener(bind: SocketAddr) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index bd0722135..95f64de70 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,14 +1,14 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; use std::{ collections::BTreeSet, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, time::Duration, }; @@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> { build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); + let coordinator_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_DEFAULT, + ); let (coordinator_port, coordinator) = - dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; + dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into());