diff --git a/Cargo.lock b/Cargo.lock index b548688c2e..26bbe3ca42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,6 +4832,7 @@ dependencies = [ "system-services", "tokio", "tokio-stream", + "tonic 0.9.2", "tracing", "tracing-log", "tracing-logfmt", @@ -5338,6 +5339,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "tracing", "unsigned-varint 0.8.0", ] diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 10ef379e00..31b46e519f 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,21 +14,19 @@ * limitations under the License. */ +use futures::future::BoxFuture; +use futures::FutureExt; use std::sync::Arc; -use std::task::Context; use std::{ collections::VecDeque, - task::{Poll, Waker}, + task::{Context, Poll, Waker}, }; +use tracing::{instrument, Instrument, Span}; use fluence_keypair::KeyPair; -use futures::future::BoxFuture; -use futures::FutureExt; -use tracing::{Instrument, Span}; - use fluence_libp2p::PeerId; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use crate::deadline::Deadline; use crate::particle_effects::RoutingEffects; @@ -42,12 +40,20 @@ struct Reusables { } type AVMCallResult = FutResult<(usize, Option), RoutingEffects, InterpretationStats>; -type AVMTask = BoxFuture<'static, (Reusables, ParticleEffects, InterpretationStats)>; +type AVMTask = BoxFuture< + 'static, + ( + Reusables, + ParticleEffects, + InterpretationStats, + Arc, + ), +>; pub struct Actor { /// Particle of that actor is expired after that deadline deadline: Deadline, future: Option>, - mailbox: VecDeque, + mailbox: VecDeque, waker: Option, functions: Functions, /// Particle that's memoized on the actor creation. @@ -58,8 +64,8 @@ pub struct Actor { /// It's either `host_peer_id` or local worker peer id current_peer_id: PeerId, key_pair: KeyPair, - span: Span, data_store: Arc, + deal_id: Option, } impl Actor @@ -72,9 +78,10 @@ where functions: Functions, current_peer_id: PeerId, key_pair: KeyPair, - span: Span, data_store: Arc, + deal_id: Option, ) -> Self { + let particle = particle; Self { deadline: Deadline::from(particle), functions, @@ -83,18 +90,13 @@ where waker: None, // Clone particle without data particle: Particle { - id: particle.id.clone(), - init_peer_id: particle.init_peer_id, - timestamp: particle.timestamp, - ttl: particle.ttl, - script: particle.script.clone(), - signature: particle.signature.clone(), data: vec![], + ..particle.clone() }, current_peer_id, key_pair, - span, data_store, + deal_id, } } @@ -119,7 +121,8 @@ where self.functions.set_function(function) } - pub fn ingest(&mut self, particle: Particle) { + #[instrument(level = tracing::Level::INFO, skip_all)] + pub fn ingest(&mut self, particle: ExtendedParticle) { self.mailbox.push_back(particle); self.wake(); } @@ -143,21 +146,34 @@ where fn poll_avm_future(&mut self, cx: &mut Context<'_>) -> Option>> { if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let (reusables, effects, stats) = res; - let _entered = self.span.enter(); + let (reusables, effects, stats, parent_span) = res; + let span = tracing::info_span!( + parent: parent_span.as_ref(), + "Actor::poll_avm_future::future_ready", + particle_id= self.particle.id, + deal_id = self.deal_id + ); + let _span_guard = span.enter(); self.future.take(); let waker = cx.waker().clone(); // Schedule execution of functions - self.functions - .execute(self.particle.id.clone(), effects.call_requests, waker); + self.functions.execute( + self.particle.id.clone(), + effects.call_requests, + waker, + parent_span.clone(), + ); let effects = RoutingEffects { - particle: Particle { - data: effects.new_data, - ..self.particle.clone() - }, + particle: ExtendedParticle::linked( + Particle { + data: effects.new_data, + ..self.particle.clone() + }, + parent_span, + ), next_peers: effects.next_peers, }; return Some(Poll::Ready(FutResult { @@ -184,30 +200,39 @@ where } // Gather CallResults - let (calls, stats) = self.functions.drain(); + let (calls, stats, call_spans) = self.functions.drain(); // Take the next particle - let particle = self.mailbox.pop_front(); + let ext_particle = self.mailbox.pop_front(); - if particle.is_none() && calls.is_empty() { + if ext_particle.is_none() && calls.is_empty() { debug_assert!(stats.is_empty(), "stats must be empty if calls are empty"); // Nothing to execute, return vm return ActorPoll::Vm(vm_id, vm); } - let particle = particle.unwrap_or_else(|| { - // If mailbox is empty, then take self.particle. - // Its data is empty, so `vm` will process `calls` on the old (saved on disk) data - self.particle.clone() - }); + let particle = ext_particle + .as_ref() + .map(|p| p.particle.clone()) + .unwrap_or_else(|| { + // If mailbox is empty, then take self.particle. + // Its data is empty, so `vm` will process `calls` on the old (saved on disk) data + self.particle.clone() + }); + let waker = cx.waker().clone(); let data_store = self.data_store.clone(); let key_pair = self.key_pair.clone(); let peer_id = self.current_peer_id; + + let (async_span, linking_span) = + self.create_spans(call_spans, ext_particle, particle.id.as_str()); + self.future = Some( async move { let res = vm .execute(data_store, (particle.clone(), calls), peer_id, key_pair) + .in_current_span() .await; waker.wake(); @@ -217,15 +242,36 @@ where vm: res.runtime, }; - (reusables, res.effects, res.stats) + (reusables, res.effects, res.stats, linking_span) } - .instrument(self.span.clone()) + .instrument(async_span) .boxed(), ); self.wake(); ActorPoll::Executing(stats) } + + fn create_spans( + &self, + call_spans: Vec>, + ext_particle: Option, + particle_id: &str, + ) -> (Span, Arc) { + let async_span = tracing::info_span!( + "Actor: async AVM process particle & call results", + particle_id = particle_id, + deal_id = self.deal_id + ); + if let Some(ext_particle) = ext_particle.as_ref() { + async_span.follows_from(ext_particle.span.as_ref()); + } + for span in call_spans { + async_span.follows_from(span.as_ref()); + } + let linking_span = Arc::new(async_span.clone()); + (async_span, linking_span) + } fn wake(&self) { if let Some(waker) = &self.waker { waker.wake_by_ref(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index c4409f0a5d..7c9322a593 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -22,13 +22,13 @@ use std::time::Duration; use futures::StreamExt; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::Instrument; +use tracing::{instrument, Instrument}; use fluence_libp2p::PeerId; use health::HealthCheckRegistry; use key_manager::KeyManager; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics}; use crate::aqua_runtime::AquaRuntime; @@ -105,6 +105,8 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; + let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine::poll::ingest"); + let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id self.plumber.ingest(particle, function, self.host_peer_id); @@ -179,12 +181,13 @@ impl AquamarineApi { } /// Send particle to the interpreters pool + #[instrument(level = tracing::Level::INFO, skip_all)] pub fn execute( self, - particle: Particle, + particle: ExtendedParticle, function: Option, ) -> impl Future> { - let particle_id = particle.id.clone(); + let particle_id = particle.particle.id.clone(); self.send_command(Ingest { particle, function }, Some(particle_id)) } @@ -227,5 +230,6 @@ impl AquamarineApi { AquamarineDied { particle_id } }) } + .in_current_span() } } diff --git a/aquamarine/src/command.rs b/aquamarine/src/command.rs index d59b98a5ba..7a54372896 100644 --- a/aquamarine/src/command.rs +++ b/aquamarine/src/command.rs @@ -14,13 +14,14 @@ * limitations under the License. */ -use particle_execution::ServiceFunction; -use particle_protocol::Particle; use std::collections::HashMap; +use particle_execution::ServiceFunction; +use particle_protocol::ExtendedParticle; + pub enum Command { Ingest { - particle: Particle, + particle: ExtendedParticle, function: Option, }, AddService { diff --git a/aquamarine/src/particle_data_store.rs b/aquamarine/src/particle_data_store.rs index 886bbe6555..cb420b0bfc 100644 --- a/aquamarine/src/particle_data_store.rs +++ b/aquamarine/src/particle_data_store.rs @@ -23,6 +23,7 @@ use avm_server::avm_runner::RawAVMOutcome; use avm_server::{AnomalyData, CallResults, ParticleParameters}; use fluence_libp2p::PeerId; use thiserror::Error; +use tracing::instrument; use crate::DataStoreError::SerializeAnomaly; use now_millis::now_ms; @@ -82,6 +83,7 @@ impl ParticleDataStore { Ok(()) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn store_data( &self, data: &[u8], @@ -97,6 +99,7 @@ impl ParticleDataStore { Ok(()) } + #[instrument(level = tracing::Level::INFO)] pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result> { let data_path = self.data_file(particle_id, current_peer_id); let data = tokio::fs::read(&data_path).await.unwrap_or_default(); @@ -151,6 +154,7 @@ impl ParticleDataStore { } #[allow(clippy::too_many_arguments)] + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn save_anomaly_data( &self, air_script: &str, diff --git a/aquamarine/src/particle_effects.rs b/aquamarine/src/particle_effects.rs index 1a4307055a..f342e5ebd7 100644 --- a/aquamarine/src/particle_effects.rs +++ b/aquamarine/src/particle_effects.rs @@ -15,7 +15,7 @@ */ use avm_server::CallRequests; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use std::time::Duration; use libp2p::PeerId; @@ -65,6 +65,6 @@ impl InterpretationStats { /// Instruct to send particle to either virtual or remote peers. #[derive(Clone, Debug)] pub struct RoutingEffects { - pub particle: Particle, + pub particle: ExtendedParticle, pub next_peers: Vec, } diff --git a/aquamarine/src/particle_executor.rs b/aquamarine/src/particle_executor.rs index 50052c8663..f59bc536cc 100644 --- a/aquamarine/src/particle_executor.rs +++ b/aquamarine/src/particle_executor.rs @@ -23,6 +23,7 @@ use avm_server::avm_runner::RawAVMOutcome; use avm_server::{CallResults, ParticleParameters}; use fluence_keypair::KeyPair; use tokio::task::JoinError; +use tracing::instrument; use fluence_libp2p::PeerId; use particle_protocol::Particle; @@ -69,6 +70,7 @@ impl ParticleExecutor for RT { type Output = AVMRes; type Particle = (Particle, CallResults); + #[instrument(level = tracing::Level::INFO, skip_all)] async fn execute( mut self, data_store: Arc, @@ -105,6 +107,7 @@ impl ParticleExecutor for RT { } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn execute_with_prev_data( vm: RT, data_store: Arc, @@ -150,6 +153,7 @@ async fn execute_with_prev_data( } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn process_avm_result( data_store: Arc, current_peer_id: PeerId, @@ -228,6 +232,7 @@ where } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn avm_call<'a, RT: AquaRuntime>( mut vm: RT, current_peer_id: PeerId, diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 26359ae4e0..1e57d910aa 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -27,7 +27,7 @@ use humantime::format_duration as pretty; use serde_json::json; use serde_json::Value as JValue; use tokio::runtime::Handle; -use tracing::Instrument; +use tracing::{instrument, Instrument, Span}; use particle_args::{Args, JError}; use particle_execution::{ @@ -55,6 +55,7 @@ pub struct SingleCallResult { call_id: u32, result: CallServiceResult, stat: SingleCallStat, + span: Arc, } pub struct Functions { @@ -63,6 +64,7 @@ pub struct Functions { function_calls: FuturesUnordered>, call_results: CallResults, call_stats: Vec, + call_spans: Vec>, particle_function: Option>>, } @@ -74,6 +76,7 @@ impl Functions { function_calls: <_>::default(), call_results: <_>::default(), call_stats: <_>::default(), + call_spans: <_>::default(), particle_function: None, } } @@ -83,6 +86,7 @@ impl Functions { while let Poll::Ready(Some(r)) = self.function_calls.poll_next_unpin(cx) { let overwritten = self.call_results.insert(r.call_id, r.result); self.call_stats.push(r.stat); + self.call_spans.push(r.span); debug_assert!( overwritten.is_none(), @@ -93,20 +97,28 @@ impl Functions { } /// Add a bunch of call requests to execution - pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker) { + #[instrument(level = tracing::Level::INFO, skip_all)] + pub fn execute( + &mut self, + particle_id: String, + requests: CallRequests, + waker: Waker, + span: Arc, + ) { let futs: Vec<_> = requests .into_iter() - .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone())) + .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone(), span.clone())) .collect(); self.function_calls.extend(futs); } /// Retrieve all existing call results - pub fn drain(&mut self) -> (CallResults, Vec) { + pub fn drain(&mut self) -> (CallResults, Vec, Vec>) { let call_results = std::mem::take(&mut self.call_results); let stats = std::mem::take(&mut self.call_stats); + let call_spans = std::mem::take(&mut self.call_spans); - (call_results, stats) + (call_results, stats, call_spans) } pub fn set_function(&mut self, function: ServiceFunction) { @@ -120,34 +132,42 @@ impl Functions { // I see the main obstacle to cooperation in streaming results to `self.call_results`. // Streaming can be done through an MPSC channel, but it seems like an overkill. Though // maybe it's a good option. + #[instrument(level = tracing::Level::INFO, skip_all)] fn call( &self, particle_id: String, call_id: u32, call: CallRequestParams, waker: Waker, + span: Arc, ) -> BoxFuture<'static, SingleCallResult> { + let async_span = + tracing::info_span!(parent: span.as_ref(), "ParticleFunctions::call::async"); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, Err(err) => { - let result = CallServiceResult { - ret_code: 1, - result: json!(format!( - "Failed to deserialize CallRequestParams to Args: {err}" - )), - }; - let result = SingleCallResult { - call_id, - result, - stat: SingleCallStat { - call_time: None, - wait_time: None, - success: false, - kind: FunctionKind::NotHappened, - }, - }; - return async move { result }.boxed(); + return async move { + let result = CallServiceResult { + ret_code: 1, + result: json!(format!( + "Failed to deserialize CallRequestParams to Args: {err}" + )), + }; + SingleCallResult { + call_id, + result, + stat: SingleCallStat { + call_time: None, + wait_time: None, + success: false, + kind: FunctionKind::NotHappened, + }, + span, + } + } + .instrument(async_span) + .boxed(); } }; @@ -162,7 +182,6 @@ impl Functions { let params = self.particle.clone(); let builtins = self.builtins.clone(); let particle_function = self.particle_function.clone(); - let span = tracing::span!(tracing::Level::INFO, "Function"); let schedule_wait_start = Instant::now(); let result = tokio::task::Builder::new() .name(&format!( @@ -250,9 +269,10 @@ impl Functions { call_id, result, stat: stats, + span, } } - .instrument(span) + .in_current_span() .boxed() } } diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 8839dbb44d..9900605b6d 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -26,6 +26,7 @@ use std::{ use futures::task::Waker; use tokio::task; +use tracing::instrument; use fluence_libp2p::PeerId; use key_manager::KeyManager; @@ -33,7 +34,7 @@ use key_manager::KeyManager; #[cfg(test)] use mock_time::now_ms; use particle_execution::{ParticleFunctionStatic, ParticleParams, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use peer_metrics::ParticleExecutorMetrics; /// Get current time from OS #[cfg(not(test))] @@ -86,56 +87,65 @@ impl Plumber { } /// Receives and ingests incoming particle: creates a new actor or forwards to the existing mailbox + #[instrument(level = tracing::Level::INFO, skip_all)] pub fn ingest( &mut self, - particle: Particle, + particle: ExtendedParticle, function: Option, worker_id: PeerId, ) { self.wake(); - let deadline = Deadline::from(&particle); + let deadline = Deadline::from(particle.as_ref()); if deadline.is_expired(now_ms()) { - tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); + tracing::info!(target: "expired", particle_id = particle.particle.id, "Particle is expired"); self.events .push_back(Err(AquamarineApiError::ParticleExpired { - particle_id: particle.id, + particle_id: particle.particle.id, })); return; } - if let Err(err) = particle.verify() { - tracing::warn!(target: "signature", particle_id = particle.id, "Particle signature verification failed: {err:?}"); + if let Err(err) = particle.particle.verify() { + tracing::warn!(target: "signature", particle_id = particle.particle.id, "Particle signature verification failed: {err:?}"); self.events .push_back(Err(AquamarineApiError::SignatureVerificationFailed { - particle_id: particle.id, + particle_id: particle.particle.id, err, })); return; } if !self.key_manager.is_worker_active(worker_id) - && !self.key_manager.is_management(particle.init_peer_id) + && !self + .key_manager + .is_management(particle.particle.init_peer_id) { - tracing::trace!(target: "worker_inactive", particle_id = particle.id, worker_id = worker_id.to_string(), "Worker is not active"); + tracing::trace!(target: "worker_inactive", particle_id = particle.particle.id, worker_id = worker_id.to_string(), "Worker is not active"); return; } let builtins = &self.builtins; - let key = (ParticleId(particle.signature.clone()), worker_id); + let key = (ParticleId(particle.particle.signature.clone()), worker_id); let entry = self.actors.entry(key); let actor = match entry { Entry::Occupied(actor) => Ok(actor.into_mut()), Entry::Vacant(entry) => { - let params = ParticleParams::clone_from(&particle, worker_id); + let params = ParticleParams::clone_from(particle.as_ref(), worker_id); let functions = Functions::new(params, builtins.clone()); let key_pair = self.key_manager.get_worker_keypair(worker_id); let deal_id = self.key_manager.get_deal_id(worker_id).ok(); let data_store = self.data_store.clone(); key_pair.map(|kp| { - let span = tracing::info_span!("Actor", deal_id = deal_id); - let actor = Actor::new(&particle, functions, worker_id, kp, span, data_store); + let actor = Actor::new( + particle.as_ref(), + functions, + worker_id, + kp, + data_store, + deal_id, + ); entry.insert(actor) }) } @@ -153,7 +163,7 @@ impl Plumber { Err(err) => log::warn!( "No such worker {}, rejected particle {}: {:?}", worker_id, - particle.id, + particle.particle.id, err ), } @@ -309,6 +319,8 @@ impl Plumber { for effect in local_effects { for local_peer in effect.next_peers { + let span = tracing::info_span!(parent: effect.particle.span.as_ref(), "Plumber: routing effect ingest"); + let _guard = span.enter(); self.ingest(effect.particle.clone(), None, local_peer); } } @@ -358,7 +370,7 @@ mod tests { use particle_args::Args; use particle_execution::{FunctionOutcome, ParticleFunction, ParticleParams, ServiceFunction}; - use particle_protocol::Particle; + use particle_protocol::{ExtendedParticle, Particle}; use crate::deadline::Deadline; use crate::plumber::mock_time::set_mock_time; @@ -368,6 +380,7 @@ mod tests { use crate::{AquaRuntime, ParticleDataStore, ParticleEffects, Plumber}; use async_trait::async_trait; use avm_server::avm_runner::RawAVMOutcome; + use tracing::Span; struct MockF; @@ -489,7 +502,11 @@ mod tests { let deadline = Deadline::from(&particle); assert!(!deadline.is_expired(now_ms())); - plumber.ingest(particle, None, RandomPeerId::random()); + plumber.ingest( + ExtendedParticle::new(particle, Span::none()), + None, + RandomPeerId::random(), + ); assert_eq!(plumber.actors.len(), 1); let mut cx = context(); @@ -522,7 +539,11 @@ mod tests { let deadline = Deadline::from(&particle); assert!(deadline.is_expired(now_ms())); - plumber.ingest(particle.clone(), None, RandomPeerId::random()); + plumber.ingest( + ExtendedParticle::new(particle.clone(), Span::none()), + None, + RandomPeerId::random(), + ); assert_eq!(plumber.actors.len(), 0); diff --git a/connection-pool/src/api.rs b/connection-pool/src/api.rs index b5759a056a..79fce9a952 100644 --- a/connection-pool/src/api.rs +++ b/connection-pool/src/api.rs @@ -21,7 +21,7 @@ use libp2p::{core::Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use particle_protocol::{Contact, SendStatus}; use crate::connection_pool::LifecycleEvent; @@ -36,7 +36,7 @@ pub enum Command { }, Send { to: Contact, - particle: Particle, + particle: ExtendedParticle, out: oneshot::Sender, }, Dial { @@ -111,7 +111,7 @@ impl ConnectionPoolT for ConnectionPoolApi { self.execute(|out| Command::GetContact { peer_id, out }) } - fn send(&self, to: Contact, particle: Particle) -> BoxFuture<'static, SendStatus> { + fn send(&self, to: Contact, particle: ExtendedParticle) -> BoxFuture<'static, SendStatus> { let fut = self.execute(|out| Command::Send { to, particle, out }); // timeout on send is required because libp2p can silently drop outbound events let timeout = self.send_timeout; diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 3c5b4cde30..5301193034 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -36,15 +36,14 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSender; +use crate::connection_pool::LifecycleEvent; +use crate::{Command, ConnectionPoolApi}; use fluence_libp2p::remote_multiaddr; use particle_protocol::{ - CompletionChannel, Contact, HandlerMessage, Particle, ProtocolConfig, SendStatus, + CompletionChannel, Contact, ExtendedParticle, HandlerMessage, ProtocolConfig, SendStatus, }; use peer_metrics::ConnectionPoolMetrics; -use crate::connection_pool::LifecycleEvent; -use crate::{Command, ConnectionPoolApi}; - // type SwarmEventType = generate_swarm_event_type!(ConnectionPoolBehaviour); // TODO: replace with generate_swarm_event_type @@ -103,10 +102,10 @@ pub struct ConnectionPoolBehaviour { commands: UnboundedReceiverStream, - outlet: PollSender, + outlet: PollSender, subscribers: Vec>, - queue: VecDeque, + queue: VecDeque, contacts: HashMap, dialing: HashMap>>>, @@ -211,23 +210,40 @@ impl ConnectionPoolBehaviour { /// Sends a particle to a connected contact. Returns whether sending succeeded or not /// Result is sent to channel inside `upgrade_outbound` in ProtocolHandler - pub fn send(&mut self, to: Contact, particle: Particle, outlet: oneshot::Sender) { + pub fn send( + &mut self, + to: Contact, + particle: ExtendedParticle, + outlet: oneshot::Sender, + ) { + let span = + tracing::info_span!(parent: particle.span.as_ref(), "ConnectionPool::Behaviour::send"); + let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally self.queue.push_back(particle); outlet.send(SendStatus::Ok).ok(); self.wake(); } else if self.contacts.contains_key(&to.peer_id) { - tracing::debug!(target: "network",particle_id = particle.id , "{}: Sending particle to {}", self.peer_id, to.peer_id); + tracing::debug!( + target: "network", + particle_id = particle.particle.id , + "{}: Sending particle to {}", + self.peer_id, + to.peer_id + ); // Send particle to remote peer self.push_event(ToSwarm::NotifyHandler { peer_id: to.peer_id, handler: NotifyHandler::Any, - event: HandlerMessage::OutParticle(particle, CompletionChannel::Oneshot(outlet)), + event: HandlerMessage::OutParticle( + particle.particle, + CompletionChannel::Oneshot(outlet), + ), }); } else { tracing::warn!( - particle_id = particle.id, + particle_id = particle.particle.id, "Won't send particle to contact {}: not connected", to.peer_id ); @@ -264,7 +280,7 @@ impl ConnectionPoolBehaviour { protocol_config: ProtocolConfig, peer_id: PeerId, metrics: Option, - ) -> (Self, mpsc::Receiver, ConnectionPoolApi) { + ) -> (Self, mpsc::Receiver, ConnectionPoolApi) { let (outlet, inlet) = mpsc::channel(buffer); let outlet = PollSender::new(outlet); let (command_outlet, command_inlet) = mpsc::unbounded_channel(); @@ -589,7 +605,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { FromSwarm::NewExternalAddrCandidate(_) => {} FromSwarm::ExternalAddrConfirmed(_) => {} FromSwarm::ExternalAddrExpired(_) => {} - _ => {} + e => { + tracing::warn!("Unexpected event {:?}", e); + #[cfg(test)] + panic!("Unexpected event") + } } } @@ -602,6 +622,8 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { match event { Ok(HandlerMessage::InParticle(particle)) => { tracing::info!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len()); + let root_span = tracing::info_span!("Particle", particle_id = particle.id); + self.meter(|m| { m.incoming_particle( &particle.id, @@ -609,7 +631,8 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { particle.data.len() as f64, ) }); - self.queue.push_back(particle); + self.queue + .push_back(ExtendedParticle::new(particle, root_span)); self.wake(); } Ok(HandlerMessage::InboundUpgradeError(err)) => log::warn!("UpgradeError: {:?}", err), @@ -629,7 +652,8 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { Poll::Ready(Ok(_)) => { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { - let particle_id = particle.id.clone(); + let particle_id = particle.particle.id.clone(); + if let Err(err) = outlet.start_send(particle) { tracing::error!( particle_id = particle_id, diff --git a/connection-pool/src/connection_pool.rs b/connection-pool/src/connection_pool.rs index 5ee0597974..6409b9b7a0 100644 --- a/connection-pool/src/connection_pool.rs +++ b/connection-pool/src/connection_pool.rs @@ -19,7 +19,7 @@ use std::fmt::{Display, Formatter}; use futures::{future::BoxFuture, stream::BoxStream}; use libp2p::{core::Multiaddr, PeerId}; -use particle_protocol::{Contact, Particle, SendStatus}; +use particle_protocol::{Contact, ExtendedParticle, SendStatus}; #[derive(Debug, Clone)] pub enum LifecycleEvent { @@ -42,7 +42,7 @@ pub trait ConnectionPoolT { fn disconnect(&self, peer_id: PeerId) -> BoxFuture<'static, bool>; fn is_connected(&self, peer_id: PeerId) -> BoxFuture<'static, bool>; fn get_contact(&self, peer_id: PeerId) -> BoxFuture<'static, Option>; - fn send(&self, to: Contact, particle: Particle) -> BoxFuture<'static, SendStatus>; + fn send(&self, to: Contact, particle: ExtendedParticle) -> BoxFuture<'static, SendStatus>; fn count_connections(&self) -> BoxFuture<'static, usize>; fn lifecycle_events(&self) -> BoxStream<'static, LifecycleEvent>; } diff --git a/crates/nox-tests/tests/client_api.rs b/crates/nox-tests/tests/client_api.rs index 220a440f01..aa358ae698 100644 --- a/crates/nox-tests/tests/client_api.rs +++ b/crates/nox-tests/tests/client_api.rs @@ -21,11 +21,12 @@ use futures::future::BoxFuture; use futures::FutureExt; use maplit::hashmap; use serde_json::json; +use tracing::Span; use created_swarm::make_swarms; use now_millis::now_ms; use particle_execution::FunctionOutcome; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use test_constants::PARTICLE_TTL; use test_utils::timeout; use uuid_utils::uuid; @@ -93,7 +94,10 @@ async fn call_custom_service() { data: vec![], }; - let exec_f = swarms[1].aquamarine_api.clone().execute(particle, None); + let exec_f = swarms[1] + .aquamarine_api + .clone() + .execute(ExtendedParticle::new(particle, Span::none()), None); let result = timeout(Duration::from_secs(30), async move { add_first_f.await.expect("add_first_f"); diff --git a/crates/system-services/src/lib.rs b/crates/system-services/src/lib.rs index 2da32a1728..d5b545131b 100644 --- a/crates/system-services/src/lib.rs +++ b/crates/system-services/src/lib.rs @@ -55,7 +55,7 @@ pub struct PackageDistro { pub init: Option>, } -impl std::fmt::Debug for PackageDistro { +impl fmt::Debug for PackageDistro { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PackageDistro") .field("name", &self.name) diff --git a/nox/Cargo.toml b/nox/Cargo.toml index c5e5bef586..13172636b3 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -67,6 +67,7 @@ opentelemetry-otlp = "0.14.0" opentelemetry-stdout = { version = "0.2.0", features = ["trace"] } once_cell = { workspace = true } config = "0.13.4" +tonic = "0.9.2" [dev-dependencies] parking_lot = { workspace = true } diff --git a/nox/src/behaviour/network.rs b/nox/src/behaviour/network.rs index ff0aeb4ba3..ea0faae2f9 100644 --- a/nox/src/behaviour/network.rs +++ b/nox/src/behaviour/network.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; use connection_pool::ConnectionPoolBehaviour; use health::HealthCheckRegistry; use kademlia::{Kademlia, KademliaConfig}; -use particle_protocol::{Particle, PROTOCOL_NAME}; +use particle_protocol::{ExtendedParticle, PROTOCOL_NAME}; use server_config::NetworkConfig; use crate::connectivity::Connectivity; @@ -45,7 +45,7 @@ impl FluenceNetworkBehaviour { pub fn new( cfg: NetworkConfig, health_registry: Option<&mut HealthCheckRegistry>, - ) -> (Self, Connectivity, mpsc::Receiver) { + ) -> (Self, Connectivity, mpsc::Receiver) { let local_public_key = cfg.key_pair.public(); let identify = Identify::new( IdentifyConfig::new(PROTOCOL_NAME.into(), local_public_key) diff --git a/nox/src/connectivity.rs b/nox/src/connectivity.rs index fbcd06439c..6cb7a5e7eb 100644 --- a/nox/src/connectivity.rs +++ b/nox/src/connectivity.rs @@ -25,10 +25,10 @@ use futures::{stream::iter, StreamExt}; use humantime_serde::re::humantime::format_duration as pretty; use kademlia::{KademliaApi, KademliaApiT, KademliaError}; use libp2p::Multiaddr; -use particle_protocol::{Contact, Particle, SendStatus}; +use particle_protocol::{Contact, ExtendedParticle, SendStatus}; use peer_metrics::{ConnectivityMetrics, Resolution}; use tokio::time::sleep; -use tracing::{Instrument, Span}; +use tracing::{instrument, Instrument, Span}; use crate::tasks::Tasks; @@ -62,6 +62,7 @@ impl Connectivity { Tasks::new("Connectivity", vec![run_bootstrap, reconnect_bootstraps]) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn resolve_contact(&self, target: PeerId, particle_id: &str) -> Option { let metrics = self.metrics.as_ref(); let contact = self.connection_pool.get_contact(target).await; @@ -124,10 +125,15 @@ impl Connectivity { None } - pub async fn send(&self, contact: Contact, particle: Particle) -> bool { - tracing::debug!(particle_id = particle.id, "Sending particle to {}", contact); + #[instrument(level = tracing::Level::INFO, skip_all)] + pub async fn send(&self, contact: Contact, particle: ExtendedParticle) -> bool { + tracing::debug!( + particle_id = particle.particle.id, + "Sending particle to {}", + contact + ); let metrics = self.metrics.as_ref(); - let id = particle.id.clone(); + let id = particle.particle.id.clone(); let sent = self.connection_pool.send(contact.clone(), particle).await; match &sent { SendStatus::Ok => { diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index e3dd7ea0ec..8bbfef5ec0 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -14,15 +14,16 @@ * limitations under the License. */ -use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; -use fluence_libp2p::PeerId; use futures::{FutureExt, StreamExt}; -use particle_protocol::Particle; -use peer_metrics::DispatcherMetrics; use prometheus_client::registry::Registry; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tracing::Instrument; +use tracing::{instrument, Instrument}; + +use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; +use fluence_libp2p::PeerId; +use particle_protocol::{ExtendedParticle, Particle}; +use peer_metrics::DispatcherMetrics; use crate::effectors::Effectors; use crate::tasks::Tasks; @@ -61,7 +62,7 @@ impl Dispatcher { impl Dispatcher { pub fn start( self, - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::Receiver, ) -> Tasks { log::info!("starting dispatcher"); @@ -85,31 +86,37 @@ impl Dispatcher { pub async fn process_particles(self, particle_stream: Src) where - Src: futures::Stream + Unpin + Send + Sync + 'static, + Src: futures::Stream + Unpin + Send + Sync + 'static, { let parallelism = self.particle_parallelism; let aquamarine = self.aquamarine; let metrics = self.metrics; particle_stream - .for_each_concurrent(parallelism, move |particle| { + .for_each_concurrent(parallelism, move |ext_particle| { + let current_span = tracing::info_span!(parent: ext_particle.span.as_ref(), "Dispatcher::process_particles::for_each"); + let _ = current_span.enter(); + let async_span = tracing::info_span!("Dispatcher::process_particles::async"); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); + let particle: &Particle = ext_particle.as_ref(); if particle.is_expired() { + let particle_id = &particle.id.as_str(); if let Some(m) = metrics { - m.particle_expired(&particle.id); + m.particle_expired(particle_id); } - tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); + tracing::info!(target: "expired", particle_id = particle_id, "Particle is expired"); return async {}.boxed(); } async move { aquamarine - .execute(particle, None) + .execute(ext_particle, None) // do not log errors: Aquamarine will log them fine .map(|_| ()) .await } + .instrument(async_span) .boxed() }) .await; @@ -117,6 +124,7 @@ impl Dispatcher { log::error!("Particle stream has ended"); } + #[instrument(level = tracing::Level::INFO, skip_all)] async fn process_effects(self, effects_stream: Src) where Src: futures::Stream + Unpin + Send + Sync + 'static, @@ -130,8 +138,9 @@ impl Dispatcher { async move { match effects { Ok(effects) => { + let async_span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher::effectors::execute"); // perform effects as instructed by aquamarine - effectors.execute(effects).await; + effectors.execute(effects).instrument(async_span).await; } Err(err) => { // particles are sent in fire and forget fashion, so diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index 61ef491d34..d4533c394f 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -15,8 +15,10 @@ */ use futures::{stream::iter, StreamExt}; +use tracing::instrument; use aquamarine::RoutingEffects; +use particle_protocol::Particle; use crate::connectivity::Connectivity; @@ -31,9 +33,11 @@ impl Effectors { } /// Perform effects that Aquamarine instructed us to + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn execute(self, effects: RoutingEffects) { - if effects.particle.is_expired() { - tracing::info!(target: "expired", particle_id = effects.particle.id, "Particle is expired"); + let particle: &Particle = effects.particle.as_ref(); + if particle.is_expired() { + tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); return; } @@ -46,7 +50,10 @@ impl Effectors { let particle = particle.clone(); async move { // resolve contact - if let Some(contact) = connectivity.resolve_contact(target, &particle.id).await { + if let Some(contact) = connectivity + .resolve_contact(target, &particle.as_ref()) + .await + { // forward particle let sent = connectivity.send(contact, particle).await; if sent { diff --git a/nox/src/main.rs b/nox/src/main.rs index be4ec3bfde..57293f6968 100644 --- a/nox/src/main.rs +++ b/nox/src/main.rs @@ -27,7 +27,6 @@ )] use base64::{engine::general_purpose::STANDARD as base64, Engine}; - use eyre::WrapErr; use libp2p::PeerId; use tokio::signal; diff --git a/nox/src/node.rs b/nox/src/node.rs index e1d0c819fd..afd3637dd9 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -40,7 +40,7 @@ use libp2p_connection_limits::ConnectionLimits; use libp2p_metrics::{Metrics, Recorder}; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use peer_metrics::{ ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics, ServicesMetricsBackend, SpellMetrics, VmPoolMetrics, @@ -67,7 +67,7 @@ use crate::metrics::TokioCollector; // TODO: documentation pub struct Node { - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::Receiver>, pub swarm: Swarm, @@ -357,7 +357,7 @@ impl Node { ) -> eyre::Result<( Swarm, Connectivity, - mpsc::Receiver, + mpsc::Receiver, )> { let connection_idle_timeout = network_config.connection_idle_timeout; @@ -405,7 +405,7 @@ pub struct StartedNode { impl Node { #[allow(clippy::too_many_arguments)] pub fn with( - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::Receiver>, swarm: Swarm, connectivity: Connectivity, diff --git a/particle-protocol/Cargo.toml b/particle-protocol/Cargo.toml index 3e33db2d54..08628be641 100644 --- a/particle-protocol/Cargo.toml +++ b/particle-protocol/Cargo.toml @@ -24,6 +24,7 @@ tokio = { workspace = true, features = ["sync"] } faster-hex = { workspace = true } asynchronous-codec = { version = "0.7.0", features = ["json"] } unsigned-varint = { version = "0.8.0", features = ["codec", "asynchronous_codec"] } +tracing = { workspace = true } [dev-dependencies] rand = "0.8.5" diff --git a/particle-protocol/src/lib.rs b/particle-protocol/src/lib.rs index a4d1fe8423..9c674735c5 100644 --- a/particle-protocol/src/lib.rs +++ b/particle-protocol/src/lib.rs @@ -42,6 +42,7 @@ pub use libp2p_protocol::message::CompletionChannel; pub use libp2p_protocol::message::SendStatus; pub use libp2p_protocol::message::{HandlerMessage, ProtocolMessage}; pub use libp2p_protocol::upgrade::ProtocolConfig; +pub use particle::ExtendedParticle; pub use particle::Particle; pub const PROTOCOL_NAME: &str = "/fluence/particle/2.0.0"; diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index a3daf6e7bf..6137aba243 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -15,21 +15,57 @@ */ use std::convert::TryInto; +use std::sync::Arc; use std::time::Duration; use derivative::Derivative; -use fluence_keypair::{KeyPair, PublicKey, Signature}; use libp2p::PeerId; use serde::{Deserialize, Serialize}; +use tracing::Span; use crate::error::ParticleError; use crate::error::ParticleError::{ DecodingError, InvalidKeypair, SignatureVerificationFailed, SigningFailed, }; +use fluence_keypair::{KeyPair, PublicKey, Signature}; use fluence_libp2p::{peerid_serializer, RandomPeerId}; use json_utils::base64_serde; use now_millis::now_ms; +#[derive(Clone, Debug)] +pub struct ExtendedParticle { + pub particle: Particle, + pub span: Arc, +} + +impl AsRef for ExtendedParticle { + fn as_ref(&self) -> &Particle { + &self.particle + } +} + +impl AsRef for ExtendedParticle { + fn as_ref(&self) -> &str { + &self.particle.id + } +} + +impl ExtendedParticle { + pub fn new(particle: Particle, span: Span) -> Self { + Self { + particle, + span: Arc::new(span), + } + } + + pub fn linked(particle: Particle, span: Arc) -> Self { + Self { + particle, + span: span.clone(), + } + } +} + #[derive(Clone, Serialize, Deserialize, PartialEq, Derivative)] #[derivative(Debug)] pub struct Particle { diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 8bcd2bdf60..a5db753be0 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -13,17 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use fluence_libp2p::PeerId; +use std::sync::Arc; +use tracing::{instrument, Span}; use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; +use crate::Sorcerer; +use fluence_libp2p::PeerId; use now_millis::now_ms; use particle_args::JError; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use spell_event_bus::api::{TriggerEvent, TriggerInfoAqua}; use spell_service_api::CallParams; -use crate::Sorcerer; - impl Sorcerer { fn get_spell_counter(&self, spell_id: String, worker_id: PeerId) -> Result { let params = CallParams::local(spell_id, worker_id, self.spell_script_particle_ttl); @@ -52,6 +53,7 @@ impl Sorcerer { .map_err(|e| JError::new(e.to_string())) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub(crate) fn make_spell_particle( &self, spell_id: String, @@ -97,7 +99,8 @@ impl Sorcerer { .map_err(|e| JError::new(e.to_string())) } - pub async fn execute_script(&self, event: TriggerEvent) { + #[instrument(level = tracing::Level::INFO, skip_all)] + pub async fn execute_script(&self, event: TriggerEvent, span: Arc) { let error: Result<(), JError> = try { let worker_id = self.services.get_service_owner( "", @@ -110,15 +113,19 @@ impl Sorcerer { if let Some(m) = &self.spell_metrics { m.observe_spell_cast(); } - self.aquamarine.clone().execute(particle, None).await?; + + self.aquamarine + .clone() + .execute(ExtendedParticle::linked(particle, span), None) + .await?; }; if let Err(err) = error { log::warn!( - "Failed to execute spell script id: {}, event: {:?}, error: {:?}", - event.spell_id, + "Failed to execute spell script id: {spell_id}, event: {:?}, error: {:?}", event.info, - err + err, + spell_id = event.spell_id.to_string(), ); } } diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 04f72a84e5..c752652dce 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -15,6 +15,7 @@ */ use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use futures::{FutureExt, StreamExt}; @@ -43,6 +44,7 @@ use server_config::ResolvedConfig; use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent}; use spell_service_api::{CallParams, SpellServiceApi}; use spell_storage::SpellStorage; +use tracing::Instrument; #[derive(Clone)] pub struct Sorcerer { @@ -143,11 +145,24 @@ impl Sorcerer { let spell_events_stream = UnboundedReceiverStream::new(spell_events_receiver); spell_events_stream .for_each_concurrent(None, move |spell_event| { + let root_span = tracing::info_span!( + "Sorcerer::task::for_each", + spell_id = spell_event.spell_id.to_string() + ); + let root_span = Arc::new(root_span); + let async_span = tracing::info_span!(parent: root_span.as_ref(), + "Sorcerer::task::execute_script", + spell_id = spell_event.spell_id.to_string()); + let sorcerer = self.clone(); // Note that the event that triggered the spell is in `spell_event.event` async move { - sorcerer.execute_script(spell_event).await; + sorcerer + .execute_script(spell_event, root_span) + .in_current_span() + .await; } + .instrument(async_span) }) .await; })