Skip to content

Commit

Permalink
Move more types from dora-core to dora-message to avoid dependency (
Browse files Browse the repository at this point in the history
  • Loading branch information
haixuanTao authored Nov 14, 2024
2 parents 2f5945f + 7dc6b1d commit 7b1f757
Show file tree
Hide file tree
Showing 33 changed files with 826 additions and 728 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use arrow::array::Array;
use dora_core::{
config::{DataId, NodeId, NodeRunConfig},
descriptor::Descriptor,
metadata::ArrowTypeInfoExt,
topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
uhlc,
};
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use colored::Colorize;
use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor};
use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt};
use dora_message::cli_to_coordinator::ControlRequest;
use dora_message::common::LogMessage;
use dora_message::coordinator_to_cli::ControlRequestReply;
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dora_core::{
config::OperatorId,
descriptor::{Descriptor, SINGLE_OPERATOR_DEFAULT_ID},
descriptor::{Descriptor, DescriptorExt, NodeExt, SINGLE_OPERATOR_DEFAULT_ID},
};
use eyre::{eyre, Context};
use std::{path::Path, process::Command};
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fs::File, io::Write, path::Path};

use dora_core::descriptor::Descriptor;
use dora_core::descriptor::{Descriptor, DescriptorExt};
use eyre::Context;

const MERMAID_TEMPLATE: &str = include_str!("mermaid-template.html");
Expand Down
2 changes: 1 addition & 1 deletion binaries/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use colored::Colorize;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::{source_is_url, Descriptor},
descriptor::{source_is_url, Descriptor, DescriptorExt},
topics::{
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
Expand Down
2 changes: 1 addition & 1 deletion binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
descriptor::{Descriptor, ResolvedNode},
uhlc::{self, HLC},
};
use dora_message::{
Expand All @@ -16,6 +15,7 @@ use dora_message::{
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
Expand Down
6 changes: 2 additions & 4 deletions binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use crate::{
DaemonConnection,
};

use dora_core::{
descriptor::{Descriptor, ResolvedNode},
uhlc::HLC,
};
use dora_core::{descriptor::DescriptorExt, uhlc::HLC};
use dora_message::{
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped},
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use std::{
Expand Down
76 changes: 71 additions & 5 deletions binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
config::{DataId, Input, InputMapping, NodeId, OperatorId},
descriptor::{runtime_node_inputs, CoreNodeKind, Descriptor, ResolvedNode},
config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
descriptor::{
read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
DYNAMIC_SOURCE,
},
topics::LOCALHOST,
uhlc::{self, HLC},
};
Expand All @@ -20,7 +23,7 @@ use dora_message::{
node_to_daemon::{DynamicNodeEvent, Timestamped},
DataflowId,
};
use dora_node_api::Parameter;
use dora_node_api::{arrow::datatypes::DataType, Parameter};
use eyre::{bail, eyre, Context, ContextCompat, Result};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
Expand Down Expand Up @@ -162,7 +165,7 @@ impl Daemon {
.ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
.to_owned();

let descriptor = Descriptor::read(dataflow_path).await?;
let descriptor = read_as_descriptor(dataflow_path).await?;
descriptor.check(&working_dir)?;
let nodes = descriptor.resolve_aliases_and_set_defaults()?;

Expand Down Expand Up @@ -1565,7 +1568,7 @@ impl RunningDataflow {

let metadata = metadata::Metadata::from_parameters(
hlc.new_timestamp(),
ArrowTypeInfo::empty(),
empty_type_info(),
parameters,
);

Expand Down Expand Up @@ -1672,6 +1675,18 @@ impl RunningDataflow {
}
}

fn empty_type_info() -> ArrowTypeInfo {
ArrowTypeInfo {
data_type: DataType::Null,
len: 0,
null_count: 0,
validity: None,
offset: 0,
buffer_offsets: Vec::new(),
child_data: Vec::new(),
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);
Expand Down Expand Up @@ -1821,3 +1836,54 @@ impl CascadingErrorCauses {
self.caused_by.entry(affected_node).or_insert(causing_node);
}
}

fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap<DataId, Input> {
n.operators
.iter()
.flat_map(|operator| {
operator.config.inputs.iter().map(|(input_id, mapping)| {
(
DataId::from(format!("{}/{input_id}", operator.id)),
mapping.clone(),
)
})
})
.collect()
}

fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet<DataId> {
n.operators
.iter()
.flat_map(|operator| {
operator
.config
.outputs
.iter()
.map(|output_id| DataId::from(format!("{}/{output_id}", operator.id)))
})
.collect()
}

trait CoreNodeKindExt {
fn run_config(&self) -> NodeRunConfig;
fn dynamic(&self) -> bool;
}

impl CoreNodeKindExt for CoreNodeKind {
fn run_config(&self) -> NodeRunConfig {
match self {
CoreNodeKind::Runtime(n) => NodeRunConfig {
inputs: runtime_node_inputs(n),
outputs: runtime_node_outputs(n),
},
CoreNodeKind::Custom(n) => n.run_config.clone(),
}
}

fn dynamic(&self) -> bool {
match self {
CoreNodeKind::Runtime(_n) => false,
CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE,
}
}
}
6 changes: 3 additions & 3 deletions binaries/daemon/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, OutputId,
RunningNode,
log, node_communication::spawn_listener_loop, node_inputs, CoreNodeKindExt, DoraEvent, Event,
OutputId, RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
Expand All @@ -9,7 +9,7 @@ use dora_core::{
config::DataId,
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE,
ResolvedNode, ResolvedNodeExt, DYNAMIC_SOURCE, SHELL_SOURCE,
},
get_python_path,
uhlc::HLC,
Expand Down
1 change: 1 addition & 0 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ mod callback_impl {
use super::SendOutputCallback;
use aligned_vec::{AVec, ConstAlign};
use arrow::{array::ArrayData, pyarrow::FromPyArrow};
use dora_core::metadata::ArrowTypeInfoExt;
use dora_message::metadata::ArrowTypeInfo;
use dora_node_api::{
arrow_utils::{copy_array_into_sample, required_data_size},
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-daemons/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
descriptor::{read_as_descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_message::{
Expand Down Expand Up @@ -115,7 +115,7 @@ async fn start_dataflow(
dataflow: &Path,
coordinator_events_tx: &Sender<Event>,
) -> eyre::Result<Uuid> {
let dataflow_descriptor = Descriptor::read(dataflow)
let dataflow_descriptor = read_as_descriptor(dataflow)
.await
.wrap_err("failed to read yaml dataflow")?;
let working_dir = dataflow
Expand Down
2 changes: 1 addition & 1 deletion libraries/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-message = { workspace = true }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
Expand All @@ -22,4 +23,3 @@ tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
uhlc = "0.5.1"
2 changes: 1 addition & 1 deletion libraries/core/src/bin/generate_schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{env, path::Path};

use dora_core::descriptor::Descriptor;
use dora_message::descriptor::Descriptor;
use schemars::schema_for;

fn main() {
Expand Down
Loading

0 comments on commit 7b1f757

Please sign in to comment.