From d4f37a3d178e5c6240586099cd2e64f33ef12efe Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 20:08:05 +0800 Subject: [PATCH 01/11] add support to unix domain --- apis/rust/node/src/daemon_connection/mod.rs | 14 +++ .../node/src/daemon_connection/unix_domain.rs | 84 +++++++++++++++++ apis/rust/node/src/event_stream/mod.rs | 12 +++ apis/rust/node/src/node/control_channel.rs | 5 + apis/rust/node/src/node/drop_stream.rs | 6 ++ binaries/daemon/src/node_communication/mod.rs | 37 +++++++- .../src/node_communication/unix_domain.rs | 93 +++++++++++++++++++ libraries/core/src/config.rs | 2 + libraries/core/src/daemon_messages.rs | 4 + 9 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 apis/rust/node/src/daemon_connection/unix_domain.rs create mode 100644 binaries/daemon/src/node_communication/unix_domain.rs diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index 7d778fb45..1f0454276 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -5,16 +5,21 @@ use dora_core::{ }; use eyre::{bail, eyre, Context}; use shared_memory_server::{ShmemClient, ShmemConf}; +#[cfg(unix)] +use std::os::unix::net::UnixStream; use std::{ net::{SocketAddr, TcpStream}, time::Duration, }; mod tcp; +#[cfg(unix)] +mod unix_domain; pub enum DaemonChannel { Shmem(ShmemClient, DaemonReply>), Tcp(TcpStream), + UnixDomain(UnixStream), } impl DaemonChannel { @@ -38,6 +43,13 @@ impl DaemonChannel { Ok(channel) } + #[cfg(unix)] + #[tracing::instrument(level = "trace")] + pub fn new_unix_socket(path: &str) -> eyre::Result { + let stream = UnixStream::connect(path).wrap_err("failed to open Unix socket")?; + Ok(DaemonChannel::UnixDomain(stream)) + } + pub fn register( &mut self, dataflow_id: DataflowId, @@ -69,6 +81,8 @@ impl DaemonChannel { match self { DaemonChannel::Shmem(client) => client.request(request), DaemonChannel::Tcp(stream) => tcp::request(stream, request), + #[cfg(unix)] + DaemonChannel::UnixDomain(stream) => unix_domain::request(stream, request), } } } diff --git a/apis/rust/node/src/daemon_connection/unix_domain.rs b/apis/rust/node/src/daemon_connection/unix_domain.rs new file mode 100644 index 000000000..29d6f3259 --- /dev/null +++ b/apis/rust/node/src/daemon_connection/unix_domain.rs @@ -0,0 +1,84 @@ +use dora_core::daemon_messages::{DaemonReply, DaemonRequest, Timestamped}; +use eyre::{eyre, Context}; +use std::{ + io::{Read, Write}, + os::unix::net::UnixStream, +}; + +enum Serializer { + Bincode, + SerdeJson, +} +pub fn request( + connection: &mut UnixStream, + request: &Timestamped, +) -> eyre::Result { + send_message(connection, request)?; + if request.inner.expects_tcp_bincode_reply() { + receive_reply(connection, Serializer::Bincode) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + // Use serde json for message with variable length + } else if request.inner.expects_tcp_json_reply() { + receive_reply(connection, Serializer::SerdeJson) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + } else { + Ok(DaemonReply::Empty) + } +} + +fn send_message( + connection: &mut UnixStream, + message: &Timestamped, +) -> eyre::Result<()> { + let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonRequest")?; + tcp_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?; + Ok(()) +} + +fn receive_reply( + connection: &mut UnixStream, + serializer: Serializer, +) -> eyre::Result> { + let raw = match tcp_receive(connection) { + Ok(raw) => raw, + Err(err) => match err.kind() { + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { + return Ok(None) + } + other => { + return Err(err).with_context(|| { + format!( + "unexpected I/O error (kind {other:?}) while trying to receive DaemonReply" + ) + }) + } + }, + }; + match serializer { + Serializer::Bincode => bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + Serializer::SerdeJson => serde_json::from_slice(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + } +} + +fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { + let len_raw = (message.len() as u64).to_le_bytes(); + connection.write_all(&len_raw)?; + connection.write_all(message)?; + connection.flush()?; + Ok(()) +} + +fn tcp_receive(connection: &mut (impl Read + Unpin)) -> std::io::Result> { + let reply_len = { + let mut raw = [0; 8]; + connection.read_exact(&mut raw)?; + u64::from_le_bytes(raw) as usize + }; + let mut reply = vec![0; reply_len]; + connection.read_exact(&mut reply)?; + Ok(reply) +} diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 9575a8d7b..104ffe2ae 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -50,6 +50,12 @@ impl EventStream { )?, DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_addr } => { + DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + format!("failed to connect event stream for node `{node_id}`") + })? + } }; let close_channel = match daemon_communication { @@ -63,6 +69,12 @@ impl EventStream { .wrap_err_with(|| { format!("failed to connect event close channel for node `{node_id}`") })?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_addr } => { + DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + format!("failed to connect event close channel for node `{node_id}`") + })? + } }; Self::init_on_channel(dataflow_id, node_id, channel, close_channel, clock) diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index 693295789..d9c4ecb60 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -29,6 +29,11 @@ impl ControlChannel { .wrap_err("failed to create shmem control channel")?, DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err("failed to connect control channel")?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_addr } => { + DaemonChannel::new_unix_socket(&socket_addr) + .wrap_err("failed to connect control channel")? + } }; Self::init_on_channel(dataflow_id, node_id, channel, clock) diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index efe6c7964..cbf5d443b 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -36,6 +36,12 @@ impl DropStream { } DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?, + #[cfg(unix)] + DaemonCommunication::UnixDomain { socket_addr } => { + DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + format!("failed to connect drop stream for node `{node_id}`") + })? + } }; Self::init_on_channel(dataflow_id, node_id, channel, hlc) diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 738fa65e7..d124db7c8 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -8,17 +8,18 @@ use dora_core::{ message::uhlc, topics::LOCALHOST, }; -use eyre::{eyre, Context}; +use eyre::{eyre, Context, ContextCompat}; use futures::{future, task, Future}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ collections::{BTreeMap, VecDeque}, mem, + path::Path, sync::Arc, task::Poll, }; use tokio::{ - net::TcpListener, + net::{TcpListener, UnixListener}, sync::{ mpsc::{self, UnboundedReceiver}, oneshot, @@ -28,6 +29,8 @@ use tokio::{ // TODO unify and avoid duplication; pub mod shmem; pub mod tcp; +#[cfg(unix)] +pub mod unix_domain; pub async fn spawn_listener_loop( dataflow_id: &DataflowId, @@ -138,6 +141,36 @@ pub async fn spawn_listener_loop( daemon_events_close_region_id, }) } + #[cfg(unix)] + LocalCommunicationConfig::UnixDomain => { + let tmpfile_dir = Path::new("/tmp"); + let tmpfile_dir = tmpfile_dir.join(dataflow_id.to_string()); + if !tmpfile_dir.exists() { + std::fs::create_dir_all(&tmpfile_dir).context("could not create tmp dir")?; + } + let socket_file = tmpfile_dir.join(format!("{}.sock", node_id.to_string())); + let socket = match UnixListener::bind(&socket_file) { + Ok(socket) => socket, + Err(err) => { + return Err(eyre::Report::new(err) + .wrap_err("failed to create local Unix domain socket")) + } + }; + + let event_loop_node_id = format!("{dataflow_id}/{node_id}"); + let daemon_tx = daemon_tx.clone(); + tokio::spawn(async move { + unix_domain::listener_loop(socket, daemon_tx, queue_sizes, clock).await; + tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); + }); + + Ok(DaemonCommunication::UnixDomain { + socket_addr: socket_file + .to_str() + .wrap_err("get unix domain file addr failed")? + .to_string(), + }) + } } } diff --git a/binaries/daemon/src/node_communication/unix_domain.rs b/binaries/daemon/src/node_communication/unix_domain.rs new file mode 100644 index 000000000..7713eccc4 --- /dev/null +++ b/binaries/daemon/src/node_communication/unix_domain.rs @@ -0,0 +1,93 @@ +use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; + +use dora_core::{ + config::DataId, + daemon_messages::{DaemonReply, DaemonRequest, Timestamped}, + message::uhlc::HLC, +}; +use eyre::Context; +use tokio::{ + net::{UnixListener, UnixStream}, + sync::mpsc, +}; + +use crate::{ + tcp_utils::{tcp_receive, tcp_send}, + Event, +}; + +use super::{Connection, Listener}; + +#[tracing::instrument(skip(listener, daemon_tx, clock), level = "trace")] +pub async fn listener_loop( + listener: UnixListener, + daemon_tx: mpsc::Sender>, + queue_sizes: BTreeMap, + clock: Arc, +) { + loop { + match listener + .accept() + .await + .wrap_err("failed to accept new connection") + { + Err(err) => { + tracing::info!("{err}"); + } + Ok((connection, _)) => { + tokio::spawn(handle_connection_loop( + connection, + daemon_tx.clone(), + queue_sizes.clone(), + clock.clone(), + )); + } + } + } +} + +#[tracing::instrument(skip(connection, daemon_tx, clock), level = "trace")] +async fn handle_connection_loop( + connection: UnixStream, + daemon_tx: mpsc::Sender>, + queue_sizes: BTreeMap, + clock: Arc, +) { + Listener::run(UnixConnection(connection), daemon_tx, queue_sizes, clock).await +} + +struct UnixConnection(UnixStream); + +#[async_trait::async_trait] +impl Connection for UnixConnection { + async fn receive_message(&mut self) -> eyre::Result>> { + let raw = match tcp_receive(&mut self.0).await { + Ok(raw) => raw, + Err(err) => match err.kind() { + ErrorKind::UnexpectedEof + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset => return Ok(None), + _other => { + return Err(err) + .context("unexpected I/O error while trying to receive DaemonRequest") + } + }, + }; + bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonRequest") + .map(Some) + } + + async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()> { + if matches!(message, DaemonReply::Empty) { + // don't send empty replies + return Ok(()); + } + let serialized = + bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; + tcp_send(&mut self.0, &serialized) + .await + .wrap_err("failed to send DaemonReply")?; + Ok(()) + } +} diff --git a/libraries/core/src/config.rs b/libraries/core/src/config.rs index 6b8f2ac15..a6118c8ae 100644 --- a/libraries/core/src/config.rs +++ b/libraries/core/src/config.rs @@ -348,6 +348,8 @@ pub struct CommunicationConfig { pub enum LocalCommunicationConfig { Tcp, Shmem, + #[cfg(unix)] + UnixDomain, } impl Default for LocalCommunicationConfig { diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index ce6c459a7..9e61f5eba 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -35,6 +35,10 @@ pub enum DaemonCommunication { Tcp { socket_addr: SocketAddr, }, + #[cfg(unix)] + UnixDomain { + socket_addr: String, + }, } #[derive(Debug, serde::Serialize, serde::Deserialize)] From ed67dbea30a2e5fbd461e6c4addb8c704d5be3d8 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 20:31:16 +0800 Subject: [PATCH 02/11] add ci --- .github/workflows/ci.yml | 6 ++++++ apis/rust/node/src/daemon_connection/mod.rs | 1 + 2 files changed, 7 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8708ba964..2892069b2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -129,6 +129,12 @@ jobs: if: runner.os == 'Linux' timeout-minutes: 30 run: cargo run --example cmake-dataflow + - name: "Unix Domain example" + if: runner.os == 'Linux' + run: | + sed -i '1i communication:\r\n\t_unstable_local:\r\n\t\tUnixDomain' examples/rust-dataflow/dataflow.yml + cargo run --example rust-dataflow + # python examples - uses: actions/setup-python@v2 diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index 1f0454276..ca2a39796 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -19,6 +19,7 @@ mod unix_domain; pub enum DaemonChannel { Shmem(ShmemClient, DaemonReply>), Tcp(TcpStream), + #[cfg(unix)] UnixDomain(UnixStream), } From 7a5e904721d9e169801a111a76e49a4c5bd5fd1c Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 20:38:57 +0800 Subject: [PATCH 03/11] minor --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2892069b2..c79fd2a0b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,7 +132,7 @@ jobs: - name: "Unix Domain example" if: runner.os == 'Linux' run: | - sed -i '1i communication:\r\n\t_unstable_local:\r\n\t\tUnixDomain' examples/rust-dataflow/dataflow.yml + sed -i '1i communication:\r\n\t_unstable_local:\r\n\t\tUnixDomain\r\n\r\n' examples/rust-dataflow/dataflow.yml cargo run --example rust-dataflow From 130507befa0cfca0d2be4ee9b112fa340cac3370 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 22:00:47 +0800 Subject: [PATCH 04/11] add ci test --- .github/workflows/ci.yml | 7 ++-- binaries/daemon/src/node_communication/mod.rs | 4 ++- examples/rust-dataflow/dataflow_socket.yml | 32 +++++++++++++++++++ 3 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 examples/rust-dataflow/dataflow_socket.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c79fd2a0b..7380b1403 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,9 +132,10 @@ jobs: - name: "Unix Domain example" if: runner.os == 'Linux' run: | - sed -i '1i communication:\r\n\t_unstable_local:\r\n\t\tUnixDomain\r\n\r\n' examples/rust-dataflow/dataflow.yml - cargo run --example rust-dataflow - + dora build examples/rust-dataflow/dataflow_socket.yml + dora start examples/rust-dataflow/dataflow_socket.yml --name ci-rust-socket + dora stop --name ci-rust-socket --grace-duration 5s + dora destroy # python examples - uses: actions/setup-python@v2 diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index d124db7c8..d75a44579 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -18,8 +18,10 @@ use std::{ sync::Arc, task::Poll, }; +#[cfg(unix)] +use tokio::net::UnixListener; use tokio::{ - net::{TcpListener, UnixListener}, + net::TcpListener, sync::{ mpsc::{self, UnboundedReceiver}, oneshot, diff --git a/examples/rust-dataflow/dataflow_socket.yml b/examples/rust-dataflow/dataflow_socket.yml new file mode 100644 index 000000000..ce998a8ca --- /dev/null +++ b/examples/rust-dataflow/dataflow_socket.yml @@ -0,0 +1,32 @@ +communication: + _unstable_local: + UnixDomain + +nodes: + - id: rust-node + build: cargo build -p rust-dataflow-example-node + path: ../../target/debug/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/10 + outputs: + - random + - id: rust-status-node + custom: + build: cargo build -p rust-dataflow-example-status-node + source: ../../target/debug/rust-dataflow-example-status-node + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + build: cargo build -p rust-dataflow-example-sink + path: ../../target/debug/rust-dataflow-example-sink + inputs: + message: rust-status-node/status + - id: dora-record + build: cargo build -p dora-record + path: ../../target/debug/dora-record + inputs: + message: rust-status-node/status + random: rust-node/random From 923a74eb950b2c8c31a333885f4ed6cc97c21968 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 22:05:21 +0800 Subject: [PATCH 05/11] minor --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7380b1403..f85526fa5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,6 +132,7 @@ jobs: - name: "Unix Domain example" if: runner.os == 'Linux' run: | + dora up dora build examples/rust-dataflow/dataflow_socket.yml dora start examples/rust-dataflow/dataflow_socket.yml --name ci-rust-socket dora stop --name ci-rust-socket --grace-duration 5s From 8c4cadd9d807bb0996447b2fcb463d3ede240cad Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 22:27:51 +0800 Subject: [PATCH 06/11] fix ci bug --- .github/workflows/ci.yml | 7 ++----- binaries/daemon/src/node_communication/mod.rs | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f85526fa5..b28da864a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,11 +132,8 @@ jobs: - name: "Unix Domain example" if: runner.os == 'Linux' run: | - dora up - dora build examples/rust-dataflow/dataflow_socket.yml - dora start examples/rust-dataflow/dataflow_socket.yml --name ci-rust-socket - dora stop --name ci-rust-socket --grace-duration 5s - dora destroy + sed -i '1i communication:\n _unstable_local:\n UnixDomain\n' dataflow.yml + cargo run --example rust-dataflow # python examples - uses: actions/setup-python@v2 diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index d75a44579..adb5940c6 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -14,7 +14,6 @@ use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ collections::{BTreeMap, VecDeque}, mem, - path::Path, sync::Arc, task::Poll, }; @@ -145,6 +144,7 @@ pub async fn spawn_listener_loop( } #[cfg(unix)] LocalCommunicationConfig::UnixDomain => { + use std::path::Path; let tmpfile_dir = Path::new("/tmp"); let tmpfile_dir = tmpfile_dir.join(dataflow_id.to_string()); if !tmpfile_dir.exists() { From 63f1e11b7a16b6e7ff18392022b6aaddd239dec0 Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 22:45:42 +0800 Subject: [PATCH 07/11] fix ci bug --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b28da864a..2c5d50e74 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,7 +132,7 @@ jobs: - name: "Unix Domain example" if: runner.os == 'Linux' run: | - sed -i '1i communication:\n _unstable_local:\n UnixDomain\n' dataflow.yml + sed -i '1i communication:\n _unstable_local:\n UnixDomain\n' examples/rust-dataflow/dataflow.yml cargo run --example rust-dataflow # python examples From 6848fa5413b873ea9d1c96b5a8b324bd0954302d Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 23:07:25 +0800 Subject: [PATCH 08/11] fix clippy --- apis/rust/node/src/event_stream/mod.rs | 4 ++-- apis/rust/node/src/node/control_channel.rs | 2 +- apis/rust/node/src/node/drop_stream.rs | 2 +- binaries/daemon/src/node_communication/mod.rs | 5 +++-- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 104ffe2ae..f7826746e 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -52,7 +52,7 @@ impl EventStream { .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?, #[cfg(unix)] DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { format!("failed to connect event stream for node `{node_id}`") })? } @@ -71,7 +71,7 @@ impl EventStream { })?, #[cfg(unix)] DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { format!("failed to connect event close channel for node `{node_id}`") })? } diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index d9c4ecb60..9f137b89d 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -31,7 +31,7 @@ impl ControlChannel { .wrap_err("failed to connect control channel")?, #[cfg(unix)] DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(&socket_addr) + DaemonChannel::new_unix_socket(socket_addr) .wrap_err("failed to connect control channel")? } }; diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index cbf5d443b..6e563dd45 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -38,7 +38,7 @@ impl DropStream { .wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?, #[cfg(unix)] DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(&socket_addr).wrap_err_with(|| { + DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { format!("failed to connect drop stream for node `{node_id}`") })? } diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index adb5940c6..c3fb1b96f 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -8,7 +8,7 @@ use dora_core::{ message::uhlc, topics::LOCALHOST, }; -use eyre::{eyre, Context, ContextCompat}; +use eyre::{eyre, Context}; use futures::{future, task, Future}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ @@ -150,7 +150,7 @@ pub async fn spawn_listener_loop( if !tmpfile_dir.exists() { std::fs::create_dir_all(&tmpfile_dir).context("could not create tmp dir")?; } - let socket_file = tmpfile_dir.join(format!("{}.sock", node_id.to_string())); + let socket_file = tmpfile_dir.join(format!("{}.sock", node_id)); let socket = match UnixListener::bind(&socket_file) { Ok(socket) => socket, Err(err) => { @@ -166,6 +166,7 @@ pub async fn spawn_listener_loop( tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); }); + use eyre::ContextCompat; Ok(DaemonCommunication::UnixDomain { socket_addr: socket_file .to_str() From 442f650e1f9afd9cd274a6dbae50aa5bd32a023a Mon Sep 17 00:00:00 2001 From: XxChang Date: Wed, 17 Jul 2024 23:28:48 +0800 Subject: [PATCH 09/11] minor --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c5d50e74..0f5debc1c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -129,7 +129,7 @@ jobs: if: runner.os == 'Linux' timeout-minutes: 30 run: cargo run --example cmake-dataflow - - name: "Unix Domain example" + - name: "Unix Domain Socket example" if: runner.os == 'Linux' run: | sed -i '1i communication:\n _unstable_local:\n UnixDomain\n' examples/rust-dataflow/dataflow.yml From e1db10ac7f7c6fe46a697bc35fe4f3f24bc5138e Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 19 Jul 2024 21:15:00 +0800 Subject: [PATCH 10/11] rename method --- apis/rust/node/src/daemon_connection/mod.rs | 2 +- apis/rust/node/src/daemon_connection/unix_domain.rs | 8 ++++---- apis/rust/node/src/event_stream/mod.rs | 8 ++++---- apis/rust/node/src/node/control_channel.rs | 4 ++-- apis/rust/node/src/node/drop_stream.rs | 4 ++-- binaries/daemon/src/coordinator.rs | 10 +++++----- binaries/daemon/src/inter_daemon.rs | 6 +++--- binaries/daemon/src/lib.rs | 10 +++++----- binaries/daemon/src/local_listener.rs | 6 +++--- binaries/daemon/src/node_communication/mod.rs | 10 +++++----- binaries/daemon/src/node_communication/tcp.rs | 6 +++--- binaries/daemon/src/node_communication/unix_domain.rs | 6 +++--- binaries/daemon/src/pending.rs | 4 ++-- .../src/{tcp_utils.rs => socket_stream_utils.rs} | 4 ++-- libraries/core/src/config.rs | 1 - libraries/core/src/daemon_messages.rs | 2 +- 16 files changed, 45 insertions(+), 46 deletions(-) rename binaries/daemon/src/{tcp_utils.rs => socket_stream_utils.rs} (80%) diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index ca2a39796..d21607f13 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -46,7 +46,7 @@ impl DaemonChannel { #[cfg(unix)] #[tracing::instrument(level = "trace")] - pub fn new_unix_socket(path: &str) -> eyre::Result { + pub fn new_unix_socket(path: &std::path::PathBuf) -> eyre::Result { let stream = UnixStream::connect(path).wrap_err("failed to open Unix socket")?; Ok(DaemonChannel::UnixDomain(stream)) } diff --git a/apis/rust/node/src/daemon_connection/unix_domain.rs b/apis/rust/node/src/daemon_connection/unix_domain.rs index 29d6f3259..dfcb17a20 100644 --- a/apis/rust/node/src/daemon_connection/unix_domain.rs +++ b/apis/rust/node/src/daemon_connection/unix_domain.rs @@ -31,7 +31,7 @@ fn send_message( message: &Timestamped, ) -> eyre::Result<()> { let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonRequest")?; - tcp_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?; + stream_send(connection, &serialized).wrap_err("failed to send DaemonRequest")?; Ok(()) } @@ -39,7 +39,7 @@ fn receive_reply( connection: &mut UnixStream, serializer: Serializer, ) -> eyre::Result> { - let raw = match tcp_receive(connection) { + let raw = match stream_receive(connection) { Ok(raw) => raw, Err(err) => match err.kind() { std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { @@ -64,7 +64,7 @@ fn receive_reply( } } -fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { +fn stream_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { let len_raw = (message.len() as u64).to_le_bytes(); connection.write_all(&len_raw)?; connection.write_all(message)?; @@ -72,7 +72,7 @@ fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::R Ok(()) } -fn tcp_receive(connection: &mut (impl Read + Unpin)) -> std::io::Result> { +fn stream_receive(connection: &mut (impl Read + Unpin)) -> std::io::Result> { let reply_len = { let mut raw = [0; 8]; connection.read_exact(&mut raw)?; diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index f7826746e..2a11503b7 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -51,8 +51,8 @@ impl EventStream { DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect event stream for node `{node_id}`"))?, #[cfg(unix)] - DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { format!("failed to connect event stream for node `{node_id}`") })? } @@ -70,8 +70,8 @@ impl EventStream { format!("failed to connect event close channel for node `{node_id}`") })?, #[cfg(unix)] - DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { format!("failed to connect event close channel for node `{node_id}`") })? } diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index 9f137b89d..28826e950 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -30,8 +30,8 @@ impl ControlChannel { DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err("failed to connect control channel")?, #[cfg(unix)] - DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(socket_addr) + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file) .wrap_err("failed to connect control channel")? } }; diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index 6e563dd45..80947ac94 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -37,8 +37,8 @@ impl DropStream { DaemonCommunication::Tcp { socket_addr } => DaemonChannel::new_tcp(*socket_addr) .wrap_err_with(|| format!("failed to connect drop stream for node `{node_id}`"))?, #[cfg(unix)] - DaemonCommunication::UnixDomain { socket_addr } => { - DaemonChannel::new_unix_socket(socket_addr).wrap_err_with(|| { + DaemonCommunication::UnixDomain { socket_file } => { + DaemonChannel::new_unix_socket(socket_file).wrap_err_with(|| { format!("failed to connect drop stream for node `{node_id}`") })? } diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index d2f86b3cc..895bf49b7 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -1,5 +1,5 @@ use crate::{ - tcp_utils::{tcp_receive, tcp_send}, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, DaemonCoordinatorEvent, }; use dora_core::{ @@ -41,10 +41,10 @@ pub async fn register( }, timestamp: clock.new_timestamp(), })?; - tcp_send(&mut stream, ®ister) + socket_stream_send(&mut stream, ®ister) .await .wrap_err("failed to send register request to dora-coordinator")?; - let reply_raw = tcp_receive(&mut stream) + let reply_raw = socket_stream_receive(&mut stream) .await .wrap_err("failed to register reply from dora-coordinator")?; let result: Timestamped = serde_json::from_slice(&reply_raw) @@ -59,7 +59,7 @@ pub async fn register( let (tx, rx) = mpsc::channel(1); tokio::spawn(async move { loop { - let event = match tcp_receive(&mut stream).await { + let event = match socket_stream_receive(&mut stream).await { Ok(raw) => match serde_json::from_slice(&raw) { Ok(event) => event, Err(err) => { @@ -109,7 +109,7 @@ pub async fn register( continue; } }; - if let Err(err) = tcp_send(&mut stream, &serialized).await { + if let Err(err) = socket_stream_send(&mut stream, &serialized).await { tracing::warn!("failed to send reply to coordinator: {err}"); continue; }; diff --git a/binaries/daemon/src/inter_daemon.rs b/binaries/daemon/src/inter_daemon.rs index 7eb4b9485..21cce12a5 100644 --- a/binaries/daemon/src/inter_daemon.rs +++ b/binaries/daemon/src/inter_daemon.rs @@ -1,4 +1,4 @@ -use crate::tcp_utils::{tcp_receive, tcp_send}; +use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; use dora_core::daemon_messages::{InterDaemonEvent, Timestamped}; use eyre::{Context, ContextCompat}; use std::{collections::BTreeMap, io::ErrorKind, net::SocketAddr}; @@ -52,7 +52,7 @@ pub async fn send_inter_daemon_event( .connect() .await .wrap_err_with(|| format!("failed to connect to machine `{target_machine}`"))?; - tcp_send(connection, &message) + socket_stream_send(connection, &message) .await .wrap_err_with(|| format!("failed to send event to machine `{target_machine}`"))?; } @@ -131,7 +131,7 @@ async fn handle_connection_loop( async fn receive_message( connection: &mut TcpStream, ) -> eyre::Result>> { - let raw = match tcp_receive(connection).await { + let raw = match socket_stream_receive(connection).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 54d7284e7..711872d1c 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -39,7 +39,7 @@ use std::{ time::Duration, }; use sysinfo::Pid; -use tcp_utils::tcp_send; +use socket_stream_utils::socket_stream_send; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -57,7 +57,7 @@ mod log; mod node_communication; mod pending; mod spawn; -mod tcp_utils; +mod socket_stream_utils; #[cfg(feature = "telemetry")] use dora_tracing::telemetry::serialize_context; @@ -314,7 +314,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; @@ -345,7 +345,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send watchdog message to dora-coordinator")?; @@ -1103,7 +1103,7 @@ impl Daemon { }, timestamp: self.clock.new_timestamp(), })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to report dataflow finish to dora-coordinator")?; } diff --git a/binaries/daemon/src/local_listener.rs b/binaries/daemon/src/local_listener.rs index dbffe39ee..d9ff858a2 100644 --- a/binaries/daemon/src/local_listener.rs +++ b/binaries/daemon/src/local_listener.rs @@ -1,4 +1,4 @@ -use crate::tcp_utils::{tcp_receive, tcp_send}; +use crate::socket_stream_utils::{socket_stream_receive, socket_stream_send}; use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped}; use eyre::Context; use std::{io::ErrorKind, net::SocketAddr}; @@ -99,7 +99,7 @@ async fn handle_connection_loop( continue; } }; - if let Err(err) = tcp_send(&mut connection, &serialized).await { + if let Err(err) = socket_stream_send(&mut connection, &serialized).await { tracing::warn!("failed to send reply: {err}"); continue; }; @@ -120,7 +120,7 @@ async fn handle_connection_loop( async fn receive_message( connection: &mut TcpStream, ) -> eyre::Result>> { - let raw = match tcp_receive(connection).await { + let raw = match socket_stream_receive(connection).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index c3fb1b96f..f132072db 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -166,14 +166,14 @@ pub async fn spawn_listener_loop( tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); }); - use eyre::ContextCompat; Ok(DaemonCommunication::UnixDomain { - socket_addr: socket_file - .to_str() - .wrap_err("get unix domain file addr failed")? - .to_string(), + socket_file }) } + #[cfg(not(unix))] + LocalCommunicationConfig::UnixDomain => { + eyre::bail!("Communication via UNIX domain sockets is only supported on UNIX systems") + } } } diff --git a/binaries/daemon/src/node_communication/tcp.rs b/binaries/daemon/src/node_communication/tcp.rs index c4d6122cc..2d787d8da 100644 --- a/binaries/daemon/src/node_communication/tcp.rs +++ b/binaries/daemon/src/node_communication/tcp.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; use super::{Connection, Listener}; use crate::{ - tcp_utils::{tcp_receive, tcp_send}, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, }; use dora_core::{ @@ -63,7 +63,7 @@ struct TcpConnection(TcpStream); #[async_trait::async_trait] impl Connection for TcpConnection { async fn receive_message(&mut self) -> eyre::Result>> { - let raw = match tcp_receive(&mut self.0).await { + let raw = match socket_stream_receive(&mut self.0).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof @@ -87,7 +87,7 @@ impl Connection for TcpConnection { } let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; - tcp_send(&mut self.0, &serialized) + socket_stream_send(&mut self.0, &serialized) .await .wrap_err("failed to send DaemonReply")?; Ok(()) diff --git a/binaries/daemon/src/node_communication/unix_domain.rs b/binaries/daemon/src/node_communication/unix_domain.rs index 7713eccc4..1268821e9 100644 --- a/binaries/daemon/src/node_communication/unix_domain.rs +++ b/binaries/daemon/src/node_communication/unix_domain.rs @@ -12,7 +12,7 @@ use tokio::{ }; use crate::{ - tcp_utils::{tcp_receive, tcp_send}, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, }; @@ -61,7 +61,7 @@ struct UnixConnection(UnixStream); #[async_trait::async_trait] impl Connection for UnixConnection { async fn receive_message(&mut self) -> eyre::Result>> { - let raw = match tcp_receive(&mut self.0).await { + let raw = match socket_stream_receive(&mut self.0).await { Ok(raw) => raw, Err(err) => match err.kind() { ErrorKind::UnexpectedEof @@ -85,7 +85,7 @@ impl Connection for UnixConnection { } let serialized = bincode::serialize(&message).wrap_err("failed to serialize DaemonReply")?; - tcp_send(&mut self.0, &serialized) + socket_stream_send(&mut self.0, &serialized) .await .wrap_err("failed to send DaemonReply")?; Ok(()) diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 0e42dca38..e4037fdc0 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -9,7 +9,7 @@ use dora_core::{ use eyre::{bail, Context}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::{tcp_utils::tcp_send, CascadingErrorCauses}; +use crate::{socket_stream_utils::socket_stream_send, CascadingErrorCauses}; pub struct PendingNodes { dataflow_id: DataflowId, @@ -210,7 +210,7 @@ impl PendingNodes { }, timestamp, })?; - tcp_send(connection, &msg) + socket_stream_send(connection, &msg) .await .wrap_err("failed to send AllNodesReady message to dora-coordinator")?; Ok(()) diff --git a/binaries/daemon/src/tcp_utils.rs b/binaries/daemon/src/socket_stream_utils.rs similarity index 80% rename from binaries/daemon/src/tcp_utils.rs rename to binaries/daemon/src/socket_stream_utils.rs index db327c584..3f234c428 100644 --- a/binaries/daemon/src/tcp_utils.rs +++ b/binaries/daemon/src/socket_stream_utils.rs @@ -1,6 +1,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -pub async fn tcp_send( +pub async fn socket_stream_send( connection: &mut (impl AsyncWrite + Unpin), message: &[u8], ) -> std::io::Result<()> { @@ -11,7 +11,7 @@ pub async fn tcp_send( Ok(()) } -pub async fn tcp_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result> { +pub async fn socket_stream_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result> { let reply_len = { let mut raw = [0; 8]; connection.read_exact(&mut raw).await?; diff --git a/libraries/core/src/config.rs b/libraries/core/src/config.rs index a6118c8ae..0fcf11f2d 100644 --- a/libraries/core/src/config.rs +++ b/libraries/core/src/config.rs @@ -348,7 +348,6 @@ pub struct CommunicationConfig { pub enum LocalCommunicationConfig { Tcp, Shmem, - #[cfg(unix)] UnixDomain, } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 9e61f5eba..0bda5b429 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -37,7 +37,7 @@ pub enum DaemonCommunication { }, #[cfg(unix)] UnixDomain { - socket_addr: String, + socket_file: PathBuf, }, } From 3c4b96ff46625806322085ac00826dfd7ec8a7b0 Mon Sep 17 00:00:00 2001 From: XxChang Date: Fri, 19 Jul 2024 21:26:38 +0800 Subject: [PATCH 11/11] fix ci --- .github/workflows/ci.yml | 4 +--- binaries/daemon/src/lib.rs | 4 ++-- binaries/daemon/src/node_communication/mod.rs | 4 +--- binaries/daemon/src/socket_stream_utils.rs | 4 +++- examples/rust-dataflow/run.rs | 8 +++++++- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0f5debc1c..e2f45f192 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -131,9 +131,7 @@ jobs: run: cargo run --example cmake-dataflow - name: "Unix Domain Socket example" if: runner.os == 'Linux' - run: | - sed -i '1i communication:\n _unstable_local:\n UnixDomain\n' examples/rust-dataflow/dataflow.yml - cargo run --example rust-dataflow + run: cargo run --example rust-dataflow -- dataflow_socket.yml # python examples - uses: actions/setup-python@v2 diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 711872d1c..85cff2170 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -30,6 +30,7 @@ use inter_daemon::InterDaemonConnection; use local_listener::DynamicNodeEventWrapper; use pending::PendingNodes; use shared_memory_server::ShmemConf; +use socket_stream_utils::socket_stream_send; use std::sync::Arc; use std::time::Instant; use std::{ @@ -39,7 +40,6 @@ use std::{ time::Duration, }; use sysinfo::Pid; -use socket_stream_utils::socket_stream_send; use tokio::fs::File; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -56,8 +56,8 @@ mod local_listener; mod log; mod node_communication; mod pending; -mod spawn; mod socket_stream_utils; +mod spawn; #[cfg(feature = "telemetry")] use dora_tracing::telemetry::serialize_context; diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index f132072db..c05078a32 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -166,9 +166,7 @@ pub async fn spawn_listener_loop( tracing::debug!("event listener loop finished for `{event_loop_node_id}`"); }); - Ok(DaemonCommunication::UnixDomain { - socket_file - }) + Ok(DaemonCommunication::UnixDomain { socket_file }) } #[cfg(not(unix))] LocalCommunicationConfig::UnixDomain => { diff --git a/binaries/daemon/src/socket_stream_utils.rs b/binaries/daemon/src/socket_stream_utils.rs index 3f234c428..b2e9e9034 100644 --- a/binaries/daemon/src/socket_stream_utils.rs +++ b/binaries/daemon/src/socket_stream_utils.rs @@ -11,7 +11,9 @@ pub async fn socket_stream_send( Ok(()) } -pub async fn socket_stream_receive(connection: &mut (impl AsyncRead + Unpin)) -> std::io::Result> { +pub async fn socket_stream_receive( + connection: &mut (impl AsyncRead + Unpin), +) -> std::io::Result> { let reply_len = { let mut raw = [0; 8]; connection.read_exact(&mut raw).await?; diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index f5e035a50..213b65a0a 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -10,7 +10,13 @@ async fn main() -> eyre::Result<()> { std::env::set_current_dir(root.join(file!()).parent().unwrap()) .wrap_err("failed to set working dir")?; - let dataflow = Path::new("dataflow.yml"); + let args: Vec = std::env::args().collect(); + let dataflow = if args.len() > 1 { + Path::new(&args[1]) + } else { + Path::new("dataflow.yml") + }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?;