Skip to content

Commit

Permalink
fix: silencing warnings that do not give helpful info anyway (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoverson authored Aug 9, 2023
1 parent 744f1ac commit daccbc3
Showing 1 changed file with 45 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl State {
}
}

fn get_tx(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
fn get_ctx(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
self.context_map.get(uuid)
}

Expand All @@ -35,34 +35,34 @@ impl State {

pub(super) fn run_cleanup(&mut self) -> Result<(), ExecutionError> {
let mut cleanup = Vec::new();
for (id, (tx, meta)) in self.context_map.iter() {
let last_update = tx.last_access().elapsed().unwrap();
for (id, (ctx, meta)) in self.context_map.iter() {
let last_update = ctx.last_access().elapsed().unwrap();

let active_instances = tx.active_instances().iter().map(|i| i.id()).collect::<Vec<_>>();
let active_instances = ctx.active_instances().iter().map(|i| i.id()).collect::<Vec<_>>();
if last_update > EventLoop::SLOW_TX_TIMEOUT {
if active_instances.is_empty() && tx.done() {
if active_instances.is_empty() && ctx.done() {
cleanup.push(*id);

continue;
}

if !meta.have_warned() {
warn!(%id, ?active_instances, "slow tx: no packet received in a long time");
warn!(%id, ?active_instances, "slow invocation: no packet received in a long time");
meta.set_have_warned();
}
}
if last_update > EventLoop::STALLED_TX_TIMEOUT {
match tx.check_stalled() {
match ctx.check_stalled() {
Ok(TxState::Finished) => {
// execution has completed its output and isn't generating more data, clean it up.
cleanup.push(*id);
}
Ok(TxState::OutputPending) => {
error!(%id, "tx reached timeout while still waiting for output data");
error!(%id, "invocation reached timeout while still waiting for output data");
cleanup.push(*id);
}
Ok(TxState::CompleteWithTasksPending) => {
error!(%id, "tx reached timeout while still waiting for tasks to complete");
error!(%id, "invocation reached timeout while still waiting for tasks to complete");
cleanup.push(*id);
}
Err(error) => {
Expand Down Expand Up @@ -98,10 +98,10 @@ impl State {

#[allow(clippy::unused_async)]
pub(super) async fn handle_exec_done(&mut self, ctx_id: Uuid) -> Result<(), ExecutionError> {
let is_done = if let Some(tx) = self.get_mut(&ctx_id) {
let statistics = tx.finish()?;
let is_done = if let Some(ctx) = self.get_mut(&ctx_id) {
let statistics = ctx.finish()?;
trace!(?statistics);
tx.active_instances().is_empty()
ctx.active_instances().is_empty()
} else {
false
};
Expand All @@ -114,23 +114,24 @@ impl State {

#[allow(clippy::unused_async)]
async fn handle_input_data(&mut self, ctx_id: Uuid, port: PortReference) -> Result<(), ExecutionError> {
let (tx, _) = match self.get_tx(&ctx_id) {
Some(tx) => tx,
let (ctx, _) = match self.get_ctx(&ctx_id) {
Some(ctx) => ctx,
None => {
// This is a warning, not an error, because it's possible the transaction completes OK, it's just that a
// component is misbehaving.
warn!(
port = %port, %ctx_id, "still receiving upstream data for missing tx, this may be due to a component panic or premature close"
debug!(
port = %port, %ctx_id, "still receiving upstream data for invocation that has already been completed, this may be due to a component panic or premature close"
);
return Ok(());
}
};

let graph = tx.schematic();
let graph = ctx.schematic();
let port_name = graph.get_port_name(&port);
let instance = tx.instance(port.node_index());
let instance = ctx.instance(port.node_index());

tx.stats
ctx
.stats
.mark(format!("input:{}:{}:ready", port.node_index(), port.port_index()));

let is_schematic_output = port.node_index() == graph.output().index();
Expand All @@ -142,8 +143,8 @@ impl State {
"handling schematic output"
);

tx.handle_schematic_output()?;
} else if let Some(packet) = tx.take_instance_input(&port) {
ctx.handle_schematic_output()?;
} else if let Some(packet) = ctx.take_instance_input(&port) {
if packet.is_error() {
warn!(
operation = %instance,
Expand All @@ -160,34 +161,35 @@ impl State {
);
}

tx.push_packets(port.node_index(), vec![packet]).await?;
ctx.push_packets(port.node_index(), vec![packet]).await?;
}
Ok(())
}

#[allow(clippy::unused_async)]
async fn handle_output_data(&mut self, ctx_id: Uuid, port: PortReference) -> Result<(), ExecutionError> {
let (tx, _) = match self.get_tx(&ctx_id) {
Some(tx) => tx,
let (ctx, _) = match self.get_ctx(&ctx_id) {
Some(ctx) => ctx,
None => {
// This is a warning, not an error, because it's possible the transaction completes OK, it's just that a
// component is misbehaving.
warn!(
port = %port, %ctx_id, "still receiving downstream data for missing tx, this may be due to a component panic or premature close"
debug!(
port = %port, %ctx_id, "still receiving downstream data for invocation that has already been completed, this may be due to a component panic or premature close"
);
return Ok(());
}
};

let graph = tx.schematic();
let graph = ctx.schematic();
let port_name = graph.get_port_name(&port);

let instance = tx.instance(port.node_index());
let instance = ctx.instance(port.node_index());

tx.stats
ctx
.stats
.mark(format!("output:{}:{}:ready", port.node_index(), port.port_index()));

if let Some(packet) = tx.take_instance_output(&port) {
if let Some(packet) = ctx.take_instance_output(&port) {
if packet.is_error() {
warn!(
operation = %instance,
Expand All @@ -210,7 +212,7 @@ impl State {
let name = graph.get_port_name(&downport);

let channel = self.channel.clone();
let downstream_instance = tx.instance(downport.node_index()).clone();
let downstream_instance = ctx.instance(downport.node_index()).clone();
let message = packet.clone().set_port(name);
trace!(%connection, "delivering packet to downstream",);
downstream_instance.buffer_in(&downport, message);
Expand All @@ -232,26 +234,26 @@ impl State {

#[allow(clippy::unused_async)]
pub(super) async fn handle_call_complete(&self, ctx_id: Uuid, data: CallComplete) -> Result<(), ExecutionError> {
let (tx, _) = match self.get_tx(&ctx_id) {
Some(tx) => tx,
let (ctx, _) = match self.get_ctx(&ctx_id) {
Some(ctx) => ctx,
None => {
// This is a warning, not an error, because it's possible the transaction completes OK, it's just that a
// component is misbehaving.
warn!(
debug!(
?data,
%ctx_id, "tried to cleanup call for missing tx, this may be due to a component panic or premature close"
%ctx_id, "tried to cleanup missing invocation, this may be due to a component panic or premature close"
);
return Ok(());
}
};
let instance = tx.instance(data.index);
let instance = ctx.instance(data.index);
debug!(operation = instance.id(), entity = %instance.entity(), "call complete");

if let Some(PacketPayload::Err(err)) = data.err {
warn!(?err, "op:error");
// If the call contains an error, then the component panicked.
// We need to propagate the error downward...
tx.handle_op_err(data.index, &err)?;
ctx.handle_op_err(data.index, &err)?;
// ...and clean up the call.
// instance.handle_stream_complete(CompletionStatus::Error)?;
}
Expand Down Expand Up @@ -295,16 +297,16 @@ impl ContextMap {
}

fn get(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
self.0.get(uuid).map(|tx| {
tx.0.update_last_access();
tx
self.0.get(uuid).map(|ctx| {
ctx.0.update_last_access();
ctx
})
}

fn get_mut(&mut self, uuid: &Uuid) -> Option<&mut ExecutionContext> {
self.0.get_mut(uuid).map(|tx| {
tx.0.update_last_access();
&mut tx.0
self.0.get_mut(uuid).map(|ctx| {
ctx.0.update_last_access();
&mut ctx.0
})
}

Expand Down

0 comments on commit daccbc3

Please sign in to comment.