Skip to content

Commit

Permalink
Merge pull request #429 from dora-rs/out_dir
Browse files Browse the repository at this point in the history
Send runs artefacts into a dedicated `out` folder
  • Loading branch information
phil-opp authored Feb 29, 2024
2 parents a6a5181 + f0db125 commit 14bb4fb
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/

examples/**/*.txt
# These are backup files generated by rustfmt
**/*.rs.bk

Expand Down
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ members = [
"binaries/coordinator",
"binaries/daemon",
"binaries/runtime",
"examples/rust-dataflow/*",
"examples/rust-ros2-dataflow/*",
"examples/benchmark/*",
"examples/multiple-daemons/*",
"examples/rust-dataflow/node",
"examples/rust-dataflow/operator",
"examples/rust-dataflow/sink",
"examples/rust-ros2-dataflow/node",
"examples/benchmark/node",
"examples/benchmark/sink",
"examples/multiple-daemons/node",
"examples/multiple-daemons/operator",
"examples/multiple-daemons/sink",
"libraries/arrow-convert",
"libraries/communication-layer/*",
"libraries/core",
Expand Down Expand Up @@ -87,7 +92,7 @@ dora-tracing = { workspace = true }
dora-download = { workspace = true }
dunce = "1.0.2"
serde_yaml = "0.8.23"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
uuid = { version = "1.7", features = ["v7", "serde"] }
tracing = "0.1.36"
futures = "0.3.25"
tokio-stream = "0.1.11"
Expand Down
8 changes: 7 additions & 1 deletion apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aligned_vec::{AVec, ConstAlign};
use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
daemon_messages::{DataMessage, DropToken, NodeConfig},
daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
descriptor::Descriptor,
message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
};
Expand All @@ -33,6 +33,7 @@ pub const ZERO_COPY_THRESHOLD: usize = 4096;

pub struct DoraNode {
id: NodeId,
dataflow_id: DataflowId,
node_config: NodeRunConfig,
control_channel: ControlChannel,
clock: Arc<uhlc::HLC>,
Expand Down Expand Up @@ -89,6 +90,7 @@ impl DoraNode {

let node = Self {
id: node_id,
dataflow_id: dataflow_id,

Check warning on line 93 in apis/rust/node/src/node/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant field names in struct initialization
node_config: run_config,
control_channel,
clock,
Expand Down Expand Up @@ -243,6 +245,10 @@ impl DoraNode {
&self.id
}

pub fn dataflow_id(&self) -> &DataflowId {
&self.dataflow_id
}

pub fn node_config(&self) -> &NodeRunConfig {
&self.node_config
}
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ serde_yaml = "0.9.11"
webbrowser = "0.8.3"
serde_json = "1.0.86"
termcolor = "1.1.3"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
uuid = { version = "1.7", features = ["v7", "serde"] }
inquire = "0.5.2"
communication-layer-request-reply = { workspace = true }
notify = "5.1.0"
Expand Down
4 changes: 2 additions & 2 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
collections::{BTreeMap, BTreeSet, HashMap},
path::PathBuf,
};
use uuid::Uuid;
use uuid::{NoContext, Timestamp, Uuid};

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
Expand All @@ -27,7 +27,7 @@ pub(super) async fn spawn_dataflow(
dataflow.check(&working_dir)?;

let nodes = dataflow.resolve_aliases_and_set_defaults();
let uuid = Uuid::new_v4();
let uuid = Uuid::new_v7(Timestamp::now(NoContext));

let machines: BTreeSet<_> = nodes.iter().map(|n| n.deploy.machine.clone()).collect();
let machine_listen_ports = machines
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
serde_yaml = "0.8.23"
uuid = { version = "1.1.2", features = ["v4"] }
uuid = { version = "1.7", features = ["v7"] }
futures = "0.3.25"
shared-memory-server = { workspace = true }
bincode = "1.3.3"
Expand Down
66 changes: 42 additions & 24 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use futures_concurrency::stream::Merge;
use inter_daemon::InterDaemonConnection;
use pending::PendingNodes;
use shared_memory_server::ShmemConf;
use std::env::temp_dir;
use std::sync::Arc;
use std::time::Instant;
use std::{
Expand All @@ -41,7 +40,7 @@ use tokio::sync::oneshot::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::error;
use uuid::Uuid;
use uuid::{NoContext, Timestamp, Uuid};

mod coordinator;
mod inter_daemon;
Expand All @@ -60,6 +59,7 @@ use crate::pending::DataflowStatus;

pub struct Daemon {
running: HashMap<DataflowId, RunningDataflow>,
working_dir: HashMap<DataflowId, PathBuf>,

events_tx: mpsc::Sender<Timestamped<Event>>,

Expand Down Expand Up @@ -130,7 +130,7 @@ impl Daemon {
let nodes = descriptor.resolve_aliases_and_set_defaults();

let spawn_command = SpawnDataflowNodes {
dataflow_id: Uuid::new_v4(),
dataflow_id: Uuid::new_v7(Timestamp::now(NoContext)),
working_dir,
nodes,
machine_listen_ports: BTreeMap::new(),
Expand Down Expand Up @@ -213,6 +213,7 @@ impl Daemon {
let (dora_events_tx, dora_events_rx) = mpsc::channel(5);
let daemon = Self {
running: HashMap::new(),
working_dir: HashMap::new(),
events_tx: dora_events_tx,
coordinator_connection,
last_coordinator_heartbeat: Instant::now(),
Expand Down Expand Up @@ -371,29 +372,43 @@ impl Daemon {
dataflow_id,
node_id,
} => {
tokio::spawn(async move {
let logs = async {
let log_dir = temp_dir();

let mut file =
File::open(log_dir.join(log::log_path(&dataflow_id, &node_id)))
.await
.wrap_err("Could not open log file")?;

let mut contents = vec![];
file.read_to_end(&mut contents)
match self.working_dir.get(&dataflow_id) {
Some(working_dir) => {
let working_dir = working_dir.clone();
tokio::spawn(async move {
let logs = async {
let mut file =
File::open(log::log_path(&working_dir, &dataflow_id, &node_id))
.await
.wrap_err(format!(
"Could not open log file: {:#?}",
log::log_path(&working_dir, &dataflow_id, &node_id)
))?;

let mut contents = vec![];
file.read_to_end(&mut contents)
.await
.wrap_err("Could not read content of log file")?;
Result::<Vec<u8>, eyre::Report>::Ok(contents)
}
.await
.wrap_err("Could not read content of log file")?;
Result::<Vec<u8>, eyre::Report>::Ok(contents)
.map_err(|err| format!("{err:?}"));
let _ = reply_tx
.send(Some(DaemonCoordinatorReply::Logs(logs)))
.map_err(|_| {
error!("could not send logs reply from daemon to coordinator")
});
});
}
.await
.map_err(|err| format!("{err:?}"));
let _ = reply_tx
.send(Some(DaemonCoordinatorReply::Logs(logs)))
.map_err(|_| {
error!("could not send logs reply from daemon to coordinator")
None => {
tracing::warn!("received Logs for unknown dataflow (ID `{dataflow_id}`)");
let _ = reply_tx.send(None).map_err(|_| {
error!(
"could not send `AllNodesReady` reply from daemon to coordinator"
)
});
});
}
}
RunStatus::Continue
}
DaemonCoordinatorEvent::ReloadDataflow {
Expand Down Expand Up @@ -517,7 +532,10 @@ impl Daemon {
) -> eyre::Result<()> {
let dataflow = RunningDataflow::new(dataflow_id, self.machine_id.clone());
let dataflow = match self.running.entry(dataflow_id) {
std::collections::hash_map::Entry::Vacant(entry) => entry.insert(dataflow),
std::collections::hash_map::Entry::Vacant(entry) => {
self.working_dir.insert(dataflow_id, working_dir.clone());
entry.insert(dataflow)
}
std::collections::hash_map::Entry::Occupied(_) => {
bail!("there is already a running dataflow with ID `{dataflow_id}`")
}
Expand Down
7 changes: 4 additions & 3 deletions binaries/daemon/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};

use dora_core::config::NodeId;
use uuid::Uuid;

pub fn log_path(dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
PathBuf::from(format!("{dataflow_id}-{node_id}.txt"))
pub fn log_path(working_dir: &Path, dataflow_id: &Uuid, node_id: &NodeId) -> PathBuf {
let dataflow_dir = working_dir.join("out").join(dataflow_id.to_string());
dataflow_dir.join(format!("log_{node_id}.txt"))
}
17 changes: 9 additions & 8 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use dora_node_api::{
};
use eyre::WrapErr;
use std::{
env::{consts::EXE_EXTENSION, temp_dir},
path::Path,
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
};
Expand Down Expand Up @@ -217,13 +217,14 @@ pub async fn spawn_node(
}
};

let log_dir = temp_dir();

let dataflow_dir = PathBuf::from(working_dir.join("out").join(dataflow_id.to_string()));
if !dataflow_dir.exists() {
std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?;
}
let (tx, mut rx) = mpsc::channel(10);
let mut file =
File::create(&log_dir.join(log::log_path(&dataflow_id, &node_id).with_extension("txt")))
.await
.expect("Failed to create log file");
let mut file = File::create(log::log_path(working_dir, &dataflow_id, &node_id))
.await
.expect("Failed to create log file");
let mut child_stdout =
tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout"));

Expand Down
2 changes: 1 addition & 1 deletion libraries/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
once_cell = "1.13.0"
which = "5.0.0"
uuid = { version = "1.2.1", features = ["serde"] }
uuid = { version = "1.7", features = ["serde", "v7"] }
dora-message = { workspace = true }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
Expand Down
4 changes: 2 additions & 2 deletions libraries/core/src/daemon_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use aligned_vec::{AVec, ConstAlign};
use dora_message::{uhlc, Metadata};
use uuid::Uuid;
use uuid::{NoContext, Timestamp, Uuid};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeConfig {
Expand Down Expand Up @@ -178,7 +178,7 @@ pub struct DropToken(Uuid);

impl DropToken {
pub fn generate() -> Self {
Self(Uuid::new_v4())
Self(Uuid::new_v7(Timestamp::now(NoContext)))
}
}

Expand Down
Loading

0 comments on commit 14bb4fb

Please sign in to comment.