Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tracing): Add tracing for understand particle processing #1935

Merged
merged 36 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 83 additions & 37 deletions aquamarine/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,19 @@
* limitations under the License.
*/

use futures::future::BoxFuture;
use futures::FutureExt;
use std::sync::Arc;
use std::task::Context;
use std::{
collections::VecDeque,
task::{Poll, Waker},
task::{Context, Poll, Waker},
};
use tracing::{instrument, Instrument, Span};

use fluence_keypair::KeyPair;
use futures::future::BoxFuture;
use futures::FutureExt;
use tracing::{Instrument, Span};

use fluence_libp2p::PeerId;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::{ExtendedParticle, Particle};

use crate::deadline::Deadline;
use crate::particle_effects::RoutingEffects;
Expand All @@ -42,12 +40,20 @@ struct Reusables<RT> {
}

type AVMCallResult<RT> = FutResult<(usize, Option<RT>), RoutingEffects, InterpretationStats>;
type AVMTask<RT> = BoxFuture<'static, (Reusables<RT>, ParticleEffects, InterpretationStats)>;
type AVMTask<RT> = BoxFuture<
'static,
(
Reusables<RT>,
ParticleEffects,
InterpretationStats,
Arc<Span>,
),
>;
pub struct Actor<RT, F> {
/// Particle of that actor is expired after that deadline
deadline: Deadline,
future: Option<AVMTask<RT>>,
mailbox: VecDeque<Particle>,
mailbox: VecDeque<ExtendedParticle>,
waker: Option<Waker>,
functions: Functions<F>,
/// Particle that's memoized on the actor creation.
Expand All @@ -58,8 +64,8 @@ pub struct Actor<RT, F> {
/// It's either `host_peer_id` or local worker peer id
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
}

impl<RT, F> Actor<RT, F>
Expand All @@ -72,9 +78,10 @@ where
functions: Functions<F>,
current_peer_id: PeerId,
key_pair: KeyPair,
span: Span,
data_store: Arc<ParticleDataStore>,
deal_id: Option<String>,
) -> Self {
let particle = particle;
Self {
deadline: Deadline::from(particle),
functions,
Expand All @@ -83,18 +90,13 @@ where
waker: None,
// Clone particle without data
particle: Particle {
id: particle.id.clone(),
init_peer_id: particle.init_peer_id,
timestamp: particle.timestamp,
ttl: particle.ttl,
script: particle.script.clone(),
signature: particle.signature.clone(),
data: vec![],
..particle.clone()
},
current_peer_id,
key_pair,
span,
data_store,
deal_id,
}
}

Expand All @@ -119,7 +121,8 @@ where
self.functions.set_function(function)
}

pub fn ingest(&mut self, particle: Particle) {
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn ingest(&mut self, particle: ExtendedParticle) {
self.mailbox.push_back(particle);
self.wake();
}
Expand All @@ -143,21 +146,34 @@ where

fn poll_avm_future(&mut self, cx: &mut Context<'_>) -> Option<Poll<AVMCallResult<RT>>> {
if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) {
let (reusables, effects, stats) = res;
let _entered = self.span.enter();
let (reusables, effects, stats, parent_span) = res;
let span = tracing::info_span!(
parent: parent_span.as_ref(),
"Actor::poll_avm_future::future_ready",
particle_id= self.particle.id,
deal_id = self.deal_id
);
let _span_guard = span.enter();

self.future.take();

let waker = cx.waker().clone();
// Schedule execution of functions
self.functions
.execute(self.particle.id.clone(), effects.call_requests, waker);
self.functions.execute(
self.particle.id.clone(),
effects.call_requests,
waker,
parent_span.clone(),
);

let effects = RoutingEffects {
particle: Particle {
data: effects.new_data,
..self.particle.clone()
},
particle: ExtendedParticle::linked(
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
Particle {
data: effects.new_data,
..self.particle.clone()
},
parent_span,
),
next_peers: effects.next_peers,
};
return Some(Poll::Ready(FutResult {
Expand All @@ -184,30 +200,39 @@ where
}

// Gather CallResults
let (calls, stats) = self.functions.drain();
let (calls, stats, call_spans) = self.functions.drain();

// Take the next particle
let particle = self.mailbox.pop_front();
let ext_particle = self.mailbox.pop_front();

if particle.is_none() && calls.is_empty() {
if ext_particle.is_none() && calls.is_empty() {
debug_assert!(stats.is_empty(), "stats must be empty if calls are empty");
// Nothing to execute, return vm
return ActorPoll::Vm(vm_id, vm);
}

let particle = particle.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});
let particle = ext_particle
.as_ref()
.map(|p| p.particle.clone())
.unwrap_or_else(|| {
// If mailbox is empty, then take self.particle.
// Its data is empty, so `vm` will process `calls` on the old (saved on disk) data
self.particle.clone()
});

let waker = cx.waker().clone();
let data_store = self.data_store.clone();
let key_pair = self.key_pair.clone();
let peer_id = self.current_peer_id;

let (async_span, linking_span) =
self.create_spans(call_spans, ext_particle, particle.id.as_str());

self.future = Some(
async move {
let res = vm
.execute(data_store, (particle.clone(), calls), peer_id, key_pair)
.in_current_span()
.await;

waker.wake();
Expand All @@ -217,15 +242,36 @@ where
vm: res.runtime,
};

(reusables, res.effects, res.stats)
(reusables, res.effects, res.stats, linking_span)
}
.instrument(self.span.clone())
.instrument(async_span)
.boxed(),
);
self.wake();

ActorPoll::Executing(stats)
}

fn create_spans(
&self,
call_spans: Vec<Arc<Span>>,
ext_particle: Option<ExtendedParticle>,
particle_id: &str,
) -> (Span, Arc<Span>) {
let async_span = tracing::info_span!(
"Actor: async AVM process particle & call results",
particle_id = particle_id,
deal_id = self.deal_id
);
if let Some(ext_particle) = ext_particle.as_ref() {
async_span.follows_from(ext_particle.span.as_ref());
}
for span in call_spans {
async_span.follows_from(span.as_ref());
}
let linking_span = Arc::new(async_span.clone());
(async_span, linking_span)
}
fn wake(&self) {
if let Some(waker) = &self.waker {
waker.wake_by_ref();
Expand Down
12 changes: 8 additions & 4 deletions aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use std::time::Duration;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{instrument, Instrument};

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics};

use crate::aqua_runtime::AquaRuntime;
Expand Down Expand Up @@ -105,6 +105,8 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
match self.inlet.poll_recv(cx) {
Poll::Ready(Some(Ingest { particle, function })) => {
wake = true;
let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine::poll::ingest");
let _guard = span.entered();
// set new particle to be executed
// every particle that comes from the connection pool first executed on the host peer id
self.plumber.ingest(particle, function, self.host_peer_id);
Expand Down Expand Up @@ -179,12 +181,13 @@ impl AquamarineApi {
}

/// Send particle to the interpreters pool
#[instrument(level = tracing::Level::INFO, skip_all)]
pub fn execute(
self,
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
) -> impl Future<Output = Result<(), AquamarineApiError>> {
let particle_id = particle.id.clone();
let particle_id = particle.particle.id.clone();
self.send_command(Ingest { particle, function }, Some(particle_id))
}

Expand Down Expand Up @@ -227,5 +230,6 @@ impl AquamarineApi {
AquamarineDied { particle_id }
})
}
.in_current_span()
}
}
7 changes: 4 additions & 3 deletions aquamarine/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* limitations under the License.
*/

use particle_execution::ServiceFunction;
use particle_protocol::Particle;
use std::collections::HashMap;

use particle_execution::ServiceFunction;
use particle_protocol::ExtendedParticle;

pub enum Command {
Ingest {
particle: Particle,
particle: ExtendedParticle,
function: Option<ServiceFunction>,
},
AddService {
Expand Down
4 changes: 4 additions & 0 deletions aquamarine/src/particle_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use avm_server::avm_runner::RawAVMOutcome;
use avm_server::{AnomalyData, CallResults, ParticleParameters};
use fluence_libp2p::PeerId;
use thiserror::Error;
use tracing::instrument;

use crate::DataStoreError::SerializeAnomaly;
use now_millis::now_ms;
Expand Down Expand Up @@ -82,6 +83,7 @@ impl ParticleDataStore {
Ok(())
}

#[instrument(level = tracing::Level::INFO, skip_all)]
pub async fn store_data(
&self,
data: &[u8],
Expand All @@ -97,6 +99,7 @@ impl ParticleDataStore {
Ok(())
}

#[instrument(level = tracing::Level::INFO)]
pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result<Vec<u8>> {
let data_path = self.data_file(particle_id, current_peer_id);
let data = tokio::fs::read(&data_path).await.unwrap_or_default();
Expand Down Expand Up @@ -151,6 +154,7 @@ impl ParticleDataStore {
}

#[allow(clippy::too_many_arguments)]
#[instrument(level = tracing::Level::INFO, skip_all)]
pub async fn save_anomaly_data(
&self,
air_script: &str,
Expand Down
4 changes: 2 additions & 2 deletions aquamarine/src/particle_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

use avm_server::CallRequests;
use particle_protocol::Particle;
use particle_protocol::ExtendedParticle;
use std::time::Duration;

use libp2p::PeerId;
Expand Down Expand Up @@ -65,6 +65,6 @@ impl InterpretationStats {
/// Instruct to send particle to either virtual or remote peers.
#[derive(Clone, Debug)]
pub struct RoutingEffects {
pub particle: Particle,
pub particle: ExtendedParticle,
pub next_peers: Vec<PeerId>,
}
Loading
Loading