Skip to content

Commit

Permalink
Merge pull request #4350 from systeminit/brit/pre-calculate-inferred-…
Browse files Browse the repository at this point in the history
…connections-in-dvu

Calculate Inferred Connections Once for a DVU Job
  • Loading branch information
zacharyhamm authored Aug 16, 2024
2 parents 18be599 + 5d6ce84 commit 65b2a07
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 42 deletions.
102 changes: 68 additions & 34 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::sync::Arc;

use async_recursion::async_recursion;
use indexmap::IndexMap;
use itertools::Itertools;
use petgraph::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -57,6 +58,7 @@ pub use dependent_value_graph::DependentValueGraph;

use crate::attribute::prototype::AttributePrototypeError;
use crate::change_set::ChangeSetError;
use crate::component::inferred_connection_graph::InferredConnectionGraph;
use crate::component::socket::ComponentInputSocket;
use crate::func::argument::{FuncArgument, FuncArgumentError};
use crate::func::intrinsics::IntrinsicFunc;
Expand Down Expand Up @@ -458,6 +460,7 @@ impl AttributeValue {
ctx: &DalContext,
attribute_value_id: AttributeValueId,
read_lock: Arc<RwLock<()>>,
inferred_connection_graph: Arc<Option<InferredConnectionGraph>>,
) -> AttributeValueResult<FuncRunValue> {
// When functions are being executed in the dependent values update job,
// we need to ensure we are not reading our input sources from a graph
Expand All @@ -473,8 +476,12 @@ impl AttributeValue {
// Prepare arguments for prototype function execution.
let value_is_for = Self::is_for(ctx, attribute_value_id).await?;
let (prototype_func_id, prepared_args) =
Self::prepare_arguments_for_prototype_function_execution(ctx, attribute_value_id)
.await?;
Self::prepare_arguments_for_prototype_function_execution(
ctx,
attribute_value_id,
inferred_connection_graph,
)
.await?;

let result_channel = FuncRunner::run_attribute_value(
ctx,
Expand Down Expand Up @@ -569,6 +576,7 @@ impl AttributeValue {
pub async fn prepare_arguments_for_prototype_function_execution(
ctx: &DalContext,
attribute_value_id: AttributeValueId,
inferred_connection_graph: Arc<Option<InferredConnectionGraph>>,
) -> AttributeValueResult<(FuncId, Value)> {
// Cache the values we need for preparing arguments for execution.
let prototype_id = Self::prototype_id(ctx, attribute_value_id).await?;
Expand Down Expand Up @@ -664,7 +672,10 @@ impl AttributeValue {
// explicitly configured args

if func_binding_args.is_empty() {
let inferred_inputs = Self::get_inferred_input_values(ctx, attribute_value_id).await?;
let inferred_inputs =
Self::get_inferred_input_values(ctx, attribute_value_id, inferred_connection_graph)
.await?;

if !inferred_inputs.is_empty() {
let input_func = AttributePrototype::func_id(ctx, prototype_id).await?;
if let Some(func_arg) = FuncArgument::list_for_func(ctx, input_func).await?.pop() {
Expand Down Expand Up @@ -707,6 +718,7 @@ impl AttributeValue {
async fn get_inferred_input_values(
ctx: &DalContext,
input_attribute_value_id: AttributeValueId,
inferred_connection_graph: Arc<Option<InferredConnectionGraph>>,
) -> AttributeValueResult<Vec<Value>> {
let maybe_input_socket_id =
match AttributeValue::is_for(ctx, input_attribute_value_id).await? {
Expand All @@ -727,40 +739,56 @@ impl AttributeValue {
};
let mut inputs = vec![];

match ComponentInputSocket::find_inferred_connections(ctx, component_input_socket).await {
Ok(connections) => {
for component_output_socket in connections {
// Both deleted and non deleted components can feed data into deleted components.
// ** ONLY ** non-deleted components can feed data into non-deleted components
if Component::should_data_flow_between_components(
ctx,
component_input_socket.component_id,
component_output_socket.component_id,
)
let connections = match inferred_connection_graph.as_ref() {
Some(inferred_connection_graph) => {
let mut outputs = inferred_connection_graph
.get_component_connections_to_input_socket(component_input_socket)
.into_iter()
.collect_vec();
outputs.sort_by_key(|c| c.component_id);
outputs
}
None => {
match ComponentInputSocket::find_inferred_connections(ctx, component_input_socket)
.await
.map_err(Box::new)?
{
// XXX: We need to properly handle the difference between "there is
// XXX: no value" vs "the value is null", but right now we collapse
// XXX: the two to just be "null" when passing these to a function.
let output_av = Self::get_by_id_or_error(
ctx,
component_output_socket.attribute_value_id,
)
.await?;
let view = output_av.view(ctx).await?.unwrap_or(Value::Null);
inputs.push(view);
{
Ok(connections) => connections,
Err(err) => {
error!(
?err,
%component_id,
%input_socket_id,
%input_attribute_value_id,
"error found while finding available inferred connections to input socket"
);
vec![]
}
}
}
Err(err) => error!(
?err,
%component_id,
%input_socket_id,
%input_attribute_value_id,
"error found while finding available inferred connections to input socket"
),
};

for component_output_socket in connections {
// Both deleted and non deleted components can feed data into deleted components.
// ** ONLY ** non-deleted components can feed data into non-deleted components
if Component::should_data_flow_between_components(
ctx,
component_input_socket.component_id,
component_output_socket.component_id,
)
.await
.map_err(Box::new)?
{
// XXX: We need to properly handle the difference between "there is
// XXX: no value" vs "the value is null", but right now we collapse
// XXX: the two to just be "null" when passing these to a function.
let output_av =
Self::get_by_id_or_error(ctx, component_output_socket.attribute_value_id)
.await?;
let view = output_av.view(ctx).await?.unwrap_or(Value::Null);
inputs.push(view);
}
}

Ok(inputs)
}

Expand Down Expand Up @@ -836,8 +864,14 @@ impl AttributeValue {
) -> AttributeValueResult<()> {
// this lock is never locked for writing so is effectively a no-op here
let read_lock = Arc::new(RwLock::new(()));
let execution_result =
AttributeValue::execute_prototype_function(ctx, attribute_value_id, read_lock).await?;
// Don't need to pass in an Inferred Dependency Graph for one off updates, we can just calculate
let execution_result = AttributeValue::execute_prototype_function(
ctx,
attribute_value_id,
read_lock,
Arc::new(None),
)
.await?;

AttributeValue::set_values_from_func_run_value(ctx, attribute_value_id, execution_result)
.await?;
Expand Down
15 changes: 11 additions & 4 deletions lib/dal/src/job/definition/dependent_values_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use ulid::Ulid;

use crate::{
attribute::value::{dependent_value_graph::DependentValueGraph, AttributeValueError},
component::inferred_connection_graph::InferredConnectionGraph,
job::{
consumer::{
JobCompletionState, JobConsumer, JobConsumerError, JobConsumerMetadata,
Expand All @@ -26,15 +27,17 @@ use crate::{
},
prop::PropError,
status::{StatusMessageState, StatusUpdate, StatusUpdateError},
AccessBuilder, AttributeValue, AttributeValueId, DalContext, TransactionsError, Visibility,
WorkspacePk, WorkspaceSnapshotError, WsEvent, WsEventError,
AccessBuilder, AttributeValue, AttributeValueId, ComponentError, DalContext, TransactionsError,
Visibility, WorkspacePk, WorkspaceSnapshotError, WsEvent, WsEventError,
};

#[remain::sorted]
#[derive(Debug, Error)]
pub enum DependentValueUpdateError {
#[error("attribute value error: {0}")]
AttributeValue(#[from] AttributeValueError),
#[error("component error: {0}")]
Component(#[from] ComponentError),
#[error("prop error: {0}")]
Prop(#[from] PropError),
#[error("status update error: {0}")]
Expand Down Expand Up @@ -155,7 +158,9 @@ impl DependentValuesUpdate {
let mut update_join_set = JoinSet::new();

let mut independent_value_ids = dependency_graph.independent_values();

// Calculate the inferred connection graph up front so we reuse it throughout the job and don't rebuild each time
let inferred_connection_graph =
Arc::new(Some(InferredConnectionGraph::for_workspace(ctx).await?));
loop {
if independent_value_ids.is_empty() && task_id_to_av_id.is_empty() {
break;
Expand All @@ -172,6 +177,7 @@ impl DependentValuesUpdate {
ctx.clone(),
attribute_value_id,
self.set_value_lock.clone(),
inferred_connection_graph.clone()
)
.instrument(info_span!(parent: parent_span, "dependent_values_update.values_from_prototype_function_execution",
attribute_value.id = %attribute_value_id,
Expand Down Expand Up @@ -311,6 +317,7 @@ async fn values_from_prototype_function_execution(
ctx: DalContext,
attribute_value_id: AttributeValueId,
set_value_lock: Arc<RwLock<()>>,
inferred_connection_graph: Arc<Option<InferredConnectionGraph>>,
) -> (Ulid, DependentValueUpdateResult<FuncRunValue>) {
if let Err(err) = send_update_message(
&ctx,
Expand All @@ -324,7 +331,7 @@ async fn values_from_prototype_function_execution(
}
let parent_span = Span::current();
let result =
AttributeValue::execute_prototype_function(&ctx, attribute_value_id, set_value_lock)
AttributeValue::execute_prototype_function(&ctx, attribute_value_id, set_value_lock, inferred_connection_graph)
.instrument(info_span!(parent:parent_span, "value.execute_prototype_function", attribute_value.id= %attribute_value_id))
.await
.map_err(Into::into);
Expand Down
13 changes: 9 additions & 4 deletions lib/dal/tests/integration_test/attribute/value.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use dal::prop::PropPath;
use dal::{AttributeValue, Component, DalContext, Prop, Schema};
use dal_test::helpers::ChangeSetTestHelpers;
Expand Down Expand Up @@ -43,10 +45,13 @@ async fn arguments_for_prototype_function_execution(ctx: &mut DalContext) {
.pop()
.expect("empty attribute value ids");
assert!(attribute_value_ids.is_empty());
let (_, arguments) =
AttributeValue::prepare_arguments_for_prototype_function_execution(ctx, attribute_value_id)
.await
.expect("could not prepare arguments");
let (_, arguments) = AttributeValue::prepare_arguments_for_prototype_function_execution(
ctx,
attribute_value_id,
Arc::new(None),
)
.await
.expect("could not prepare arguments");
assert_eq!(
serde_json::json![{
"identity": expected
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use axum::extract::{Host, OriginalUri};
use axum::{extract::Query, Json};
use dal::{AttributeValue, OutputSocket, OutputSocketId, Prop, PropId, Visibility};
Expand Down Expand Up @@ -74,6 +76,7 @@ pub async fn get_prototype_arguments(
AttributeValue::prepare_arguments_for_prototype_function_execution(
&ctx,
attribute_value_id,
Arc::new(None),
)
.await?;

Expand Down

0 comments on commit 65b2a07

Please sign in to comment.