Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tracing): Add tracing for understand particle processing #1935

Merged
merged 36 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 71 additions & 34 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use std::sync::Arc;
use std::{
collections::VecDeque,
task::{Context, Poll, Waker},
Expand All @@ -22,11 +23,11 @@ use std::{
use fluence_keypair::KeyPair;
use futures::future::BoxFuture;
use futures::FutureExt;
use tracing::{Instrument, Span};
use tracing::{instrument, Instrument, Span};

use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::{ExtendedParticle, Particle};

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
Expand All @@ -39,11 +40,22 @@ struct Reusables<RT> {
vm: Option<RT>,
}

type ExecutionTask<RT> = Option<
BoxFuture<
'static,
(
Reusables<RT>,
ParticleEffects,
InterpretationStats,
Arc<Span>,
),
>,
>;
pub struct Actor<RT, F> {
/// Particle of that actor is expired after that deadline
deadline: Deadline,
future: Option<BoxFuture<'static, (Reusables<RT>, ParticleEffects, InterpretationStats)>>,
mailbox: VecDeque<Particle>,
future: ExecutionTask<RT>,
mailbox: VecDeque<ExtendedParticle>,
waker: Option<Waker>,
functions: Functions<F>,
/// Particle that's memoized on the actor creation.
Expand All @@ -54,7 +66,7 @@ pub struct Actor<RT, F> {
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
deal_id: Option<String>,
}

impl<RT, F> Actor<RT, F>
Expand All @@ -67,27 +79,22 @@ where
functions: Functions<F>,
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
deal_id: Option<String>,
) -> Self {
Self {
deadline: Deadline::from(particle),
deadline: Deadline::from(&particle),
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
functions,
future: None,
mailbox: <_>::default(),
waker: None,
// Clone particle without data
particle: Particle {
id: particle.id.clone(),
init_peer_id: particle.init_peer_id,
timestamp: particle.timestamp,
ttl: particle.ttl,
script: particle.script.clone(),
signature: particle.signature.clone(),
data: vec![],
..particle.clone()
},
current_peer_id,
key_pair,
span,
deal_id,
}
}

Expand All @@ -102,14 +109,17 @@ where
pub fn cleanup(&self, vm: &mut RT) {
tracing::debug!(
target: "particle_reap",
particle_id = self.particle.id, worker_id = self.current_peer_id.to_string(),
particle_id = self.particle.id,
worker_id = self.current_peer_id.to_string(),
deal_id = self.deal_id,
"Reaping particle's actor"
);
// TODO: remove dirs without using vm https://github.com/fluencelabs/fluence/issues/1216
// TODO: ??? we don't have this issue anymore
if let Err(err) = vm.cleanup(&self.particle.id, &self.current_peer_id.to_string()) {
tracing::warn!(
particle_id = self.particle.id,
deal_id = self.deal_id,
"Error cleaning up after particle {:?}",
err
);
Expand All @@ -124,7 +134,8 @@ where
self.functions.set_function(function)
}

pub fn ingest(&mut self, particle: Particle) {
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn ingest(&mut self, particle: ExtendedParticle) {
self.mailbox.push_back(particle);
self.wake();
}
Expand All @@ -141,22 +152,29 @@ where
self.functions.poll(cx);

// Poll AquaVM future
if let Some(Ready((reusables, effects, stats))) =
if let Some(Ready((reusables, effects, stats, span))) =
self.future.as_mut().map(|f| f.poll_unpin(cx))
{
let _entered = self.span.enter();

let local_span = tracing::info_span!(parent: span.as_ref(), "Poll AVM future", particle_id= self.particle.id, deal_id = self.deal_id);
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
let _span_guard = local_span.enter();
self.future.take();

let waker = cx.waker().clone();
// Schedule execution of functions
self.functions
.execute(self.particle.id.clone(), effects.call_requests, waker);
self.functions.execute(
self.particle.id.clone(),
effects.call_requests,
waker,
span.clone(),
);

let effects = RoutingEffects {
particle: Particle {
data: effects.new_data,
..self.particle.clone()
particle: ExtendedParticle {
particle: Particle {
data: effects.new_data,
..self.particle.clone()
},
span,
},
next_peers: effects.next_peers,
};
Expand Down Expand Up @@ -185,41 +203,60 @@ where
}

// Gather CallResults
let (calls, stats) = self.functions.drain();
let (calls, stats, spans) = self.functions.drain();

// Take the next particle
let particle = self.mailbox.pop_front();
let ext_particle = self.mailbox.pop_front();

if particle.is_none() && calls.is_empty() {
if ext_particle.is_none() && calls.is_empty() {
debug_assert!(stats.is_empty(), "stats must be empty if calls are empty");
// Nothing to execute, return vm
return ActorPoll::Vm(vm_id, vm);
}

let particle = particle.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});
let particle = ext_particle
.as_ref()
.map(|p| p.particle.clone())
.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});

let waker = cx.waker().clone();
// Take ownership of vm to process particle
let peer_id = self.current_peer_id;
// TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement)
let key_pair = self.key_pair.clone();

let async_span = tracing::info_span!(
"Actor: async AVM process particle & call results",
particle_id = particle.id,
deal_id = self.deal_id
);
if let Some(ext_particle) = ext_particle.as_ref() {
async_span.follows_from(ext_particle.span.as_ref());
}
for span in spans {
async_span.follows_from(span.as_ref());
}
let linking_span = Arc::new(async_span.clone());
gurinderu marked this conversation as resolved.
Show resolved Hide resolved

// TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212
self.future = Some(
async move {
let res = vm
.execute((particle, calls), waker, peer_id, key_pair)
.in_current_span()
.await;

let reusables = Reusables {
vm_id,
vm: res.runtime,
};
(reusables, res.effects, res.stats)
(reusables, res.effects, res.stats, linking_span)
}
.instrument(self.span.clone())
.instrument(async_span)
.boxed(),
);
self.wake();
Expand Down
12 changes: 8 additions & 4 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use std::time::Duration;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{instrument, Instrument};

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};

use crate::aqua_runtime::AquaRuntime;
Expand Down Expand Up @@ -88,6 +88,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
match self.inlet.poll_recv(cx) {
Poll::Ready(Some(Ingest { particle, function })) => {
wake = true;
let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest");
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
let _guard = span.entered();
// set new particle to be executed
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
Expand Down Expand Up @@ -156,12 +158,13 @@ impl AquamarineApi {
}

/// Send particle to the interpreters pool
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn execute(
self,
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
) -> impl Future<Output = Result<(), AquamarineApiError>> {
let particle_id = particle.id.clone();
let particle_id = particle.particle.id.clone();
self.send_command(Ingest { particle, function }, Some(particle_id))
}

Expand Down Expand Up @@ -204,5 +207,6 @@ impl AquamarineApi {
AquamarineDied { particle_id }
})
}
.in_current_span()
}
}
4 changes: 2 additions & 2 deletions aquamarine/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/

use particle_execution::ServiceFunction;
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use std::collections::HashMap;

pub enum Command {
Ingest {
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
},
AddService {
Expand Down
4 changes: 2 additions & 2 deletions aquamarine/src/particle_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use avm_server::CallRequests;
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use std::time::Duration;

use libp2p::PeerId;
Expand Down Expand Up @@ -54,6 +54,6 @@ pub struct InterpretationStats {
/// Instruct to send particle to either virtual or remote peers.
#[derive(Clone, Debug)]
pub struct RoutingEffects {
pub particle: Particle,
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerId>,
}
10 changes: 8 additions & 2 deletions aquamarine/src/particle_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use avm_server::{CallResults, ParticleParameters};
use fluence_keypair::KeyPair;
use futures::{future::BoxFuture, FutureExt};
use humantime::format_duration as pretty;
use tracing::{instrument, Instrument, Span};

use fluence_libp2p::PeerId;
use particle_protocol::Particle;
Expand Down Expand Up @@ -59,21 +60,25 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
type Future = Fut<Option<Self>>;
type Particle = (Particle, CallResults);

#[instrument(level = tracing::Level::INFO, skip_all)]
fn execute(
mut self,
p: Self::Particle,
waker: Waker,
current_peer_id: PeerId,
key_pair: KeyPair,
) -> Self::Future {
let span = Span::current();
let (particle, calls) = p;
let particle_id = particle.id.clone();
let data_len = particle.data.len();
let span = tracing::info_span!("Execute");
let blockng_span =
tracing::info_span!(parent: &span, "Particle executor: blocking AVM call");
let async_span = tracing::info_span!(parent: &span, "Particle executor: async task");
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
let task = tokio::task::Builder::new()
.name(&format!("Particle {}", particle.id))
.spawn_blocking(move || {
span.in_scope(move || {
blockng_span.in_scope(move || {
let now = Instant::now();
tracing::trace!(target: "execution", particle_id = particle.id, "Executing particle");

Expand Down Expand Up @@ -162,6 +167,7 @@ impl<RT: AquaRuntime> ParticleExecutor for RT {
}
}
}
.instrument(async_span)
.boxed()
}
}
Loading
Loading