Skip to content

Commit

Permalink
feat: add bind configuration for the inter_daemon listener
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-J-Ward committed Apr 16, 2024
1 parent 1c2dc46 commit f659022
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
12 changes: 10 additions & 2 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{net::Ipv4Addr, path::PathBuf};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
};

use attach::attach_dataflow;
use clap::Parser;
Expand Down Expand Up @@ -103,6 +106,10 @@ enum Command {
Daemon {
#[clap(long)]
machine_id: Option<String>,
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
bind: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

Expand Down Expand Up @@ -280,6 +287,7 @@ fn run() -> eyre::Result<()> {
}
Command::Daemon {
coordinator_addr,
bind,
machine_id,
run_dataflow,
} => {
Expand All @@ -306,7 +314,7 @@ fn run() -> eyre::Result<()> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(addr, machine_id.unwrap_or_default()).await
Daemon::run(addr, machine_id.unwrap_or_default(), bind).await
}
}
})
Expand Down
10 changes: 3 additions & 7 deletions binaries/daemon/src/inter_daemon.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use crate::tcp_utils::{tcp_receive, tcp_send};
use dora_core::daemon_messages::{InterDaemonEvent, Timestamped};
use eyre::{Context, ContextCompat};
use std::{
collections::BTreeMap,
io::ErrorKind,
net::{Ipv4Addr, SocketAddr},
};
use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr};
use tokio::net::{TcpListener, TcpStream};

pub struct InterDaemonConnection {
Expand Down Expand Up @@ -65,11 +61,11 @@ pub async fn send_inter_daemon_event(
}

pub async fn spawn_listener_loop(
bind: SocketAddr,
machine_id: String,
events_tx: flume::Sender<Timestamped<InterDaemonEvent>>,
) -> eyre::Result<SocketAddr> {
let localhost = Ipv4Addr::new(127, 0, 0, 1);
let socket = match TcpListener::bind((localhost, 0)).await {
let socket = match TcpListener::bind(bind).await {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener"))
Expand Down
8 changes: 6 additions & 2 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,19 @@ pub struct Daemon {
}

impl Daemon {
pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> {
pub async fn run(
coordinator_addr: SocketAddr,
machine_id: String,
bind_addr: SocketAddr,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let ctrlc_events = set_up_ctrlc_handler(clock.clone())?;

// spawn listen loop
let (events_tx, events_rx) = flume::bounded(10);
let listen_socket =
inter_daemon::spawn_listener_loop(machine_id.clone(), events_tx).await?;
inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?;
let daemon_events = events_rx.into_stream().map(|e| Timestamped {
inner: Event::Daemon(e.inner),
timestamp: e.timestamp,
Expand Down

0 comments on commit f659022

Please sign in to comment.