From ef967d234f21f424a174adcaec5f06e9a0aef356 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 22 Nov 2023 20:20:22 +0300 Subject: [PATCH 01/33] Fix behaviour --- Cargo.lock | 1 + connection-pool/src/behaviour.rs | 10 +++++++++- particle-protocol/Cargo.toml | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0e79fd9b1c..2ba2af94a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5154,6 +5154,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", + "tracing", "unsigned-varint 0.8.0", ] diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 3c5b4cde30..cb097dd4c6 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -589,7 +589,11 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { FromSwarm::NewExternalAddrCandidate(_) => {} FromSwarm::ExternalAddrConfirmed(_) => {} FromSwarm::ExternalAddrExpired(_) => {} - _ => {} + e => { + tracing::warn!("Unexpected event {:?}", e); + #[cfg(test)] + panic!("Unexpected event") + } } } @@ -602,6 +606,10 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { match event { Ok(HandlerMessage::InParticle(particle)) => { tracing::info!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len()); + let parent_span = + tracing::info_span!("Particle", particle_id = particle.id); + let span = tracing::info_span!(parent: &parent_span, "Inbound particle", particle_id = particle.id); + let _span_guard = span.entered(); self.meter(|m| { m.incoming_particle( &particle.id, diff --git a/particle-protocol/Cargo.toml b/particle-protocol/Cargo.toml index 3e33db2d54..08628be641 100644 --- a/particle-protocol/Cargo.toml +++ b/particle-protocol/Cargo.toml @@ -24,6 +24,7 @@ tokio = { workspace = true, features = ["sync"] } faster-hex = { workspace = true } asynchronous-codec = { version = "0.7.0", features = ["json"] } unsigned-varint = { version = "0.8.0", features = ["codec", "asynchronous_codec"] } +tracing = { workspace = true } [dev-dependencies] rand = "0.8.5" From 2f3320d3f6136da94a2523cd51fe36d3eed3812f Mon Sep 17 00:00:00 2001 From: Nick Date: Fri, 24 Nov 2023 14:48:11 +0300 Subject: [PATCH 02/33] wip --- aquamarine/src/actor.rs | 77 ++++++++++++++++---------- aquamarine/src/aquamarine.rs | 13 +++-- aquamarine/src/command.rs | 4 +- aquamarine/src/particle_effects.rs | 4 +- aquamarine/src/particle_executor.rs | 8 ++- aquamarine/src/particle_functions.rs | 3 +- aquamarine/src/plumber.rs | 29 +++++----- aquamarine/src/vm_pool.rs | 2 +- connection-pool/src/api.rs | 8 ++- connection-pool/src/behaviour.rs | 55 +++++++++++------- connection-pool/src/connection_pool.rs | 4 +- nox/src/behaviour/network.rs | 4 +- nox/src/connectivity.rs | 12 ++-- nox/src/dispatcher.rs | 22 +++++--- nox/src/effectors.rs | 8 ++- nox/src/node.rs | 8 +-- particle-protocol/src/lib.rs | 1 + particle-protocol/src/particle.rs | 8 +++ sorcerer/src/script_executor.rs | 21 ++++++- sorcerer/src/sorcerer.rs | 7 +++ 20 files changed, 196 insertions(+), 102 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index a8700f1a11..4909c67eb4 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -18,15 +18,16 @@ use std::{ collections::VecDeque, task::{Context, Poll, Waker}, }; +use std::sync::Arc; use fluence_keypair::KeyPair; use futures::future::BoxFuture; use futures::FutureExt; -use tracing::{Instrument, Span}; +use tracing::{instrument, Instrument}; use fluence_libp2p::PeerId; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use crate::deadline::Deadline; use crate::particle_effects::RoutingEffects; @@ -42,11 +43,21 @@ struct Reusables { pub struct Actor { /// Particle of that actor is expired after that deadline deadline: Deadline, - future: Option, ParticleEffects, InterpretationStats)>>, - mailbox: VecDeque, + future: Option< + BoxFuture< + 'static, + ( + Reusables, + ParticleEffects, + InterpretationStats, + ExtendedParticle + ), + >, + >, + mailbox: VecDeque, waker: Option, functions: Functions, - /// Particle that's memoized on the actor creation. + /// Particle that's memoized on the actor creation. /// Used to execute CallRequests when mailbox is empty. /// Particle's data is empty. particle: Particle, @@ -54,7 +65,7 @@ pub struct Actor { /// It's either `host_peer_id` or local worker peer id current_peer_id: PeerId, key_pair: KeyPair, - span: Span, + _deal_id: Option, //TODO: fix } impl Actor @@ -63,31 +74,31 @@ where F: ParticleFunctionStatic, { pub fn new( - particle: &Particle, + particle: &ExtendedParticle, functions: Functions, current_peer_id: PeerId, key_pair: KeyPair, - span: Span, + deal_id: Option, ) -> Self { Self { - deadline: Deadline::from(particle), + deadline: Deadline::from(&particle.particle), functions, future: None, mailbox: <_>::default(), waker: None, - // Clone particle without data + // Clone particle without data particle: Particle { - id: particle.id.clone(), - init_peer_id: particle.init_peer_id, - timestamp: particle.timestamp, - ttl: particle.ttl, - script: particle.script.clone(), - signature: particle.signature.clone(), + id: particle.particle.id.clone(), + init_peer_id: particle.particle.init_peer_id, + timestamp: particle.particle.timestamp, + ttl: particle.particle.ttl, + script: particle.particle.script.clone(), + signature: particle.particle.signature.clone(), data: vec![], }, current_peer_id, key_pair, - span, + _deal_id: deal_id, } } @@ -124,7 +135,8 @@ where self.functions.set_function(function) } - pub fn ingest(&mut self, particle: Particle) { + #[instrument(level = tracing::Level::INFO, skip_all)] + pub fn ingest(&mut self, particle: ExtendedParticle) { self.mailbox.push_back(particle); self.wake(); } @@ -141,22 +153,27 @@ where self.functions.poll(cx); // Poll AquaVM future - if let Some(Ready((reusables, effects, stats))) = + if let Some(Ready((reusables, effects, stats, particle))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let _entered = self.span.enter(); + let _span = + tracing::info_span!(parent: particle.span.as_ref(), "Actor: after future ready") + .entered(); self.future.take(); let waker = cx.waker().clone(); // Schedule execution of functions self.functions - .execute(self.particle.id.clone(), effects.call_requests, waker); + .execute(particle.particle.id.clone(), effects.call_requests, waker); let effects = RoutingEffects { - particle: Particle { - data: effects.new_data, - ..self.particle.clone() + particle: ExtendedParticle { + particle: Particle { + data: effects.new_data, + ..particle.particle.clone() + }, + span: particle.span.clone(), }, next_peers: effects.next_peers, }; @@ -188,19 +205,23 @@ where let (calls, stats) = self.functions.drain(); // Take the next particle - let particle = self.mailbox.pop_front(); + let ext_particle = self.mailbox.pop_front(); - if particle.is_none() && calls.is_empty() { + if ext_particle.is_none() && calls.is_empty() { debug_assert!(stats.is_empty(), "stats must be empty if calls are empty"); // Nothing to execute, return vm return ActorPoll::Vm(vm_id, vm); } - let particle = particle.unwrap_or_else(|| { + let particle = ext_particle.map(|p|p.particle).unwrap_or_else(|| { // If mailbox is empty, then take self.particle. // Its data is empty, so `vm` will process `calls` on the old (saved on disk) data self.particle.clone() }); + let span = ext_particle.map(|p|p.span).unwrap_or_else(||{ + Arc::new(tracing::info_span!("Unknown span")) + }); + let waker = cx.waker().clone(); // Take ownership of vm to process particle let peer_id = self.current_peer_id; @@ -219,7 +240,7 @@ where }; (reusables, res.effects, res.stats) } - .instrument(self.span.clone()) + .instrument(*span.clone()) .boxed(), ); self.wake(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index 7b3bdc63f5..c7ed5614c8 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -21,13 +21,13 @@ use std::time::Duration; use futures::StreamExt; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::Instrument; +use tracing::{Instrument, instrument}; use fluence_libp2p::PeerId; use health::HealthCheckRegistry; use key_manager::KeyManager; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle}; use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics}; use crate::aqua_runtime::AquaRuntime; @@ -88,6 +88,8 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; + let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); + let _guard =span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id self.plumber.ingest(particle, function, self.host_peer_id); @@ -156,12 +158,13 @@ impl AquamarineApi { } /// Send particle to the interpreters pool + #[instrument(level = tracing::Level::INFO, skip_all)] pub fn execute( self, - particle: Particle, + particle: ExtendedParticle, function: Option, ) -> impl Future> { - let particle_id = particle.id.clone(); + let particle_id = particle.particle.id.clone(); self.send_command(Ingest { particle, function }, Some(particle_id)) } @@ -203,6 +206,6 @@ impl AquamarineApi { log::error!("Aquamarine outlet died!"); AquamarineDied { particle_id } }) - } + }.in_current_span() } } diff --git a/aquamarine/src/command.rs b/aquamarine/src/command.rs index d59b98a5ba..802c3ac6f7 100644 --- a/aquamarine/src/command.rs +++ b/aquamarine/src/command.rs @@ -15,12 +15,12 @@ */ use particle_execution::ServiceFunction; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle}; use std::collections::HashMap; pub enum Command { Ingest { - particle: Particle, + particle: ExtendedParticle, function: Option, }, AddService { diff --git a/aquamarine/src/particle_effects.rs b/aquamarine/src/particle_effects.rs index f0682c8d6c..250fdee86b 100644 --- a/aquamarine/src/particle_effects.rs +++ b/aquamarine/src/particle_effects.rs @@ -15,7 +15,7 @@ */ use avm_server::CallRequests; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle}; use std::time::Duration; use libp2p::PeerId; @@ -54,6 +54,6 @@ pub struct InterpretationStats { /// Instruct to send particle to either virtual or remote peers. #[derive(Clone, Debug)] pub struct RoutingEffects { - pub particle: Particle, + pub particle: ExtendedParticle, pub next_peers: Vec, } diff --git a/aquamarine/src/particle_executor.rs b/aquamarine/src/particle_executor.rs index 2e82df3203..55b059ea18 100644 --- a/aquamarine/src/particle_executor.rs +++ b/aquamarine/src/particle_executor.rs @@ -22,6 +22,7 @@ use avm_server::{CallResults, ParticleParameters}; use fluence_keypair::KeyPair; use futures::{future::BoxFuture, FutureExt}; use humantime::format_duration as pretty; +use tracing::{instrument, Instrument}; use fluence_libp2p::PeerId; use particle_protocol::Particle; @@ -59,6 +60,7 @@ impl ParticleExecutor for RT { type Future = Fut>; type Particle = (Particle, CallResults); + #[instrument(level = tracing::Level::INFO, skip_all)] fn execute( mut self, p: Self::Particle, @@ -69,11 +71,12 @@ impl ParticleExecutor for RT { let (particle, calls) = p; let particle_id = particle.id.clone(); let data_len = particle.data.len(); - let span = tracing::info_span!("Execute"); + let blockng_span = tracing::info_span!("Particle executor: blocking AVM call"); + let async_span = tracing::info_span!("Particle executor: async task"); let task = tokio::task::Builder::new() .name(&format!("Particle {}", particle.id)) .spawn_blocking(move || { - span.in_scope(move || { + blockng_span.in_scope(move || { let now = Instant::now(); tracing::trace!(target: "execution", particle_id = particle.id, "Executing particle"); @@ -162,6 +165,7 @@ impl ParticleExecutor for RT { } } } + .instrument(async_span) .boxed() } } diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 26359ae4e0..51f29de6df 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -27,7 +27,7 @@ use humantime::format_duration as pretty; use serde_json::json; use serde_json::Value as JValue; use tokio::runtime::Handle; -use tracing::Instrument; +use tracing::{Instrument, instrument}; use particle_args::{Args, JError}; use particle_execution::{ @@ -93,6 +93,7 @@ impl Functions { } /// Add a bunch of call requests to execution + #[instrument(level = tracing::Level::INFO, skip_all)] pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker) { let futs: Vec<_> = requests .into_iter() diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index a6dad5372d..0316e6896c 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -22,6 +22,7 @@ use std::{ use futures::task::Waker; use tokio::task; +use tracing::instrument; use fluence_libp2p::PeerId; use key_manager::KeyManager; @@ -29,7 +30,7 @@ use key_manager::KeyManager; #[cfg(test)] use mock_time::now_ms; use particle_execution::{ParticleFunctionStatic, ParticleParams, ServiceFunction}; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use peer_metrics::ParticleExecutorMetrics; /// Get current time from OS #[cfg(not(test))] @@ -76,29 +77,30 @@ impl Plumber { } /// Receives and ingests incoming particle: creates a new actor or forwards to the existing mailbox + #[instrument(level = tracing::Level::INFO, skip_all)] pub fn ingest( &mut self, - particle: Particle, + particle: ExtendedParticle, function: Option, worker_id: PeerId, ) { self.wake(); - let deadline = Deadline::from(&particle); + let deadline = Deadline::from(&particle.particle); if deadline.is_expired(now_ms()) { - tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); + tracing::info!(target: "expired", particle_id = particle.particle.id, "Particle is expired"); self.events .push_back(Err(AquamarineApiError::ParticleExpired { - particle_id: particle.id, + particle_id: particle.particle.id, })); return; } - if let Err(err) = particle.verify() { - tracing::warn!(target: "signature", particle_id = particle.id, "Particle signature verification failed: {err:?}"); + if let Err(err) = particle.particle.verify() { + tracing::warn!(target: "signature", particle_id = particle.particle.id, "Particle signature verification failed: {err:?}"); self.events .push_back(Err(AquamarineApiError::SignatureVerificationFailed { - particle_id: particle.id, + particle_id: particle.particle.id, err, })); return; @@ -112,19 +114,18 @@ impl Plumber { } let builtins = &self.builtins; - let key = (ParticleId(particle.signature.clone()), worker_id); + let key = (ParticleId(particle.particle.signature.clone()), worker_id); let entry = self.actors.entry(key); let actor = match entry { Entry::Occupied(actor) => Ok(actor.into_mut()), Entry::Vacant(entry) => { - let params = ParticleParams::clone_from(&particle, worker_id); + let params = ParticleParams::clone_from(&particle.particle, worker_id); let functions = Functions::new(params, builtins.clone()); let key_pair = self.key_manager.get_worker_keypair(worker_id); let deal_id = self.key_manager.get_deal_id(worker_id).ok(); key_pair.map(|kp| { - let span = tracing::info_span!("Actor", deal_id = deal_id); - let actor = Actor::new(&particle, functions, worker_id, kp, span); + let actor = Actor::new(&particle, functions, worker_id, kp, deal_id); entry.insert(actor) }) } @@ -142,7 +143,7 @@ impl Plumber { Err(err) => log::warn!( "No such worker {}, rejected particle {}: {:?}", worker_id, - particle.id, + particle.particle.id, err ), } @@ -293,6 +294,8 @@ impl Plumber { for effect in local_effects { for local_peer in effect.next_peers { + let span = tracing::info_span!(parent: effect.particle.span.as_ref(), "Plumber: routing effect ingest"); + let _guard = span.enter(); self.ingest(effect.particle.clone(), None, local_peer); } } diff --git a/aquamarine/src/vm_pool.rs b/aquamarine/src/vm_pool.rs index 5951610272..ce0326438f 100644 --- a/aquamarine/src/vm_pool.rs +++ b/aquamarine/src/vm_pool.rs @@ -167,7 +167,7 @@ impl VmPool { // Remove completed future creating_vms.remove(fut_index); if creating_vms.is_empty() { - tracing::info!("All {}ч AquaVMs created.", self.pool_size) + tracing::info!("All {} AquaVMs created.", self.pool_size) } // Put created vm to self.vms diff --git a/connection-pool/src/api.rs b/connection-pool/src/api.rs index b5759a056a..4054a9598f 100644 --- a/connection-pool/src/api.rs +++ b/connection-pool/src/api.rs @@ -20,9 +20,10 @@ use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; use libp2p::{core::Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::instrument; -use particle_protocol::Particle; use particle_protocol::{Contact, SendStatus}; +use particle_protocol::{ExtendedParticle}; use crate::connection_pool::LifecycleEvent; use crate::ConnectionPoolT; @@ -36,7 +37,7 @@ pub enum Command { }, Send { to: Contact, - particle: Particle, + particle: ExtendedParticle, out: oneshot::Sender, }, Dial { @@ -111,7 +112,8 @@ impl ConnectionPoolT for ConnectionPoolApi { self.execute(|out| Command::GetContact { peer_id, out }) } - fn send(&self, to: Contact, particle: Particle) -> BoxFuture<'static, SendStatus> { + #[instrument(level = tracing::Level::INFO, skip_all)] + fn send(&self, to: Contact, particle: ExtendedParticle) -> BoxFuture<'static, SendStatus> { let fut = self.execute(|out| Command::Send { to, particle, out }); // timeout on send is required because libp2p can silently drop outbound events let timeout = self.send_timeout; diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index cb097dd4c6..750536a67c 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use fluence_libp2p::remote_multiaddr; use futures::{Sink, StreamExt}; use libp2p::core::Endpoint; use libp2p::swarm::dial_opts::DialOpts; @@ -27,21 +28,20 @@ use libp2p::{ swarm::{NetworkBehaviour, NotifyHandler, OneShotHandler}, PeerId, }; +use particle_protocol::{ + CompletionChannel, Contact, ExtendedParticle, HandlerMessage, ProtocolConfig, SendStatus, +}; +use peer_metrics::ConnectionPoolMetrics; use std::pin::Pin; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, }; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSender; -use fluence_libp2p::remote_multiaddr; -use particle_protocol::{ - CompletionChannel, Contact, HandlerMessage, Particle, ProtocolConfig, SendStatus, -}; -use peer_metrics::ConnectionPoolMetrics; - use crate::connection_pool::LifecycleEvent; use crate::{Command, ConnectionPoolApi}; @@ -103,10 +103,10 @@ pub struct ConnectionPoolBehaviour { commands: UnboundedReceiverStream, - outlet: PollSender, + outlet: PollSender, subscribers: Vec>, - queue: VecDeque, + queue: VecDeque, contacts: HashMap, dialing: HashMap>>>, @@ -211,23 +211,36 @@ impl ConnectionPoolBehaviour { /// Sends a particle to a connected contact. Returns whether sending succeeded or not /// Result is sent to channel inside `upgrade_outbound` in ProtocolHandler - pub fn send(&mut self, to: Contact, particle: Particle, outlet: oneshot::Sender) { + pub fn send( + &mut self, + to: Contact, + particle: ExtendedParticle, + outlet: oneshot::Sender, + ) { + tracing::info!("Current arc count {}", Arc::strong_count(&particle.span)); + let span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); + tracing::info!("Current arc count {}", Arc::strong_count(&particle.span)); + let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally self.queue.push_back(particle); outlet.send(SendStatus::Ok).ok(); self.wake(); } else if self.contacts.contains_key(&to.peer_id) { - tracing::debug!(target: "network",particle_id = particle.id , "{}: Sending particle to {}", self.peer_id, to.peer_id); + tracing::debug!(target: "network",particle_id = particle.particle.id , "{}: Sending particle to {}", self.peer_id, to.peer_id); + // Send particle to remote peer self.push_event(ToSwarm::NotifyHandler { peer_id: to.peer_id, handler: NotifyHandler::Any, - event: HandlerMessage::OutParticle(particle, CompletionChannel::Oneshot(outlet)), + event: HandlerMessage::OutParticle( + particle.particle, + CompletionChannel::Oneshot(outlet), + ), }); } else { tracing::warn!( - particle_id = particle.id, + particle_id = particle.particle.id, "Won't send particle to contact {}: not connected", to.peer_id ); @@ -264,7 +277,7 @@ impl ConnectionPoolBehaviour { protocol_config: ProtocolConfig, peer_id: PeerId, metrics: Option, - ) -> (Self, mpsc::Receiver, ConnectionPoolApi) { + ) -> (Self, mpsc::Receiver, ConnectionPoolApi) { let (outlet, inlet) = mpsc::channel(buffer); let outlet = PollSender::new(outlet); let (command_outlet, command_inlet) = mpsc::unbounded_channel(); @@ -606,10 +619,9 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { match event { Ok(HandlerMessage::InParticle(particle)) => { tracing::info!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len()); - let parent_span = - tracing::info_span!("Particle", particle_id = particle.id); - let span = tracing::info_span!(parent: &parent_span, "Inbound particle", particle_id = particle.id); - let _span_guard = span.entered(); + let root_span = tracing::info_span!("Particle", particle_id = particle.id); + let _ = root_span.enter(); + self.meter(|m| { m.incoming_particle( &particle.id, @@ -617,7 +629,10 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { particle.data.len() as f64, ) }); - self.queue.push_back(particle); + self.queue.push_back(ExtendedParticle { + particle, + span: Arc::new(root_span), + }); self.wake(); } Ok(HandlerMessage::InboundUpgradeError(err)) => log::warn!("UpgradeError: {:?}", err), @@ -637,7 +652,9 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { Poll::Ready(Ok(_)) => { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { - let particle_id = particle.id.clone(); + let particle_id = particle.particle.id.clone(); + let _span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool: send to outlet").entered(); + if let Err(err) = outlet.start_send(particle) { tracing::error!( particle_id = particle_id, diff --git a/connection-pool/src/connection_pool.rs b/connection-pool/src/connection_pool.rs index 5ee0597974..6409b9b7a0 100644 --- a/connection-pool/src/connection_pool.rs +++ b/connection-pool/src/connection_pool.rs @@ -19,7 +19,7 @@ use std::fmt::{Display, Formatter}; use futures::{future::BoxFuture, stream::BoxStream}; use libp2p::{core::Multiaddr, PeerId}; -use particle_protocol::{Contact, Particle, SendStatus}; +use particle_protocol::{Contact, ExtendedParticle, SendStatus}; #[derive(Debug, Clone)] pub enum LifecycleEvent { @@ -42,7 +42,7 @@ pub trait ConnectionPoolT { fn disconnect(&self, peer_id: PeerId) -> BoxFuture<'static, bool>; fn is_connected(&self, peer_id: PeerId) -> BoxFuture<'static, bool>; fn get_contact(&self, peer_id: PeerId) -> BoxFuture<'static, Option>; - fn send(&self, to: Contact, particle: Particle) -> BoxFuture<'static, SendStatus>; + fn send(&self, to: Contact, particle: ExtendedParticle) -> BoxFuture<'static, SendStatus>; fn count_connections(&self) -> BoxFuture<'static, usize>; fn lifecycle_events(&self) -> BoxStream<'static, LifecycleEvent>; } diff --git a/nox/src/behaviour/network.rs b/nox/src/behaviour/network.rs index ff0aeb4ba3..ea0faae2f9 100644 --- a/nox/src/behaviour/network.rs +++ b/nox/src/behaviour/network.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; use connection_pool::ConnectionPoolBehaviour; use health::HealthCheckRegistry; use kademlia::{Kademlia, KademliaConfig}; -use particle_protocol::{Particle, PROTOCOL_NAME}; +use particle_protocol::{ExtendedParticle, PROTOCOL_NAME}; use server_config::NetworkConfig; use crate::connectivity::Connectivity; @@ -45,7 +45,7 @@ impl FluenceNetworkBehaviour { pub fn new( cfg: NetworkConfig, health_registry: Option<&mut HealthCheckRegistry>, - ) -> (Self, Connectivity, mpsc::Receiver) { + ) -> (Self, Connectivity, mpsc::Receiver) { let local_public_key = cfg.key_pair.public(); let identify = Identify::new( IdentifyConfig::new(PROTOCOL_NAME.into(), local_public_key) diff --git a/nox/src/connectivity.rs b/nox/src/connectivity.rs index fbcd06439c..49e27a6b4f 100644 --- a/nox/src/connectivity.rs +++ b/nox/src/connectivity.rs @@ -25,10 +25,10 @@ use futures::{stream::iter, StreamExt}; use humantime_serde::re::humantime::format_duration as pretty; use kademlia::{KademliaApi, KademliaApiT, KademliaError}; use libp2p::Multiaddr; -use particle_protocol::{Contact, Particle, SendStatus}; +use particle_protocol::{Contact, ExtendedParticle, SendStatus}; use peer_metrics::{ConnectivityMetrics, Resolution}; use tokio::time::sleep; -use tracing::{Instrument, Span}; +use tracing::{Instrument, instrument, Span}; use crate::tasks::Tasks; @@ -62,6 +62,7 @@ impl Connectivity { Tasks::new("Connectivity", vec![run_bootstrap, reconnect_bootstraps]) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn resolve_contact(&self, target: PeerId, particle_id: &str) -> Option { let metrics = self.metrics.as_ref(); let contact = self.connection_pool.get_contact(target).await; @@ -124,10 +125,11 @@ impl Connectivity { None } - pub async fn send(&self, contact: Contact, particle: Particle) -> bool { - tracing::debug!(particle_id = particle.id, "Sending particle to {}", contact); + #[instrument(level = tracing::Level::INFO, skip_all)] + pub async fn send(&self, contact: Contact, particle: ExtendedParticle) -> bool { + tracing::debug!(particle_id = particle.particle.id, "Sending particle to {}", contact); let metrics = self.metrics.as_ref(); - let id = particle.id.clone(); + let id = particle.particle.id.clone(); let sent = self.connection_pool.send(contact.clone(), particle).await; match &sent { SendStatus::Ok => { diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index 3271ea489b..80fcbec182 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -17,12 +17,12 @@ use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; use fluence_libp2p::PeerId; use futures::{FutureExt, StreamExt}; -use particle_protocol::Particle; +use particle_protocol::ExtendedParticle; use peer_metrics::DispatcherMetrics; use prometheus_client::registry::Registry; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; -use tracing::Instrument; +use tracing::{Instrument, instrument}; use crate::effectors::Effectors; use crate::tasks::Tasks; @@ -61,7 +61,7 @@ impl Dispatcher { impl Dispatcher { pub fn start( self, - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::UnboundedReceiver, ) -> Tasks { log::info!("starting dispatcher"); @@ -85,21 +85,24 @@ impl Dispatcher { pub async fn process_particles(self, particle_stream: Src) where - Src: futures::Stream + Unpin + Send + Sync + 'static, + Src: futures::Stream + Unpin + Send + Sync + 'static, { let parallelism = self.particle_parallelism; let aquamarine = self.aquamarine; let metrics = self.metrics; particle_stream .for_each_concurrent(parallelism, move |particle| { + let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: process particle"); + let async_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: async Aquamarine.execute"); + let _ = current_span.enter(); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); - if particle.is_expired() { + if particle.particle.is_expired() { if let Some(m) = metrics { - m.particle_expired(&particle.id); + m.particle_expired(&particle.particle.id); } - tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); + tracing::info!(target: "expired", particle_id = particle.particle.id, "Particle is expired"); return async {}.boxed(); } @@ -110,6 +113,7 @@ impl Dispatcher { .map(|_| ()) .await } + .instrument(async_span) .boxed() }) .await; @@ -117,6 +121,7 @@ impl Dispatcher { log::error!("Particle stream has ended"); } + #[instrument(level = tracing::Level::INFO, skip_all)] async fn process_effects(self, effects_stream: Src) where Src: futures::Stream + Unpin + Send + Sync + 'static, @@ -130,8 +135,9 @@ impl Dispatcher { async move { match effects { Ok(effects) => { + let span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher: execute effectors"); // perform effects as instructed by aquamarine - effectors.execute(effects).await; + effectors.execute(effects).instrument(span).await; } Err(err) => { // particles are sent in fire and forget fashion, so diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index 61ef491d34..124621eddb 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -15,6 +15,7 @@ */ use futures::{stream::iter, StreamExt}; +use tracing::instrument; use aquamarine::RoutingEffects; @@ -31,9 +32,10 @@ impl Effectors { } /// Perform effects that Aquamarine instructed us to + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn execute(self, effects: RoutingEffects) { - if effects.particle.is_expired() { - tracing::info!(target: "expired", particle_id = effects.particle.id, "Particle is expired"); + if effects.particle.particle.is_expired() { + tracing::info!(target: "expired", particle_id = effects.particle.particle.id, "Particle is expired"); return; } @@ -46,7 +48,7 @@ impl Effectors { let particle = particle.clone(); async move { // resolve contact - if let Some(contact) = connectivity.resolve_contact(target, &particle.id).await { + if let Some(contact) = connectivity.resolve_contact(target, &particle.particle.id).await { // forward particle let sent = connectivity.send(contact, particle).await; if sent { diff --git a/nox/src/node.rs b/nox/src/node.rs index 633dfe924b..5cf92fdb3b 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -39,7 +39,7 @@ use libp2p_connection_limits::ConnectionLimits; use libp2p_metrics::{Metrics, Recorder}; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle}; use peer_metrics::{ ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics, ServicesMetricsBackend, SpellMetrics, VmPoolMetrics, @@ -66,7 +66,7 @@ use crate::metrics::TokioCollector; // TODO: documentation pub struct Node { - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::UnboundedReceiver>, pub swarm: Swarm, @@ -354,7 +354,7 @@ impl Node { ) -> eyre::Result<( Swarm, Connectivity, - mpsc::Receiver, + mpsc::Receiver, )> { let connection_idle_timeout = network_config.connection_idle_timeout; @@ -402,7 +402,7 @@ pub struct StartedNode { impl Node { #[allow(clippy::too_many_arguments)] pub fn with( - particle_stream: mpsc::Receiver, + particle_stream: mpsc::Receiver, effects_stream: mpsc::UnboundedReceiver>, swarm: Swarm, connectivity: Connectivity, diff --git a/particle-protocol/src/lib.rs b/particle-protocol/src/lib.rs index a4d1fe8423..4f18c54fca 100644 --- a/particle-protocol/src/lib.rs +++ b/particle-protocol/src/lib.rs @@ -43,5 +43,6 @@ pub use libp2p_protocol::message::SendStatus; pub use libp2p_protocol::message::{HandlerMessage, ProtocolMessage}; pub use libp2p_protocol::upgrade::ProtocolConfig; pub use particle::Particle; +pub use particle::ExtendedParticle; pub const PROTOCOL_NAME: &str = "/fluence/particle/2.0.0"; diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index a3daf6e7bf..71c9c528c8 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -15,12 +15,14 @@ */ use std::convert::TryInto; +use std::sync::Arc; use std::time::Duration; use derivative::Derivative; use fluence_keypair::{KeyPair, PublicKey, Signature}; use libp2p::PeerId; use serde::{Deserialize, Serialize}; +use tracing::Span; use crate::error::ParticleError; use crate::error::ParticleError::{ @@ -29,6 +31,12 @@ use crate::error::ParticleError::{ use fluence_libp2p::{peerid_serializer, RandomPeerId}; use json_utils::base64_serde; use now_millis::now_ms; +#[derive(Clone, Debug)] +pub struct ExtendedParticle { + pub particle: Particle, + pub span: Arc, +} + #[derive(Clone, Serialize, Deserialize, PartialEq, Derivative)] #[derivative(Debug)] diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 8bcd2bdf60..da8f533ce8 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -13,12 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::Arc; use fluence_libp2p::PeerId; +use tracing::{instrument, Instrument, Span}; use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; use now_millis::now_ms; use particle_args::JError; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use spell_event_bus::api::{TriggerEvent, TriggerInfoAqua}; use spell_service_api::CallParams; @@ -97,6 +99,7 @@ impl Sorcerer { .map_err(|e| JError::new(e.to_string())) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn execute_script(&self, event: TriggerEvent) { let error: Result<(), JError> = try { let worker_id = self.services.get_service_owner( @@ -110,7 +113,21 @@ impl Sorcerer { if let Some(m) = &self.spell_metrics { m.observe_spell_cast(); } - self.aquamarine.clone().execute(particle, None).await?; + + let particle_span = Span::current(); + let async_span = particle_span.clone(); + + self.aquamarine + .clone() + .execute( + ExtendedParticle { + particle, + span: Arc::new(particle_span), + }, + None, + ) + .instrument(async_span) + .await?; }; if let Err(err) = error { diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 30aa90b50e..cc28defd45 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -43,6 +43,7 @@ use server_config::ResolvedConfig; use spell_event_bus::api::{from_user_config, SpellEventBusApi, TriggerEvent}; use spell_service_api::{CallParams, SpellServiceApi}; use spell_storage::SpellStorage; +use tracing::Instrument; #[derive(Clone)] pub struct Sorcerer { @@ -142,11 +143,17 @@ impl Sorcerer { let spell_events_stream = UnboundedReceiverStream::new(spell_events_receiver); spell_events_stream .for_each_concurrent(None, move |spell_event| { + let span = tracing::info_span!( + "Sorcerer: spell processing", + spell_id = spell_event.spell_id.to_string() + ); + let _ = span.enter(); let sorcerer = self.clone(); // Note that the event that triggered the spell is in `spell_event.event` async move { sorcerer.execute_script(spell_event).await; } + .instrument(span) }) .await; }) From 8cae25fc34ece1622d8f841be996ac69bfab25ed Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 27 Nov 2023 13:58:45 +0300 Subject: [PATCH 03/33] wip --- aquamarine/src/actor.rs | 68 ++++++++++++++++++++------------ connection-pool/src/behaviour.rs | 1 - 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 4909c67eb4..220825ac3d 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,16 +14,16 @@ * limitations under the License. */ +use std::sync::Arc; use std::{ collections::VecDeque, task::{Context, Poll, Waker}, }; -use std::sync::Arc; use fluence_keypair::KeyPair; use futures::future::BoxFuture; use futures::FutureExt; -use tracing::{instrument, Instrument}; +use tracing::{instrument, Instrument, Span}; use fluence_libp2p::PeerId; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; @@ -50,14 +50,14 @@ pub struct Actor { Reusables, ParticleEffects, InterpretationStats, - ExtendedParticle + Option>, ), >, >, mailbox: VecDeque, waker: Option, functions: Functions, - /// Particle that's memoized on the actor creation. + /// Particle that's memoized on the actor creation. /// Used to execute CallRequests when mailbox is empty. /// Particle's data is empty. particle: Particle, @@ -65,7 +65,7 @@ pub struct Actor { /// It's either `host_peer_id` or local worker peer id current_peer_id: PeerId, key_pair: KeyPair, - _deal_id: Option, //TODO: fix + deal_id: Option, } impl Actor @@ -86,7 +86,7 @@ where future: None, mailbox: <_>::default(), waker: None, - // Clone particle without data + // Clone particle without data particle: Particle { id: particle.particle.id.clone(), init_peer_id: particle.particle.init_peer_id, @@ -98,7 +98,7 @@ where }, current_peer_id, key_pair, - _deal_id: deal_id, + deal_id, } } @@ -113,7 +113,9 @@ where pub fn cleanup(&self, vm: &mut RT) { tracing::debug!( target: "particle_reap", - particle_id = self.particle.id, worker_id = self.current_peer_id.to_string(), + particle_id = self.particle.id, + worker_id = self.current_peer_id.to_string(), + deal_id = self.deal_id, "Reaping particle's actor" ); // TODO: remove dirs without using vm https://github.com/fluencelabs/fluence/issues/1216 @@ -121,6 +123,7 @@ where if let Err(err) = vm.cleanup(&self.particle.id, &self.current_peer_id.to_string()) { tracing::warn!( particle_id = self.particle.id, + deal_id = self.deal_id, "Error cleaning up after particle {:?}", err ); @@ -153,27 +156,31 @@ where self.functions.poll(cx); // Poll AquaVM future - if let Some(Ready((reusables, effects, stats, particle))) = + if let Some(Ready((reusables, effects, stats, future_span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let _span = - tracing::info_span!(parent: particle.span.as_ref(), "Actor: after future ready") - .entered(); - self.future.take(); + tracing::info!("Opa {}", future_span.is_some()); + + let span = future_span.as_ref().map(|parent_span|{ + tracing::info_span!(parent: parent_span.as_ref(), "Actor: after future ready", deal_id = self.deal_id) + }).unwrap_or_else(Span::none); + let parent_span = future_span.unwrap_or_else(|| Arc::new(Span::none())); + let _span_guard = span.enter(); + let waker = cx.waker().clone(); // Schedule execution of functions self.functions - .execute(particle.particle.id.clone(), effects.call_requests, waker); + .execute(self.particle.id.clone(), effects.call_requests, waker); let effects = RoutingEffects { particle: ExtendedParticle { particle: Particle { data: effects.new_data, - ..particle.particle.clone() + ..self.particle.clone() }, - span: particle.span.clone(), + span: parent_span, }, next_peers: effects.next_peers, }; @@ -213,14 +220,14 @@ where return ActorPoll::Vm(vm_id, vm); } - let particle = ext_particle.map(|p|p.particle).unwrap_or_else(|| { - // If mailbox is empty, then take self.particle. - // Its data is empty, so `vm` will process `calls` on the old (saved on disk) data - self.particle.clone() - }); - let span = ext_particle.map(|p|p.span).unwrap_or_else(||{ - Arc::new(tracing::info_span!("Unknown span")) - }); + let particle = ext_particle + .as_ref() + .map(|p| p.particle.clone()) + .unwrap_or_else(|| { + // If mailbox is empty, then take self.particle. + // Its data is empty, so `vm` will process `calls` on the old (saved on disk) data + self.particle.clone() + }); let waker = cx.waker().clone(); // Take ownership of vm to process particle @@ -228,6 +235,10 @@ where // TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement) let key_pair = self.key_pair.clone(); // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 + let async_span = ext_particle + .as_ref() + .map(|p| tracing::info_span!(parent: p.span.as_ref(), "Actor future async")) + .unwrap_or_else(Span::none); self.future = Some( async move { let res = vm @@ -238,9 +249,14 @@ where vm_id, vm: res.runtime, }; - (reusables, res.effects, res.stats) + ( + reusables, + res.effects, + res.stats, + ext_particle.map(|p| p.span), + ) } - .instrument(*span.clone()) + .instrument(async_span) .boxed(), ); self.wake(); diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 750536a67c..0c054ff3be 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -219,7 +219,6 @@ impl ConnectionPoolBehaviour { ) { tracing::info!("Current arc count {}", Arc::strong_count(&particle.span)); let span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); - tracing::info!("Current arc count {}", Arc::strong_count(&particle.span)); let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally From d94e5eede45c11578f40dad5d2e034e01a147c08 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 27 Nov 2023 19:34:47 +0300 Subject: [PATCH 04/33] wip --- aquamarine/src/actor.rs | 55 ++++++++-------------- aquamarine/src/aquamarine.rs | 2 +- aquamarine/src/particle_functions.rs | 68 ++++++++++++++++++---------- aquamarine/src/plumber.rs | 2 +- connection-pool/src/behaviour.rs | 8 ++-- nox/src/dispatcher.rs | 6 +-- particle-protocol/src/particle.rs | 3 +- sorcerer/src/script_executor.rs | 3 +- 8 files changed, 74 insertions(+), 73 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 220825ac3d..5b878de578 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use std::sync::Arc; use std::{ collections::VecDeque, task::{Context, Poll, Waker}, @@ -43,17 +42,7 @@ struct Reusables { pub struct Actor { /// Particle of that actor is expired after that deadline deadline: Deadline, - future: Option< - BoxFuture< - 'static, - ( - Reusables, - ParticleEffects, - InterpretationStats, - Option>, - ), - >, - >, + future: Option, ParticleEffects, InterpretationStats, Span)>>, mailbox: VecDeque, waker: Option, functions: Functions, @@ -156,23 +145,16 @@ where self.functions.poll(cx); // Poll AquaVM future - if let Some(Ready((reusables, effects, stats, future_span))) = + if let Some(Ready((reusables, effects, stats, span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { + let local_span = tracing::info_span!(parent: &span, "Poll AVM future"); + let _span_guard = local_span.enter(); self.future.take(); - tracing::info!("Opa {}", future_span.is_some()); - - let span = future_span.as_ref().map(|parent_span|{ - tracing::info_span!(parent: parent_span.as_ref(), "Actor: after future ready", deal_id = self.deal_id) - }).unwrap_or_else(Span::none); - let parent_span = future_span.unwrap_or_else(|| Arc::new(Span::none())); - let _span_guard = span.enter(); - let waker = cx.waker().clone(); // Schedule execution of functions - self.functions - .execute(self.particle.id.clone(), effects.call_requests, waker); + self.functions.execute(self.particle.id.clone(), effects.call_requests, waker, &span); let effects = RoutingEffects { particle: ExtendedParticle { @@ -180,7 +162,7 @@ where data: effects.new_data, ..self.particle.clone() }, - span: parent_span, + span, }, next_peers: effects.next_peers, }; @@ -209,7 +191,7 @@ where } // Gather CallResults - let (calls, stats) = self.functions.drain(); + let (calls, stats, spans) = self.functions.drain(); // Take the next particle let ext_particle = self.mailbox.pop_front(); @@ -234,29 +216,32 @@ where let peer_id = self.current_peer_id; // TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement) let key_pair = self.key_pair.clone(); - // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 let async_span = ext_particle .as_ref() - .map(|p| tracing::info_span!(parent: p.span.as_ref(), "Actor future async")) - .unwrap_or_else(Span::none); + .map(|p| tracing::info_span!(parent: &p.span, "Actor async AVM process particle with call results")) + .unwrap_or_else(|| tracing::info_span!("Actor async AVM process call results")); + + for span in spans { + async_span.follows_from(span); + } + + // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 self.future = Some( async move { + let span = async_span.clone(); + let _guard = span.enter(); let res = vm .execute((particle, calls), waker, peer_id, key_pair) + .in_current_span() .await; let reusables = Reusables { vm_id, vm: res.runtime, }; - ( - reusables, - res.effects, - res.stats, - ext_particle.map(|p| p.span), - ) + (reusables, res.effects, res.stats, async_span) } - .instrument(async_span) + //.instrument(async_span) .boxed(), ); self.wake(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index c7ed5614c8..62fe314cf9 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -88,7 +88,7 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; - let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); + let span = tracing::info_span!(parent: &particle.span, "Aquamarine: Poll Ingest"); let _guard =span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 51f29de6df..c45fdbe5c1 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -27,7 +27,7 @@ use humantime::format_duration as pretty; use serde_json::json; use serde_json::Value as JValue; use tokio::runtime::Handle; -use tracing::{Instrument, instrument}; +use tracing::{instrument, Instrument, Span}; use particle_args::{Args, JError}; use particle_execution::{ @@ -55,6 +55,7 @@ pub struct SingleCallResult { call_id: u32, result: CallServiceResult, stat: SingleCallStat, + span: Span, } pub struct Functions { @@ -63,6 +64,7 @@ pub struct Functions { function_calls: FuturesUnordered>, call_results: CallResults, call_stats: Vec, + call_spans: Vec, particle_function: Option>>, } @@ -74,6 +76,7 @@ impl Functions { function_calls: <_>::default(), call_results: <_>::default(), call_stats: <_>::default(), + call_spans: <_>::default(), particle_function: None, } } @@ -83,6 +86,7 @@ impl Functions { while let Poll::Ready(Some(r)) = self.function_calls.poll_next_unpin(cx) { let overwritten = self.call_results.insert(r.call_id, r.result); self.call_stats.push(r.stat); + self.call_spans.push(r.span); debug_assert!( overwritten.is_none(), @@ -94,20 +98,27 @@ impl Functions { /// Add a bunch of call requests to execution #[instrument(level = tracing::Level::INFO, skip_all)] - pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker) { + pub fn execute( + &mut self, + particle_id: String, + requests: CallRequests, + waker: Waker, + span: &Span, + ) { let futs: Vec<_> = requests .into_iter() - .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone())) + .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone(), span)) .collect(); self.function_calls.extend(futs); } /// Retrieve all existing call results - pub fn drain(&mut self) -> (CallResults, Vec) { + pub fn drain(&mut self) -> (CallResults, Vec, Vec) { let call_results = std::mem::take(&mut self.call_results); let stats = std::mem::take(&mut self.call_stats); + let call_spans = std::mem::take(&mut self.call_spans); - (call_results, stats) + (call_results, stats, call_spans) } pub fn set_function(&mut self, function: ServiceFunction) { @@ -127,28 +138,35 @@ impl Functions { call_id: u32, call: CallRequestParams, waker: Waker, + span: &Span, ) -> BoxFuture<'static, SingleCallResult> { + let arg_span = span.clone(); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, Err(err) => { - let result = CallServiceResult { - ret_code: 1, - result: json!(format!( - "Failed to deserialize CallRequestParams to Args: {err}" - )), - }; - let result = SingleCallResult { - call_id, - result, - stat: SingleCallStat { - call_time: None, - wait_time: None, - success: false, - kind: FunctionKind::NotHappened, - }, - }; - return async move { result }.boxed(); + return async move { + let result = CallServiceResult { + ret_code: 1, + result: json!(format!( + "Failed to deserialize CallRequestParams to Args: {err}" + )), + }; + let result = SingleCallResult { + call_id, + result, + stat: SingleCallStat { + call_time: None, + wait_time: None, + success: false, + kind: FunctionKind::NotHappened, + }, + span: arg_span, + }; + result + } + .in_current_span() + .boxed(); } }; @@ -163,7 +181,7 @@ impl Functions { let params = self.particle.clone(); let builtins = self.builtins.clone(); let particle_function = self.particle_function.clone(); - let span = tracing::span!(tracing::Level::INFO, "Function"); + let async_span = tracing::info_span!(parent: arg_span, "Particle functions async call"); let schedule_wait_start = Instant::now(); let result = tokio::task::Builder::new() .name(&format!( @@ -201,6 +219,7 @@ impl Functions { }) .expect("Could not spawn task"); + let span = span.clone(); async move { let (result, call_kind, call_time, wait_time) = result.await.expect("Could not 'Call function' join"); @@ -251,9 +270,10 @@ impl Functions { call_id, result, stat: stats, + span, } } - .instrument(span) + .instrument(async_span) .boxed() } } diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 0316e6896c..c64c0d68e5 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -294,7 +294,7 @@ impl Plumber { for effect in local_effects { for local_peer in effect.next_peers { - let span = tracing::info_span!(parent: effect.particle.span.as_ref(), "Plumber: routing effect ingest"); + let span = tracing::info_span!(parent: &effect.particle.span, "Plumber: routing effect ingest"); let _guard = span.enter(); self.ingest(effect.particle.clone(), None, local_peer); } diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 0c054ff3be..c9bfb18fcf 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -37,7 +37,6 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, }; -use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSender; @@ -217,8 +216,7 @@ impl ConnectionPoolBehaviour { particle: ExtendedParticle, outlet: oneshot::Sender, ) { - tracing::info!("Current arc count {}", Arc::strong_count(&particle.span)); - let span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); + let span = tracing::info_span!(parent: &particle.span, "Connection pool behaviour: send"); let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally @@ -630,7 +628,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { }); self.queue.push_back(ExtendedParticle { particle, - span: Arc::new(root_span), + span: root_span, }); self.wake(); } @@ -652,7 +650,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { let particle_id = particle.particle.id.clone(); - let _span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool: send to outlet").entered(); + let _span = tracing::info_span!(parent: &particle.span, "Connection pool: send to outlet").entered(); if let Err(err) = outlet.start_send(particle) { tracing::error!( diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index 80fcbec182..f191800bc2 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -92,8 +92,8 @@ impl Dispatcher { let metrics = self.metrics; particle_stream .for_each_concurrent(parallelism, move |particle| { - let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: process particle"); - let async_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: async Aquamarine.execute"); + let current_span = tracing::info_span!(parent: &particle.span, "Dispatcher: process particle"); + let async_span = tracing::info_span!(parent: &particle.span, "Dispatcher: async Aquamarine.execute"); let _ = current_span.enter(); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); @@ -135,7 +135,7 @@ impl Dispatcher { async move { match effects { Ok(effects) => { - let span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher: execute effectors"); + let span = tracing::info_span!(parent: &effects.particle.span, "Dispatcher: execute effectors"); // perform effects as instructed by aquamarine effectors.execute(effects).instrument(span).await; } diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 71c9c528c8..75d2f8eec7 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -15,7 +15,6 @@ */ use std::convert::TryInto; -use std::sync::Arc; use std::time::Duration; use derivative::Derivative; @@ -34,7 +33,7 @@ use now_millis::now_ms; #[derive(Clone, Debug)] pub struct ExtendedParticle { pub particle: Particle, - pub span: Arc, + pub span: Span, } diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index da8f533ce8..f39cda35bc 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use std::sync::Arc; use fluence_libp2p::PeerId; use tracing::{instrument, Instrument, Span}; @@ -122,7 +121,7 @@ impl Sorcerer { .execute( ExtendedParticle { particle, - span: Arc::new(particle_span), + span: particle_span, }, None, ) From 04a33782812ded18aa3980c3fe9d1cca64db4caa Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 27 Nov 2023 20:51:54 +0300 Subject: [PATCH 05/33] wip --- aquamarine/src/actor.rs | 23 ++++++++++++++------- aquamarine/src/aquamarine.rs | 12 ++++++----- aquamarine/src/command.rs | 2 +- aquamarine/src/particle_effects.rs | 2 +- aquamarine/src/particle_executor.rs | 8 ++++--- aquamarine/src/particle_functions.rs | 31 +++++++++++++--------------- connection-pool/src/api.rs | 2 +- connection-pool/src/behaviour.rs | 3 ++- nox/src/connectivity.rs | 8 +++++-- nox/src/dispatcher.rs | 2 +- nox/src/effectors.rs | 5 ++++- nox/src/node.rs | 2 +- particle-protocol/src/lib.rs | 2 +- particle-protocol/src/particle.rs | 1 - sorcerer/src/script_executor.rs | 1 + 15 files changed, 60 insertions(+), 44 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 5b878de578..f6123f2742 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -154,7 +154,8 @@ where let waker = cx.waker().clone(); // Schedule execution of functions - self.functions.execute(self.particle.id.clone(), effects.call_requests, waker, &span); + self.functions + .execute(self.particle.id.clone(), effects.call_requests, waker); let effects = RoutingEffects { particle: ExtendedParticle { @@ -216,32 +217,38 @@ where let peer_id = self.current_peer_id; // TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement) let key_pair = self.key_pair.clone(); + let async_span = ext_particle .as_ref() - .map(|p| tracing::info_span!(parent: &p.span, "Actor async AVM process particle with call results")) - .unwrap_or_else(|| tracing::info_span!("Actor async AVM process call results")); + .map(|p| tracing::info_span!(parent: &p.span, "Actor: async AVM process particle with call results", particle_id = particle.id)) + .unwrap_or_else(|| tracing::info_span!("Actor: async AVM process call results", particle_id = particle.id)); + + let _guard = async_span.enter(); for span in spans { + let local_span = span.clone(); + let _guard = local_span.enter(); //we explicitly call enter to prevent panic async_span.follows_from(span); } // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 self.future = Some( async move { - let span = async_span.clone(); - let _guard = span.enter(); + let span = Span::current(); + + let local_span = tracing::info_span!(parent: &span, "Actor: vm execute"); + let _guard = local_span.enter(); let res = vm .execute((particle, calls), waker, peer_id, key_pair) - .in_current_span() .await; let reusables = Reusables { vm_id, vm: res.runtime, }; - (reusables, res.effects, res.stats, async_span) + (reusables, res.effects, res.stats, span) } - //.instrument(async_span) + .in_current_span() .boxed(), ); self.wake(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index 62fe314cf9..b171437e44 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -21,13 +21,13 @@ use std::time::Duration; use futures::StreamExt; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::{Instrument, instrument}; +use tracing::{instrument, Instrument}; use fluence_libp2p::PeerId; use health::HealthCheckRegistry; use key_manager::KeyManager; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; -use particle_protocol::{ExtendedParticle}; +use particle_protocol::ExtendedParticle; use peer_metrics::{ParticleExecutorMetrics, VmPoolMetrics}; use crate::aqua_runtime::AquaRuntime; @@ -88,8 +88,9 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; - let span = tracing::info_span!(parent: &particle.span, "Aquamarine: Poll Ingest"); - let _guard =span.entered(); + let span = + tracing::info_span!(parent: &particle.span, "Aquamarine: Poll Ingest"); + let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id self.plumber.ingest(particle, function, self.host_peer_id); @@ -206,6 +207,7 @@ impl AquamarineApi { log::error!("Aquamarine outlet died!"); AquamarineDied { particle_id } }) - }.in_current_span() + } + .in_current_span() } } diff --git a/aquamarine/src/command.rs b/aquamarine/src/command.rs index 802c3ac6f7..5d7b516116 100644 --- a/aquamarine/src/command.rs +++ b/aquamarine/src/command.rs @@ -15,7 +15,7 @@ */ use particle_execution::ServiceFunction; -use particle_protocol::{ExtendedParticle}; +use particle_protocol::ExtendedParticle; use std::collections::HashMap; pub enum Command { diff --git a/aquamarine/src/particle_effects.rs b/aquamarine/src/particle_effects.rs index 250fdee86b..73aa103e3f 100644 --- a/aquamarine/src/particle_effects.rs +++ b/aquamarine/src/particle_effects.rs @@ -15,7 +15,7 @@ */ use avm_server::CallRequests; -use particle_protocol::{ExtendedParticle}; +use particle_protocol::ExtendedParticle; use std::time::Duration; use libp2p::PeerId; diff --git a/aquamarine/src/particle_executor.rs b/aquamarine/src/particle_executor.rs index 55b059ea18..bd98132ae6 100644 --- a/aquamarine/src/particle_executor.rs +++ b/aquamarine/src/particle_executor.rs @@ -22,7 +22,7 @@ use avm_server::{CallResults, ParticleParameters}; use fluence_keypair::KeyPair; use futures::{future::BoxFuture, FutureExt}; use humantime::format_duration as pretty; -use tracing::{instrument, Instrument}; +use tracing::{instrument, Instrument, Span}; use fluence_libp2p::PeerId; use particle_protocol::Particle; @@ -68,11 +68,13 @@ impl ParticleExecutor for RT { current_peer_id: PeerId, key_pair: KeyPair, ) -> Self::Future { + let span = Span::current(); let (particle, calls) = p; let particle_id = particle.id.clone(); let data_len = particle.data.len(); - let blockng_span = tracing::info_span!("Particle executor: blocking AVM call"); - let async_span = tracing::info_span!("Particle executor: async task"); + let blockng_span = + tracing::info_span!(parent: &span, "Particle executor: blocking AVM call"); + let async_span = tracing::info_span!(parent: &span, "Particle executor: async task"); let task = tokio::task::Builder::new() .name(&format!("Particle {}", particle.id)) .spawn_blocking(move || { diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index c45fdbe5c1..125b5df649 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -98,16 +98,10 @@ impl Functions { /// Add a bunch of call requests to execution #[instrument(level = tracing::Level::INFO, skip_all)] - pub fn execute( - &mut self, - particle_id: String, - requests: CallRequests, - waker: Waker, - span: &Span, - ) { + pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker) { let futs: Vec<_> = requests .into_iter() - .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone(), span)) + .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone())) .collect(); self.function_calls.extend(futs); } @@ -132,27 +126,31 @@ impl Functions { // I see the main obstacle to cooperation in streaming results to `self.call_results`. // Streaming can be done through an MPSC channel, but it seems like an overkill. Though // maybe it's a good option. + #[instrument(level = tracing::Level::INFO, skip_all)] fn call( &self, particle_id: String, call_id: u32, call: CallRequestParams, waker: Waker, - span: &Span, ) -> BoxFuture<'static, SingleCallResult> { - let arg_span = span.clone(); + let span = Span::current(); + let local_span = tracing::info_span!(parent: span, "Particle functions: call"); + let _guard = local_span.enter(); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, Err(err) => { return async move { + let span = Span::current(); + let result = CallServiceResult { ret_code: 1, result: json!(format!( "Failed to deserialize CallRequestParams to Args: {err}" )), }; - let result = SingleCallResult { + SingleCallResult { call_id, result, stat: SingleCallStat { @@ -161,9 +159,8 @@ impl Functions { success: false, kind: FunctionKind::NotHappened, }, - span: arg_span, - }; - result + span, + } } .in_current_span() .boxed(); @@ -181,7 +178,6 @@ impl Functions { let params = self.particle.clone(); let builtins = self.builtins.clone(); let particle_function = self.particle_function.clone(); - let async_span = tracing::info_span!(parent: arg_span, "Particle functions async call"); let schedule_wait_start = Instant::now(); let result = tokio::task::Builder::new() .name(&format!( @@ -219,8 +215,9 @@ impl Functions { }) .expect("Could not spawn task"); - let span = span.clone(); async move { + let span = Span::current(); + let (result, call_kind, call_time, wait_time) = result.await.expect("Could not 'Call function' join"); @@ -273,7 +270,7 @@ impl Functions { span, } } - .instrument(async_span) + .in_current_span() .boxed() } } diff --git a/connection-pool/src/api.rs b/connection-pool/src/api.rs index 4054a9598f..f1867648fe 100644 --- a/connection-pool/src/api.rs +++ b/connection-pool/src/api.rs @@ -22,8 +22,8 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::instrument; +use particle_protocol::ExtendedParticle; use particle_protocol::{Contact, SendStatus}; -use particle_protocol::{ExtendedParticle}; use crate::connection_pool::LifecycleEvent; use crate::ConnectionPoolT; diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index c9bfb18fcf..f54f75cb98 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -617,7 +617,8 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { Ok(HandlerMessage::InParticle(particle)) => { tracing::info!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len()); let root_span = tracing::info_span!("Particle", particle_id = particle.id); - let _ = root_span.enter(); + let local_span = root_span.clone(); + let _guard = local_span.enter(); self.meter(|m| { m.incoming_particle( diff --git a/nox/src/connectivity.rs b/nox/src/connectivity.rs index 49e27a6b4f..6cb7a5e7eb 100644 --- a/nox/src/connectivity.rs +++ b/nox/src/connectivity.rs @@ -28,7 +28,7 @@ use libp2p::Multiaddr; use particle_protocol::{Contact, ExtendedParticle, SendStatus}; use peer_metrics::{ConnectivityMetrics, Resolution}; use tokio::time::sleep; -use tracing::{Instrument, instrument, Span}; +use tracing::{instrument, Instrument, Span}; use crate::tasks::Tasks; @@ -127,7 +127,11 @@ impl Connectivity { #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn send(&self, contact: Contact, particle: ExtendedParticle) -> bool { - tracing::debug!(particle_id = particle.particle.id, "Sending particle to {}", contact); + tracing::debug!( + particle_id = particle.particle.id, + "Sending particle to {}", + contact + ); let metrics = self.metrics.as_ref(); let id = particle.particle.id.clone(); let sent = self.connection_pool.send(contact.clone(), particle).await; diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index f191800bc2..9419447f24 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -22,7 +22,7 @@ use peer_metrics::DispatcherMetrics; use prometheus_client::registry::Registry; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; -use tracing::{Instrument, instrument}; +use tracing::{instrument, Instrument}; use crate::effectors::Effectors; use crate::tasks::Tasks; diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index 124621eddb..fc13c09488 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -48,7 +48,10 @@ impl Effectors { let particle = particle.clone(); async move { // resolve contact - if let Some(contact) = connectivity.resolve_contact(target, &particle.particle.id).await { + if let Some(contact) = connectivity + .resolve_contact(target, &particle.particle.id) + .await + { // forward particle let sent = connectivity.send(contact, particle).await; if sent { diff --git a/nox/src/node.rs b/nox/src/node.rs index 5cf92fdb3b..6cd73c5e6b 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -39,7 +39,7 @@ use libp2p_connection_limits::ConnectionLimits; use libp2p_metrics::{Metrics, Recorder}; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; -use particle_protocol::{ExtendedParticle}; +use particle_protocol::ExtendedParticle; use peer_metrics::{ ConnectionPoolMetrics, ConnectivityMetrics, ParticleExecutorMetrics, ServicesMetrics, ServicesMetricsBackend, SpellMetrics, VmPoolMetrics, diff --git a/particle-protocol/src/lib.rs b/particle-protocol/src/lib.rs index 4f18c54fca..9c674735c5 100644 --- a/particle-protocol/src/lib.rs +++ b/particle-protocol/src/lib.rs @@ -42,7 +42,7 @@ pub use libp2p_protocol::message::CompletionChannel; pub use libp2p_protocol::message::SendStatus; pub use libp2p_protocol::message::{HandlerMessage, ProtocolMessage}; pub use libp2p_protocol::upgrade::ProtocolConfig; -pub use particle::Particle; pub use particle::ExtendedParticle; +pub use particle::Particle; pub const PROTOCOL_NAME: &str = "/fluence/particle/2.0.0"; diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 75d2f8eec7..5bf475bf5e 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -36,7 +36,6 @@ pub struct ExtendedParticle { pub span: Span, } - #[derive(Clone, Serialize, Deserialize, PartialEq, Derivative)] #[derivative(Debug)] pub struct Particle { diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index f39cda35bc..e08b354bd7 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -114,6 +114,7 @@ impl Sorcerer { } let particle_span = Span::current(); + let async_span = particle_span.clone(); self.aquamarine From d2385ea00533aac42309d433939997ab19a050d7 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 27 Nov 2023 23:38:11 +0300 Subject: [PATCH 06/33] wip --- Cargo.lock | 1 + aquamarine/src/actor.rs | 55 +++++++++++++++++----------- aquamarine/src/aquamarine.rs | 2 +- aquamarine/src/particle_functions.rs | 18 ++++----- aquamarine/src/plumber.rs | 2 +- connection-pool/src/behaviour.rs | 9 ++--- nox/Cargo.toml | 1 + nox/src/dispatcher.rs | 6 +-- particle-protocol/src/particle.rs | 3 +- sorcerer/src/script_executor.rs | 5 ++- 10 files changed, 57 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ba2af94a7..fd6237d2cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4659,6 +4659,7 @@ dependencies = [ "system-services", "tokio", "tokio-stream", + "tonic 0.9.2", "tracing", "tracing-log", "tracing-logfmt", diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index f6123f2742..52b79fd9bd 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,6 +14,8 @@ * limitations under the License. */ +use std::ops::Deref; +use std::sync::Arc; use std::{ collections::VecDeque, task::{Context, Poll, Waker}, @@ -42,7 +44,17 @@ struct Reusables { pub struct Actor { /// Particle of that actor is expired after that deadline deadline: Deadline, - future: Option, ParticleEffects, InterpretationStats, Span)>>, + future: Option< + BoxFuture< + 'static, + ( + Reusables, + ParticleEffects, + InterpretationStats, + Arc, + ), + >, + >, mailbox: VecDeque, waker: Option, functions: Functions, @@ -148,14 +160,18 @@ where if let Some(Ready((reusables, effects, stats, span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let local_span = tracing::info_span!(parent: &span, "Poll AVM future"); + let local_span = tracing::info_span!(parent: span.as_ref(), "Poll AVM future"); let _span_guard = local_span.enter(); self.future.take(); let waker = cx.waker().clone(); // Schedule execution of functions - self.functions - .execute(self.particle.id.clone(), effects.call_requests, waker); + self.functions.execute( + self.particle.id.clone(), + effects.call_requests, + waker, + span.clone(), + ); let effects = RoutingEffects { particle: ExtendedParticle { @@ -218,37 +234,34 @@ where // TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement) let key_pair = self.key_pair.clone(); - let async_span = ext_particle - .as_ref() - .map(|p| tracing::info_span!(parent: &p.span, "Actor: async AVM process particle with call results", particle_id = particle.id)) - .unwrap_or_else(|| tracing::info_span!("Actor: async AVM process call results", particle_id = particle.id)); - - let _guard = async_span.enter(); - + let async_span = if let Some(ext_particle) = ext_particle.as_ref() { + tracing::info_span!(parent: ext_particle.span.as_ref(),"Actor: async AVM process particle & call results",particle_id = particle.id) + } else { + tracing::info_span!( + "Actor: async AVM process particle & call results", + particle_id = particle.id + ) + }; for span in spans { - let local_span = span.clone(); - let _guard = local_span.enter(); //we explicitly call enter to prevent panic - async_span.follows_from(span); + async_span.follows_from(span.as_ref()); } - + let async_span = Arc::new(async_span); + let moved_async_span = async_span.clone(); // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 self.future = Some( async move { - let span = Span::current(); - - let local_span = tracing::info_span!(parent: &span, "Actor: vm execute"); - let _guard = local_span.enter(); let res = vm .execute((particle, calls), waker, peer_id, key_pair) + .in_current_span() .await; let reusables = Reusables { vm_id, vm: res.runtime, }; - (reusables, res.effects, res.stats, span) + (reusables, res.effects, res.stats, moved_async_span) } - .in_current_span() + .instrument(async_span.clone().deref().clone()) .boxed(), ); self.wake(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index b171437e44..b95b68d672 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -89,7 +89,7 @@ impl AquamarineBackend { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; let span = - tracing::info_span!(parent: &particle.span, "Aquamarine: Poll Ingest"); + tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 125b5df649..01bb430c26 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -55,7 +55,7 @@ pub struct SingleCallResult { call_id: u32, result: CallServiceResult, stat: SingleCallStat, - span: Span, + span: Arc, } pub struct Functions { @@ -64,7 +64,7 @@ pub struct Functions { function_calls: FuturesUnordered>, call_results: CallResults, call_stats: Vec, - call_spans: Vec, + call_spans: Vec>, particle_function: Option>>, } @@ -98,16 +98,16 @@ impl Functions { /// Add a bunch of call requests to execution #[instrument(level = tracing::Level::INFO, skip_all)] - pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker) { + pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker, span: Arc) { let futs: Vec<_> = requests .into_iter() - .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone())) + .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone(), span.clone())) .collect(); self.function_calls.extend(futs); } /// Retrieve all existing call results - pub fn drain(&mut self) -> (CallResults, Vec, Vec) { + pub fn drain(&mut self) -> (CallResults, Vec, Vec>) { let call_results = std::mem::take(&mut self.call_results); let stats = std::mem::take(&mut self.call_stats); let call_spans = std::mem::take(&mut self.call_spans); @@ -133,17 +133,15 @@ impl Functions { call_id: u32, call: CallRequestParams, waker: Waker, + span: Arc, ) -> BoxFuture<'static, SingleCallResult> { - let span = Span::current(); - let local_span = tracing::info_span!(parent: span, "Particle functions: call"); + let local_span = tracing::info_span!(parent: span.as_ref(), "Particle functions: call"); let _guard = local_span.enter(); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, Err(err) => { return async move { - let span = Span::current(); - let result = CallServiceResult { ret_code: 1, result: json!(format!( @@ -216,8 +214,6 @@ impl Functions { .expect("Could not spawn task"); async move { - let span = Span::current(); - let (result, call_kind, call_time, wait_time) = result.await.expect("Could not 'Call function' join"); diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index c64c0d68e5..0316e6896c 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -294,7 +294,7 @@ impl Plumber { for effect in local_effects { for local_peer in effect.next_peers { - let span = tracing::info_span!(parent: &effect.particle.span, "Plumber: routing effect ingest"); + let span = tracing::info_span!(parent: effect.particle.span.as_ref(), "Plumber: routing effect ingest"); let _guard = span.enter(); self.ingest(effect.particle.clone(), None, local_peer); } diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index f54f75cb98..8a34e96ab7 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -37,6 +37,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, }; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSender; @@ -216,7 +217,7 @@ impl ConnectionPoolBehaviour { particle: ExtendedParticle, outlet: oneshot::Sender, ) { - let span = tracing::info_span!(parent: &particle.span, "Connection pool behaviour: send"); + let span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally @@ -617,8 +618,6 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { Ok(HandlerMessage::InParticle(particle)) => { tracing::info!(target: "network", particle_id = particle.id,"{}: received particle from {}; queue {}", self.peer_id, from, self.queue.len()); let root_span = tracing::info_span!("Particle", particle_id = particle.id); - let local_span = root_span.clone(); - let _guard = local_span.enter(); self.meter(|m| { m.incoming_particle( @@ -629,7 +628,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { }); self.queue.push_back(ExtendedParticle { particle, - span: root_span, + span: Arc::new(root_span), }); self.wake(); } @@ -651,7 +650,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { let particle_id = particle.particle.id.clone(); - let _span = tracing::info_span!(parent: &particle.span, "Connection pool: send to outlet").entered(); + let _span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool: send to outlet").entered(); if let Err(err) = outlet.start_send(particle) { tracing::error!( diff --git a/nox/Cargo.toml b/nox/Cargo.toml index 23a30c7ee8..b609360cdf 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -66,6 +66,7 @@ opentelemetry_sdk = { version = "0.21.1", features = ["rt-tokio-current-thread"] opentelemetry-otlp = "0.14.0" opentelemetry-stdout = { version = "0.2.0", features = ["trace"] } once_cell = { workspace = true } +tonic = "0.9.2" [dev-dependencies] parking_lot = { workspace = true } diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index 9419447f24..193f5b06ac 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -92,8 +92,8 @@ impl Dispatcher { let metrics = self.metrics; particle_stream .for_each_concurrent(parallelism, move |particle| { - let current_span = tracing::info_span!(parent: &particle.span, "Dispatcher: process particle"); - let async_span = tracing::info_span!(parent: &particle.span, "Dispatcher: async Aquamarine.execute"); + let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: process particle"); + let async_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: async Aquamarine.execute"); let _ = current_span.enter(); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); @@ -135,7 +135,7 @@ impl Dispatcher { async move { match effects { Ok(effects) => { - let span = tracing::info_span!(parent: &effects.particle.span, "Dispatcher: execute effectors"); + let span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher: execute effectors"); // perform effects as instructed by aquamarine effectors.execute(effects).instrument(span).await; } diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 5bf475bf5e..f0cb41e3de 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -15,6 +15,7 @@ */ use std::convert::TryInto; +use std::sync::Arc; use std::time::Duration; use derivative::Derivative; @@ -33,7 +34,7 @@ use now_millis::now_ms; #[derive(Clone, Debug)] pub struct ExtendedParticle { pub particle: Particle, - pub span: Span, + pub span: Arc, } #[derive(Clone, Serialize, Deserialize, PartialEq, Derivative)] diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index e08b354bd7..6d2fad748f 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -14,6 +14,7 @@ * limitations under the License. */ use fluence_libp2p::PeerId; +use std::sync::Arc; use tracing::{instrument, Instrument, Span}; use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; @@ -113,9 +114,9 @@ impl Sorcerer { m.observe_spell_cast(); } - let particle_span = Span::current(); + let particle_span = Arc::new(Span::current()); - let async_span = particle_span.clone(); + let async_span = tracing::info_span!(parent: particle_span.as_ref(), "Script executor: aquamarine async execute"); self.aquamarine .clone() From abc775e91500822810845ae121537801ebc61900 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 27 Nov 2023 23:38:16 +0300 Subject: [PATCH 07/33] wip --- aquamarine/src/aquamarine.rs | 3 +-- aquamarine/src/particle_functions.rs | 8 +++++++- connection-pool/src/behaviour.rs | 5 +++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index b95b68d672..c54b8a9eb2 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -88,8 +88,7 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; - let span = - tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); + let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index 01bb430c26..b495db2c2a 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -98,7 +98,13 @@ impl Functions { /// Add a bunch of call requests to execution #[instrument(level = tracing::Level::INFO, skip_all)] - pub fn execute(&mut self, particle_id: String, requests: CallRequests, waker: Waker, span: Arc) { + pub fn execute( + &mut self, + particle_id: String, + requests: CallRequests, + waker: Waker, + span: Arc, + ) { let futs: Vec<_> = requests .into_iter() .map(|(id, call)| self.call(particle_id.clone(), id, call, waker.clone(), span.clone())) diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 8a34e96ab7..bdbc8f29f9 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -33,11 +33,11 @@ use particle_protocol::{ }; use peer_metrics::ConnectionPoolMetrics; use std::pin::Pin; +use std::sync::Arc; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, }; -use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::PollSender; @@ -217,7 +217,8 @@ impl ConnectionPoolBehaviour { particle: ExtendedParticle, outlet: oneshot::Sender, ) { - let span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); + let span = + tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally From 0d808052396ccb530ec3dc863e2cf0298cbfd6ca Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 00:12:18 +0300 Subject: [PATCH 08/33] wip --- aquamarine/src/actor.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 52b79fd9bd..53d2cfddf8 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use std::ops::Deref; use std::sync::Arc; use std::{ collections::VecDeque, @@ -238,15 +237,18 @@ where tracing::info_span!(parent: ext_particle.span.as_ref(),"Actor: async AVM process particle & call results",particle_id = particle.id) } else { tracing::info_span!( - "Actor: async AVM process particle & call results", + "Actor: async AVM process call results", particle_id = particle.id ) }; for span in spans { async_span.follows_from(span.as_ref()); } - let async_span = Arc::new(async_span); - let moved_async_span = async_span.clone(); + let linking_span = ext_particle + .as_ref() + .map(|p| p.span.clone()) + .unwrap_or_else(|| Arc::new(async_span.clone())); + // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 self.future = Some( async move { @@ -259,9 +261,9 @@ where vm_id, vm: res.runtime, }; - (reusables, res.effects, res.stats, moved_async_span) + (reusables, res.effects, res.stats, linking_span) } - .instrument(async_span.clone().deref().clone()) + .instrument(async_span) .boxed(), ); self.wake(); From 938da707ea517faa48f82a5aa66437ca9127e063 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 00:23:11 +0300 Subject: [PATCH 09/33] wip --- aquamarine/src/plumber.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 0316e6896c..1ec2401fb3 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -107,9 +107,11 @@ impl Plumber { } if !self.key_manager.is_worker_active(worker_id) - && !self.key_manager.is_management(particle.init_peer_id) + && !self + .key_manager + .is_management(particle.particle.init_peer_id) { - tracing::trace!(target: "worker_inactive", particle_id = particle.id, worker_id = worker_id.to_string(), "Worker is not active"); + tracing::trace!(target: "worker_inactive", particle_id = particle.particle.id, worker_id = worker_id.to_string(), "Worker is not active"); return; } From b6407ba5c596e882d533b2461ad99c20f77ee3fa Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 00:40:11 +0300 Subject: [PATCH 10/33] wip --- aquamarine/src/actor.rs | 23 ++++++++++++----------- aquamarine/src/vm_pool.rs | 4 ++-- crates/system-services/src/lib.rs | 2 +- nox/src/layers.rs | 6 ++---- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 53d2cfddf8..aade4b53d9 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -40,20 +40,21 @@ struct Reusables { vm: Option, } +type ExecutionTask = Option< + BoxFuture< + 'static, + ( + Reusables, + ParticleEffects, + InterpretationStats, + Arc, + ), + >, +>; pub struct Actor { /// Particle of that actor is expired after that deadline deadline: Deadline, - future: Option< - BoxFuture< - 'static, - ( - Reusables, - ParticleEffects, - InterpretationStats, - Arc, - ), - >, - >, + future: ExecutionTask, mailbox: VecDeque, waker: Option, functions: Functions, diff --git a/aquamarine/src/vm_pool.rs b/aquamarine/src/vm_pool.rs index ce0326438f..f35b823bbf 100644 --- a/aquamarine/src/vm_pool.rs +++ b/aquamarine/src/vm_pool.rs @@ -120,7 +120,7 @@ impl VmPool { }); } - pub fn recreate_avm(&mut self, id: usize, cx: &mut Context<'_>) { + pub fn recreate_avm(&mut self, id: usize, cx: &Context<'_>) { if self.creating_runtimes.is_none() { log::error!( "Attempt to recreate an AVM before initialization (self.creating_runtimes is None), ignoring" @@ -134,7 +134,7 @@ impl VmPool { } } - fn create_avm(&self, cx: &mut Context<'_>) -> RuntimeF { + fn create_avm(&self, cx: &Context<'_>) -> RuntimeF { let config = self.runtime_config.clone(); let waker = cx.waker().clone(); diff --git a/crates/system-services/src/lib.rs b/crates/system-services/src/lib.rs index 7af278bc42..d5b545131b 100644 --- a/crates/system-services/src/lib.rs +++ b/crates/system-services/src/lib.rs @@ -55,7 +55,7 @@ pub struct PackageDistro { pub init: Option>, } -impl<'a> std::fmt::Debug for PackageDistro { +impl fmt::Debug for PackageDistro { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PackageDistro") .field("name", &self.name) diff --git a/nox/src/layers.rs b/nox/src/layers.rs index 05d47434fa..c2930c93f5 100644 --- a/nox/src/layers.rs +++ b/nox/src/layers.rs @@ -49,7 +49,7 @@ where .map(|c| &c.format) .unwrap_or(&LogFormat::Default); - let layer = match log_format { + match log_format { LogFormat::Logfmt => tracing_logfmt::builder() .with_target(true) .with_span_path(false) @@ -60,9 +60,7 @@ where .with_thread_ids(true) .with_thread_names(true) .boxed(), - }; - - layer + } } pub fn tokio_console_layer( From ca911a79ef65d2323b394b427a07962b89f40805 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 00:54:55 +0300 Subject: [PATCH 11/33] fix tests --- aquamarine/src/plumber.rs | 21 ++++++++++++++++++--- crates/nox-tests/tests/client_api.rs | 12 ++++++++++-- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 1ec2401fb3..537aa74c9d 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -349,7 +349,7 @@ mod tests { use particle_args::Args; use particle_execution::{FunctionOutcome, ParticleFunction, ParticleParams, ServiceFunction}; - use particle_protocol::Particle; + use particle_protocol::{ExtendedParticle, Particle}; use crate::deadline::Deadline; use crate::plumber::mock_time::set_mock_time; @@ -358,6 +358,7 @@ mod tests { use crate::AquamarineApiError::ParticleExpired; use crate::{AquaRuntime, ParticleEffects, Plumber}; use async_trait::async_trait; + use tracing::Span; struct MockF; @@ -476,7 +477,14 @@ mod tests { let deadline = Deadline::from(&particle); assert!(!deadline.is_expired(now_ms())); - plumber.ingest(particle, None, RandomPeerId::random()); + plumber.ingest( + ExtendedParticle { + particle, + span: Arc::new(Span::none()), + }, + None, + RandomPeerId::random(), + ); assert_eq!(plumber.actors.len(), 1); let mut cx = context(); @@ -509,7 +517,14 @@ mod tests { let deadline = Deadline::from(&particle); assert!(deadline.is_expired(now_ms())); - plumber.ingest(particle.clone(), None, RandomPeerId::random()); + plumber.ingest( + ExtendedParticle { + particle: particle.clone(), + span: Arc::new(Span::none()), + }, + None, + RandomPeerId::random(), + ); assert_eq!(plumber.actors.len(), 0); diff --git a/crates/nox-tests/tests/client_api.rs b/crates/nox-tests/tests/client_api.rs index 220a440f01..ccb04b6999 100644 --- a/crates/nox-tests/tests/client_api.rs +++ b/crates/nox-tests/tests/client_api.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::sync::Arc; use std::time::Duration; use futures::channel::oneshot::channel; @@ -21,11 +22,12 @@ use futures::future::BoxFuture; use futures::FutureExt; use maplit::hashmap; use serde_json::json; +use tracing::Span; use created_swarm::make_swarms; use now_millis::now_ms; use particle_execution::FunctionOutcome; -use particle_protocol::Particle; +use particle_protocol::{ExtendedParticle, Particle}; use test_constants::PARTICLE_TTL; use test_utils::timeout; use uuid_utils::uuid; @@ -93,7 +95,13 @@ async fn call_custom_service() { data: vec![], }; - let exec_f = swarms[1].aquamarine_api.clone().execute(particle, None); + let exec_f = swarms[1].aquamarine_api.clone().execute( + ExtendedParticle { + particle, + span: Arc::new(Span::none()), + }, + None, + ); let result = timeout(Duration::from_secs(30), async move { add_first_f.await.expect("add_first_f"); From d0b3dac9fcd82c11b3001746816b12eb1f1dea7a Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 01:33:41 +0300 Subject: [PATCH 12/33] add spell span --- sorcerer/src/script_executor.rs | 14 +++----------- sorcerer/src/sorcerer.rs | 11 +++++++---- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 6d2fad748f..214e8c2417 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -100,7 +100,7 @@ impl Sorcerer { } #[instrument(level = tracing::Level::INFO, skip_all)] - pub async fn execute_script(&self, event: TriggerEvent) { + pub async fn execute_script(&self, event: TriggerEvent, span: Arc) { let error: Result<(), JError> = try { let worker_id = self.services.get_service_owner( "", @@ -114,19 +114,11 @@ impl Sorcerer { m.observe_spell_cast(); } - let particle_span = Arc::new(Span::current()); - - let async_span = tracing::info_span!(parent: particle_span.as_ref(), "Script executor: aquamarine async execute"); + let async_span = tracing::info_span!(parent: span.as_ref(), "Script executor: aquamarine async execute"); self.aquamarine .clone() - .execute( - ExtendedParticle { - particle, - span: particle_span, - }, - None, - ) + .execute(ExtendedParticle { particle, span }, None) .instrument(async_span) .await?; }; diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index cc28defd45..a995f31b85 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -15,6 +15,7 @@ */ use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use futures::{FutureExt, StreamExt}; @@ -143,17 +144,19 @@ impl Sorcerer { let spell_events_stream = UnboundedReceiverStream::new(spell_events_receiver); spell_events_stream .for_each_concurrent(None, move |spell_event| { - let span = tracing::info_span!( + let root_span = tracing::info_span!( "Sorcerer: spell processing", spell_id = spell_event.spell_id.to_string() ); - let _ = span.enter(); + let root_span = Arc::new(root_span); + let async_span = tracing::info_span!(parent: root_span.as_ref(), "Sorcerer: async execute script"); + let sorcerer = self.clone(); // Note that the event that triggered the spell is in `spell_event.event` async move { - sorcerer.execute_script(spell_event).await; + sorcerer.execute_script(spell_event,root_span).await; } - .instrument(span) + .instrument(async_span) }) .await; }) From bfcfcf46f8bda869332e14d000018cc30f6234f4 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 13:56:08 +0300 Subject: [PATCH 13/33] Improve logs --- aquamarine/src/actor.rs | 10 +++++++--- sorcerer/src/script_executor.rs | 9 +++++---- sorcerer/src/sorcerer.rs | 9 +++++++-- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index aade4b53d9..1918c41b3a 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -160,7 +160,7 @@ where if let Some(Ready((reusables, effects, stats, span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let local_span = tracing::info_span!(parent: span.as_ref(), "Poll AVM future"); + let local_span = tracing::info_span!(parent: span.as_ref(), "Poll AVM future", particle_id= self.particle.id, deal_id = self.deal_id); let _span_guard = local_span.enter(); self.future.take(); @@ -235,11 +235,15 @@ where let key_pair = self.key_pair.clone(); let async_span = if let Some(ext_particle) = ext_particle.as_ref() { - tracing::info_span!(parent: ext_particle.span.as_ref(),"Actor: async AVM process particle & call results",particle_id = particle.id) + tracing::info_span!(parent: ext_particle.span.as_ref(), + "Actor: async AVM process particle & call results", + particle_id = particle.id, + deal_id = self.deal_id) } else { tracing::info_span!( "Actor: async AVM process call results", - particle_id = particle.id + particle_id = particle.id, + deal_id = self.deal_id ) }; for span in spans { diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 214e8c2417..0305c4f347 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -54,6 +54,7 @@ impl Sorcerer { .map_err(|e| JError::new(e.to_string())) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub(crate) fn make_spell_particle( &self, spell_id: String, @@ -114,7 +115,7 @@ impl Sorcerer { m.observe_spell_cast(); } - let async_span = tracing::info_span!(parent: span.as_ref(), "Script executor: aquamarine async execute"); + let async_span = tracing::info_span!(parent: span.as_ref(), "Script executor: aquamarine async execute", spell_id = event.spell_id.to_string()); self.aquamarine .clone() @@ -125,10 +126,10 @@ impl Sorcerer { if let Err(err) = error { log::warn!( - "Failed to execute spell script id: {}, event: {:?}, error: {:?}", - event.spell_id, + "Failed to execute spell script id: {spell_id}, event: {:?}, error: {:?}", event.info, - err + err, + spell_id = event.spell_id.to_string(), ); } } diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index a995f31b85..c9ead11470 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -149,12 +149,17 @@ impl Sorcerer { spell_id = spell_event.spell_id.to_string() ); let root_span = Arc::new(root_span); - let async_span = tracing::info_span!(parent: root_span.as_ref(), "Sorcerer: async execute script"); + let async_span = tracing::info_span!(parent: root_span.as_ref(), + "Sorcerer: async execute script", + spell_id = spell_event.spell_id.to_string()); let sorcerer = self.clone(); // Note that the event that triggered the spell is in `spell_event.event` async move { - sorcerer.execute_script(spell_event,root_span).await; + sorcerer + .execute_script(spell_event, root_span) + .in_current_span() + .await; } .instrument(async_span) }) From c508c05d121ed76b97f18a64328d50b0f04e5f22 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 28 Nov 2023 17:27:36 +0300 Subject: [PATCH 14/33] Improve span --- aquamarine/src/actor.rs | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 1918c41b3a..b0ec9e8df2 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -234,25 +234,18 @@ where // TODO: get rid of this clone by recovering key_pair after `vm.execute` (not trivial to implement) let key_pair = self.key_pair.clone(); - let async_span = if let Some(ext_particle) = ext_particle.as_ref() { - tracing::info_span!(parent: ext_particle.span.as_ref(), - "Actor: async AVM process particle & call results", - particle_id = particle.id, - deal_id = self.deal_id) - } else { - tracing::info_span!( - "Actor: async AVM process call results", - particle_id = particle.id, - deal_id = self.deal_id - ) - }; + let async_span = tracing::info_span!( + "Actor: async AVM process particle & call results", + particle_id = particle.id, + deal_id = self.deal_id + ); + if let Some(ext_particle) = ext_particle.as_ref() { + async_span.follows_from(ext_particle.span.as_ref()); + } for span in spans { async_span.follows_from(span.as_ref()); } - let linking_span = ext_particle - .as_ref() - .map(|p| p.span.clone()) - .unwrap_or_else(|| Arc::new(async_span.clone())); + let linking_span = Arc::new(async_span.clone()); // TODO: add timeout for execution https://github.com/fluencelabs/fluence/issues/1212 self.future = Some( From 4622fb964bcca9b081e1cc6b285509f3a4ae56f8 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 6 Dec 2023 14:43:22 +0300 Subject: [PATCH 15/33] Review fixes --- aquamarine/src/actor.rs | 11 +++-------- aquamarine/src/plumber.rs | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index b0ec9e8df2..e86f50dcf4 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -75,27 +75,22 @@ where F: ParticleFunctionStatic, { pub fn new( - particle: &ExtendedParticle, + particle: &Particle, functions: Functions, current_peer_id: PeerId, key_pair: KeyPair, deal_id: Option, ) -> Self { Self { - deadline: Deadline::from(&particle.particle), + deadline: Deadline::from(&particle), functions, future: None, mailbox: <_>::default(), waker: None, // Clone particle without data particle: Particle { - id: particle.particle.id.clone(), - init_peer_id: particle.particle.init_peer_id, - timestamp: particle.particle.timestamp, - ttl: particle.particle.ttl, - script: particle.particle.script.clone(), - signature: particle.particle.signature.clone(), data: vec![], + ..particle.clone() }, current_peer_id, key_pair, diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 537aa74c9d..065af9659c 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -127,7 +127,7 @@ impl Plumber { let key_pair = self.key_manager.get_worker_keypair(worker_id); let deal_id = self.key_manager.get_deal_id(worker_id).ok(); key_pair.map(|kp| { - let actor = Actor::new(&particle, functions, worker_id, kp, deal_id); + let actor = Actor::new(&particle.particle, functions, worker_id, kp, deal_id); entry.insert(actor) }) } From 70e5f61cd0d89f5b293f68cc64456e5ea8d4d177 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 6 Dec 2023 14:49:38 +0300 Subject: [PATCH 16/33] Review fixes --- aquamarine/src/actor.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index e86f50dcf4..b3c8ffc951 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -82,7 +82,7 @@ where deal_id: Option, ) -> Self { Self { - deadline: Deadline::from(&particle), + deadline: Deadline::from(particle), functions, future: None, mailbox: <_>::default(), @@ -152,11 +152,11 @@ where self.functions.poll(cx); // Poll AquaVM future - if let Some(Ready((reusables, effects, stats, span))) = + if let Some(Ready((reusables, effects, stats, parent_span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let local_span = tracing::info_span!(parent: span.as_ref(), "Poll AVM future", particle_id= self.particle.id, deal_id = self.deal_id); - let _span_guard = local_span.enter(); + let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor: execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); + let _span_guard = span.enter(); self.future.take(); let waker = cx.waker().clone(); @@ -165,7 +165,7 @@ where self.particle.id.clone(), effects.call_requests, waker, - span.clone(), + parent_span.clone(), ); let effects = RoutingEffects { @@ -174,7 +174,7 @@ where data: effects.new_data, ..self.particle.clone() }, - span, + span: parent_span, }, next_peers: effects.next_peers, }; From f6c07f1dd50437fa79c2923fe7fddb0e65f1ab93 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 6 Dec 2023 14:50:42 +0300 Subject: [PATCH 17/33] Review fixes --- aquamarine/src/actor.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index b3c8ffc951..84c2365f4a 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -155,7 +155,10 @@ where if let Some(Ready((reusables, effects, stats, parent_span))) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { - let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor: execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); + let span = tracing::info_span!(parent: parent_span.as_ref(), + "Actor: execute call requests", + particle_id = self.particle.id, + deal_id = self.deal_id); let _span_guard = span.enter(); self.future.take(); From 14c86959997024377f54fb9a47a28b091634bd32 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:32:10 +0300 Subject: [PATCH 18/33] wip --- aquamarine/src/actor.rs | 48 +++++++++++++++++---------- aquamarine/src/particle_data_store.rs | 4 +++ aquamarine/src/particle_executor.rs | 3 ++ aquamarine/src/plumber.rs | 10 ++---- connection-pool/src/behaviour.rs | 7 ++-- crates/nox-tests/tests/client_api.rs | 11 +++--- particle-protocol/src/particle.rs | 16 +++++++++ sorcerer/src/script_executor.rs | 2 +- 8 files changed, 63 insertions(+), 38 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 2224ae9e8e..aaed0f3d13 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -148,6 +148,9 @@ where if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { let (reusables, effects, stats, parent_span) = res; + let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor: execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); + let _span_guard = span.enter(); + self.future.take(); let waker = cx.waker().clone(); @@ -160,13 +163,13 @@ where ); let effects = RoutingEffects { - particle: ExtendedParticle { - particle: Particle { + particle: ExtendedParticle::linked( + Particle { data: effects.new_data, ..self.particle.clone() }, - span: parent_span, - }, + parent_span, + ), next_peers: effects.next_peers, }; return Some(Poll::Ready(FutResult { @@ -193,7 +196,7 @@ where } // Gather CallResults - let (calls, stats, spans) = self.functions.drain(); + let (calls, stats, call_spans) = self.functions.drain(); // Take the next particle let ext_particle = self.mailbox.pop_front(); @@ -218,18 +221,8 @@ where let key_pair = self.key_pair.clone(); let peer_id = self.current_peer_id; - let async_span = tracing::info_span!( - "Actor: async AVM process particle & call results", - particle_id = particle.id, - deal_id = self.deal_id - ); - if let Some(ext_particle) = ext_particle.as_ref() { - async_span.follows_from(ext_particle.span.as_ref()); - } - for span in spans { - async_span.follows_from(span.as_ref()); - } - let linking_span = Arc::new(async_span.clone()); + let (async_span, linking_span) = + self.create_spans(call_spans, ext_particle, particle.id.as_str()); self.future = Some( async move { @@ -254,6 +247,27 @@ where ActorPoll::Executing(stats) } + + fn create_spans( + &self, + call_spans: Vec>, + ext_particle: Option, + particle_id: &str, + ) -> (Span, Arc) { + let async_span = tracing::info_span!( + "Actor: async AVM process particle & call results", + particle_id = particle_id, + deal_id = self.deal_id + ); + if let Some(ext_particle) = ext_particle.as_ref() { + async_span.follows_from(ext_particle.span.as_ref()); + } + for span in call_spans { + async_span.follows_from(span.as_ref()); + } + let linking_span = Arc::new(async_span.clone()); + (async_span, linking_span) + } fn wake(&self) { if let Some(waker) = &self.waker { waker.wake_by_ref(); diff --git a/aquamarine/src/particle_data_store.rs b/aquamarine/src/particle_data_store.rs index f55ae4a173..4f75e0fb3a 100644 --- a/aquamarine/src/particle_data_store.rs +++ b/aquamarine/src/particle_data_store.rs @@ -23,6 +23,7 @@ use avm_server::avm_runner::RawAVMOutcome; use avm_server::{AnomalyData, CallResults, ParticleParameters}; use fluence_libp2p::PeerId; use thiserror::Error; +use tracing::instrument; use crate::DataStoreError::SerializeAnomaly; use now_millis::now_ms; @@ -82,6 +83,7 @@ impl ParticleDataStore { Ok(()) } + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn store_data( &self, data: &[u8], @@ -97,6 +99,7 @@ impl ParticleDataStore { Ok(()) } + #[instrument(level = tracing::Level::INFO)] pub async fn read_data(&self, particle_id: &str, current_peer_id: &str) -> Result> { let data_path = self.data_file(particle_id, current_peer_id); let data = tokio::fs::read(&data_path).await.unwrap_or_default(); @@ -151,6 +154,7 @@ impl ParticleDataStore { } #[allow(clippy::too_many_arguments)] + #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn save_anomaly_data( &self, air_script: &str, diff --git a/aquamarine/src/particle_executor.rs b/aquamarine/src/particle_executor.rs index 7dc089bb2d..f59bc536cc 100644 --- a/aquamarine/src/particle_executor.rs +++ b/aquamarine/src/particle_executor.rs @@ -107,6 +107,7 @@ impl ParticleExecutor for RT { } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn execute_with_prev_data( vm: RT, data_store: Arc, @@ -152,6 +153,7 @@ async fn execute_with_prev_data( } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn process_avm_result( data_store: Arc, current_peer_id: PeerId, @@ -230,6 +232,7 @@ where } } +#[instrument(level = tracing::Level::INFO, skip_all)] async fn avm_call<'a, RT: AquaRuntime>( mut vm: RT, current_peer_id: PeerId, diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index f970b10a32..99afbe6ca6 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -503,10 +503,7 @@ mod tests { assert!(!deadline.is_expired(now_ms())); plumber.ingest( - ExtendedParticle { - particle, - span: Arc::new(Span::none()), - }, + ExtendedParticle::new(particle, Span::none()), None, RandomPeerId::random(), ); @@ -543,10 +540,7 @@ mod tests { assert!(deadline.is_expired(now_ms())); plumber.ingest( - ExtendedParticle { - particle: particle.clone(), - span: Arc::new(Span::none()), - }, + ExtendedParticle::new(particle.clone(), Span::none()), None, RandomPeerId::random(), ); diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index bdbc8f29f9..51b2f87914 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -33,7 +33,6 @@ use particle_protocol::{ }; use peer_metrics::ConnectionPoolMetrics; use std::pin::Pin; -use std::sync::Arc; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, @@ -627,10 +626,8 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { particle.data.len() as f64, ) }); - self.queue.push_back(ExtendedParticle { - particle, - span: Arc::new(root_span), - }); + self.queue + .push_back(ExtendedParticle::new(particle, root_span)); self.wake(); } Ok(HandlerMessage::InboundUpgradeError(err)) => log::warn!("UpgradeError: {:?}", err), diff --git a/crates/nox-tests/tests/client_api.rs b/crates/nox-tests/tests/client_api.rs index ccb04b6999..70a0c467af 100644 --- a/crates/nox-tests/tests/client_api.rs +++ b/crates/nox-tests/tests/client_api.rs @@ -95,13 +95,10 @@ async fn call_custom_service() { data: vec![], }; - let exec_f = swarms[1].aquamarine_api.clone().execute( - ExtendedParticle { - particle, - span: Arc::new(Span::none()), - }, - None, - ); + let exec_f = swarms[1] + .aquamarine_api + .clone() + .execute(ExtendedParticle::new(particle, Span::none()), None); let result = timeout(Duration::from_secs(30), async move { add_first_f.await.expect("add_first_f"); diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index f0cb41e3de..ba54068043 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -37,6 +37,22 @@ pub struct ExtendedParticle { pub span: Arc, } +impl ExtendedParticle { + pub fn new(particle: Particle, span: Span) -> Self { + Self { + particle, + span: Arc::new(span), + } + } + + pub fn linked(particle: Particle, span: Arc) -> Self { + Self { + particle, + span: span.clone(), + } + } +} + #[derive(Clone, Serialize, Deserialize, PartialEq, Derivative)] #[derivative(Debug)] pub struct Particle { diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 0305c4f347..bf328c9d92 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -119,7 +119,7 @@ impl Sorcerer { self.aquamarine .clone() - .execute(ExtendedParticle { particle, span }, None) + .execute(ExtendedParticle::linked(particle, span), None) .instrument(async_span) .await?; }; From daf0f590d028e3425e823f78cdc75312fe8ec139 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:41:02 +0300 Subject: [PATCH 19/33] wip --- aquamarine/src/actor.rs | 2 +- aquamarine/src/aquamarine.rs | 2 +- aquamarine/src/particle_functions.rs | 5 ++--- aquamarine/src/plumber.rs | 6 +++--- nox/src/dispatcher.rs | 5 +++-- particle-protocol/src/particle.rs | 6 ++++++ 6 files changed, 16 insertions(+), 10 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index aaed0f3d13..224a729be6 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -148,7 +148,7 @@ where if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { let (reusables, effects, stats, parent_span) = res; - let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor: execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); + let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor::execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); let _span_guard = span.enter(); self.future.take(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index 840b52a562..264082279c 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -105,7 +105,7 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; - let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine: Poll Ingest"); + let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine:poll::ingest"); let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index b495db2c2a..b10c021401 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -141,8 +141,7 @@ impl Functions { waker: Waker, span: Arc, ) -> BoxFuture<'static, SingleCallResult> { - let local_span = tracing::info_span!(parent: span.as_ref(), "Particle functions: call"); - let _guard = local_span.enter(); + let async_span = tracing::info_span!(parent: span.as_ref(), "ParticleFunctions: call"); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, @@ -166,7 +165,7 @@ impl Functions { span, } } - .in_current_span() + .instrument(async_span) .boxed(); } }; diff --git a/aquamarine/src/plumber.rs b/aquamarine/src/plumber.rs index 99afbe6ca6..9900605b6d 100644 --- a/aquamarine/src/plumber.rs +++ b/aquamarine/src/plumber.rs @@ -96,7 +96,7 @@ impl Plumber { ) { self.wake(); - let deadline = Deadline::from(&particle.particle); + let deadline = Deadline::from(particle.as_ref()); if deadline.is_expired(now_ms()) { tracing::info!(target: "expired", particle_id = particle.particle.id, "Particle is expired"); self.events @@ -132,14 +132,14 @@ impl Plumber { let actor = match entry { Entry::Occupied(actor) => Ok(actor.into_mut()), Entry::Vacant(entry) => { - let params = ParticleParams::clone_from(&particle.particle, worker_id); + let params = ParticleParams::clone_from(particle.as_ref(), worker_id); let functions = Functions::new(params, builtins.clone()); let key_pair = self.key_manager.get_worker_keypair(worker_id); let deal_id = self.key_manager.get_deal_id(worker_id).ok(); let data_store = self.data_store.clone(); key_pair.map(|kp| { let actor = Actor::new( - &particle.particle, + particle.as_ref(), functions, worker_id, kp, diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index d0676ffede..46aceeb47d 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -99,10 +99,11 @@ impl Dispatcher { let metrics = metrics.clone(); if particle.particle.is_expired() { + let particle_id = particle.particle.id; if let Some(m) = metrics { - m.particle_expired(&particle.particle.id); + m.particle_expired(particle_id.as_str()); } - tracing::info!(target: "expired", particle_id = particle.particle.id, "Particle is expired"); + tracing::info!(target: "expired", particle_id = particle_id, "Particle is expired"); return async {}.boxed(); } diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index ba54068043..6a8e7f7d9b 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -37,6 +37,12 @@ pub struct ExtendedParticle { pub span: Arc, } +impl AsRef for ExtendedParticle { + fn as_ref(&self) -> &Particle { + &self.particle + } +} + impl ExtendedParticle { pub fn new(particle: Particle, span: Span) -> Self { Self { From 68fc834f0476fd6bffa26b3a1a85bdbeb1478299 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:42:09 +0300 Subject: [PATCH 20/33] wip --- connection-pool/src/behaviour.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 51b2f87914..c89441a0c6 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -225,8 +225,13 @@ impl ConnectionPoolBehaviour { outlet.send(SendStatus::Ok).ok(); self.wake(); } else if self.contacts.contains_key(&to.peer_id) { - tracing::debug!(target: "network",particle_id = particle.particle.id , "{}: Sending particle to {}", self.peer_id, to.peer_id); - + tracing::debug!( + target: "network", + particle_id = particle.particle.id , + "{}: Sending particle to {}", + self.peer_id, + to.peer_id + ); // Send particle to remote peer self.push_event(ToSwarm::NotifyHandler { peer_id: to.peer_id, From a6079db540286cd56b8c9d4a24ca6dadd727568c Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:51:22 +0300 Subject: [PATCH 21/33] wip --- aquamarine/src/actor.rs | 3 +-- aquamarine/src/particle_functions.rs | 3 ++- connection-pool/src/behaviour.rs | 4 ++-- nox/src/dispatcher.rs | 8 ++++---- sorcerer/src/script_executor.rs | 3 --- sorcerer/src/sorcerer.rs | 4 ++-- 6 files changed, 11 insertions(+), 14 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 224a729be6..43a0afc875 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -147,8 +147,7 @@ where fn poll_avm_future(&mut self, cx: &mut Context<'_>) -> Option>> { if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { let (reusables, effects, stats, parent_span) = res; - - let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor::execute call requests", particle_id= self.particle.id, deal_id = self.deal_id); + let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor::poll_avm_future::future_ready", particle_id= self.particle.id, deal_id = self.deal_id); let _span_guard = span.enter(); self.future.take(); diff --git a/aquamarine/src/particle_functions.rs b/aquamarine/src/particle_functions.rs index b10c021401..1e57d910aa 100644 --- a/aquamarine/src/particle_functions.rs +++ b/aquamarine/src/particle_functions.rs @@ -141,7 +141,8 @@ impl Functions { waker: Waker, span: Arc, ) -> BoxFuture<'static, SingleCallResult> { - let async_span = tracing::info_span!(parent: span.as_ref(), "ParticleFunctions: call"); + let async_span = + tracing::info_span!(parent: span.as_ref(), "ParticleFunctions::call::async"); // Deserialize params let args = match Args::try_from(call) { Ok(args) => args, diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index c89441a0c6..10a54b0a3c 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -217,7 +217,7 @@ impl ConnectionPoolBehaviour { outlet: oneshot::Sender, ) { let span = - tracing::info_span!(parent: particle.span.as_ref(), "Connection pool behaviour: send"); + tracing::info_span!(parent: particle.span.as_ref(), "ConnectionPool::Behaviour::send"); let _guard = span.enter(); if to.peer_id == self.peer_id { // If particle is sent to the current node, process it locally @@ -653,7 +653,7 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { let particle_id = particle.particle.id.clone(); - let _span = tracing::info_span!(parent: particle.span.as_ref(), "Connection pool: send to outlet").entered(); + let _span = tracing::info_span!(parent: particle.span.as_ref(), "ConnectionPool::outlet_send").entered(); if let Err(err) = outlet.start_send(particle) { tracing::error!( diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index 46aceeb47d..3b9a80bea4 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -92,9 +92,9 @@ impl Dispatcher { let metrics = self.metrics; particle_stream .for_each_concurrent(parallelism, move |particle| { - let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: process particle"); - let async_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher: async Aquamarine.execute"); + let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher::process_particles::for_each"); let _ = current_span.enter(); + let async_span = tracing::info_span!("Dispatcher::process_particles::async"); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); @@ -136,9 +136,9 @@ impl Dispatcher { async move { match effects { Ok(effects) => { - let span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher: execute effectors"); + let async_span = tracing::info_span!(parent: effects.particle.span.as_ref(), "Dispatcher::effectors::execute"); // perform effects as instructed by aquamarine - effectors.execute(effects).instrument(span).await; + effectors.execute(effects).instrument(async_span).await; } Err(err) => { // particles are sent in fire and forget fashion, so diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index bf328c9d92..4b8e8d0d6c 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -115,12 +115,9 @@ impl Sorcerer { m.observe_spell_cast(); } - let async_span = tracing::info_span!(parent: span.as_ref(), "Script executor: aquamarine async execute", spell_id = event.spell_id.to_string()); - self.aquamarine .clone() .execute(ExtendedParticle::linked(particle, span), None) - .instrument(async_span) .await?; }; diff --git a/sorcerer/src/sorcerer.rs b/sorcerer/src/sorcerer.rs index 5d5e58db8b..c752652dce 100644 --- a/sorcerer/src/sorcerer.rs +++ b/sorcerer/src/sorcerer.rs @@ -146,12 +146,12 @@ impl Sorcerer { spell_events_stream .for_each_concurrent(None, move |spell_event| { let root_span = tracing::info_span!( - "Sorcerer: spell processing", + "Sorcerer::task::for_each", spell_id = spell_event.spell_id.to_string() ); let root_span = Arc::new(root_span); let async_span = tracing::info_span!(parent: root_span.as_ref(), - "Sorcerer: async execute script", + "Sorcerer::task::execute_script", spell_id = spell_event.spell_id.to_string()); let sorcerer = self.clone(); From 93e4872731b6de83049163174e2b98bada137a09 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:51:48 +0300 Subject: [PATCH 22/33] wip --- particle-protocol/src/particle.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 6a8e7f7d9b..1f4f7b2a71 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -31,6 +31,7 @@ use crate::error::ParticleError::{ use fluence_libp2p::{peerid_serializer, RandomPeerId}; use json_utils::base64_serde; use now_millis::now_ms; + #[derive(Clone, Debug)] pub struct ExtendedParticle { pub particle: Particle, From 5b83434dc9879849ce9b44b77955b4c8b975176d Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:53:49 +0300 Subject: [PATCH 23/33] wip --- connection-pool/src/api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/connection-pool/src/api.rs b/connection-pool/src/api.rs index f1867648fe..63af1ba393 100644 --- a/connection-pool/src/api.rs +++ b/connection-pool/src/api.rs @@ -112,7 +112,6 @@ impl ConnectionPoolT for ConnectionPoolApi { self.execute(|out| Command::GetContact { peer_id, out }) } - #[instrument(level = tracing::Level::INFO, skip_all)] fn send(&self, to: Contact, particle: ExtendedParticle) -> BoxFuture<'static, SendStatus> { let fut = self.execute(|out| Command::Send { to, particle, out }); // timeout on send is required because libp2p can silently drop outbound events From 9924d27f8d3e0d7dc18fda6dea4522f81002b5fa Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:54:29 +0300 Subject: [PATCH 24/33] wip --- connection-pool/src/behaviour.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index 10a54b0a3c..d02069c6e0 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -653,7 +653,6 @@ impl NetworkBehaviour for ConnectionPoolBehaviour { // channel is ready to consume more particles, so send them if let Some(particle) = self.queue.pop_front() { let particle_id = particle.particle.id.clone(); - let _span = tracing::info_span!(parent: particle.span.as_ref(), "ConnectionPool::outlet_send").entered(); if let Err(err) = outlet.start_send(particle) { tracing::error!( From bcd36ec3761272f40954203b555449a27325981e Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:57:47 +0300 Subject: [PATCH 25/33] wip --- nox/src/effectors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index fc13c09488..84644f3fa1 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -34,7 +34,7 @@ impl Effectors { /// Perform effects that Aquamarine instructed us to #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn execute(self, effects: RoutingEffects) { - if effects.particle.particle.is_expired() { + if effects.particle.as_ref().is_expired() { tracing::info!(target: "expired", particle_id = effects.particle.particle.id, "Particle is expired"); return; } From 27b5245df7bb581b135731fcaebc6f07c78673a8 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 18:58:44 +0300 Subject: [PATCH 26/33] wip --- nox/src/effectors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index 84644f3fa1..afd5c771fc 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -49,7 +49,7 @@ impl Effectors { async move { // resolve contact if let Some(contact) = connectivity - .resolve_contact(target, &particle.particle.id) + .resolve_contact(target, &particle.as_ref().id) .await { // forward particle From 6530b1400026c29d68a69f40d7cb64e1275b6d6c Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:02:19 +0300 Subject: [PATCH 27/33] wip --- connection-pool/src/api.rs | 1 - nox/src/effectors.rs | 8 +++++--- particle-protocol/src/particle.rs | 6 ++++++ sorcerer/src/script_executor.rs | 2 +- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/connection-pool/src/api.rs b/connection-pool/src/api.rs index 63af1ba393..79fce9a952 100644 --- a/connection-pool/src/api.rs +++ b/connection-pool/src/api.rs @@ -20,7 +20,6 @@ use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; use libp2p::{core::Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::instrument; use particle_protocol::ExtendedParticle; use particle_protocol::{Contact, SendStatus}; diff --git a/nox/src/effectors.rs b/nox/src/effectors.rs index afd5c771fc..d4533c394f 100644 --- a/nox/src/effectors.rs +++ b/nox/src/effectors.rs @@ -18,6 +18,7 @@ use futures::{stream::iter, StreamExt}; use tracing::instrument; use aquamarine::RoutingEffects; +use particle_protocol::Particle; use crate::connectivity::Connectivity; @@ -34,8 +35,9 @@ impl Effectors { /// Perform effects that Aquamarine instructed us to #[instrument(level = tracing::Level::INFO, skip_all)] pub async fn execute(self, effects: RoutingEffects) { - if effects.particle.as_ref().is_expired() { - tracing::info!(target: "expired", particle_id = effects.particle.particle.id, "Particle is expired"); + let particle: &Particle = effects.particle.as_ref(); + if particle.is_expired() { + tracing::info!(target: "expired", particle_id = particle.id, "Particle is expired"); return; } @@ -49,7 +51,7 @@ impl Effectors { async move { // resolve contact if let Some(contact) = connectivity - .resolve_contact(target, &particle.as_ref().id) + .resolve_contact(target, &particle.as_ref()) .await { // forward particle diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 1f4f7b2a71..098c82173d 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -44,6 +44,12 @@ impl AsRef for ExtendedParticle { } } +impl AsRef for ExtendedParticle { + fn as_ref(&self) -> &str { + &self.particle.id + } +} + impl ExtendedParticle { pub fn new(particle: Particle, span: Span) -> Self { Self { diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index 4b8e8d0d6c..f43268a6a5 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -15,7 +15,7 @@ */ use fluence_libp2p::PeerId; use std::sync::Arc; -use tracing::{instrument, Instrument, Span}; +use tracing::{instrument, Span}; use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; use now_millis::now_ms; From 45a5dc141f8c6aead50065f3b1deae716c1200b4 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:12:45 +0300 Subject: [PATCH 28/33] wip --- crates/nox-tests/tests/client_api.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/nox-tests/tests/client_api.rs b/crates/nox-tests/tests/client_api.rs index 70a0c467af..aa358ae698 100644 --- a/crates/nox-tests/tests/client_api.rs +++ b/crates/nox-tests/tests/client_api.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use std::sync::Arc; use std::time::Duration; use futures::channel::oneshot::channel; From b000e73b4859a7a7ebf97aa4641989d07e9218c5 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:13:51 +0300 Subject: [PATCH 29/33] wip --- nox/src/dispatcher.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index 3b9a80bea4..b1605759e3 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -91,15 +91,16 @@ impl Dispatcher { let aquamarine = self.aquamarine; let metrics = self.metrics; particle_stream - .for_each_concurrent(parallelism, move |particle| { - let current_span = tracing::info_span!(parent: particle.span.as_ref(), "Dispatcher::process_particles::for_each"); + .for_each_concurrent(parallelism, move |ext_particle| { + let current_span = tracing::info_span!(parent: ext_particle.span.as_ref(), "Dispatcher::process_particles::for_each"); let _ = current_span.enter(); let async_span = tracing::info_span!("Dispatcher::process_particles::async"); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); + let particle = ext_particle.as_ref(); - if particle.particle.is_expired() { - let particle_id = particle.particle.id; + if particle.is_expired() { + let particle_id = particle.id; if let Some(m) = metrics { m.particle_expired(particle_id.as_str()); } @@ -109,7 +110,7 @@ impl Dispatcher { async move { aquamarine - .execute(particle, None) + .execute(ext_particle, None) // do not log errors: Aquamarine will log them fine .map(|_| ()) .await From 72d1787028d46e1c3ee0ba97dab5cc1d4db5f180 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:15:35 +0300 Subject: [PATCH 30/33] wip --- connection-pool/src/behaviour.rs | 10 +++++----- nox/src/main.rs | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/connection-pool/src/behaviour.rs b/connection-pool/src/behaviour.rs index d02069c6e0..5301193034 100644 --- a/connection-pool/src/behaviour.rs +++ b/connection-pool/src/behaviour.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use fluence_libp2p::remote_multiaddr; use futures::{Sink, StreamExt}; use libp2p::core::Endpoint; use libp2p::swarm::dial_opts::DialOpts; @@ -28,10 +27,6 @@ use libp2p::{ swarm::{NetworkBehaviour, NotifyHandler, OneShotHandler}, PeerId, }; -use particle_protocol::{ - CompletionChannel, Contact, ExtendedParticle, HandlerMessage, ProtocolConfig, SendStatus, -}; -use peer_metrics::ConnectionPoolMetrics; use std::pin::Pin; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, @@ -43,6 +38,11 @@ use tokio_util::sync::PollSender; use crate::connection_pool::LifecycleEvent; use crate::{Command, ConnectionPoolApi}; +use fluence_libp2p::remote_multiaddr; +use particle_protocol::{ + CompletionChannel, Contact, ExtendedParticle, HandlerMessage, ProtocolConfig, SendStatus, +}; +use peer_metrics::ConnectionPoolMetrics; // type SwarmEventType = generate_swarm_event_type!(ConnectionPoolBehaviour); diff --git a/nox/src/main.rs b/nox/src/main.rs index be4ec3bfde..57293f6968 100644 --- a/nox/src/main.rs +++ b/nox/src/main.rs @@ -27,7 +27,6 @@ )] use base64::{engine::general_purpose::STANDARD as base64, Engine}; - use eyre::WrapErr; use libp2p::PeerId; use tokio::signal; From ec9db867b8cc03ef9a170c295658b758ccf0227f Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:16:44 +0300 Subject: [PATCH 31/33] wip --- aquamarine/src/actor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 43a0afc875..6b69848cd1 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,16 +14,16 @@ * limitations under the License. */ -use fluence_keypair::KeyPair; -use futures::future::BoxFuture; -use futures::FutureExt; use std::sync::Arc; use std::{ collections::VecDeque, task::{Context, Poll, Waker}, }; +use futures::future::BoxFuture; +use futures::FutureExt; use tracing::{instrument, Instrument, Span}; +use fluence_keypair::KeyPair; use fluence_libp2p::PeerId; use particle_execution::{ParticleFunctionStatic, ServiceFunction}; use particle_protocol::{ExtendedParticle, Particle}; From bc38ff26ba45fd76c1c76cdc36ca68b668650d87 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:23:44 +0300 Subject: [PATCH 32/33] wip --- aquamarine/src/actor.rs | 4 ++-- aquamarine/src/command.rs | 3 ++- nox/src/dispatcher.rs | 9 +++++---- particle-protocol/src/particle.rs | 2 +- sorcerer/src/script_executor.rs | 5 ++--- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index 6b69848cd1..a4142e0797 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -14,13 +14,13 @@ * limitations under the License. */ +use futures::future::BoxFuture; +use futures::FutureExt; use std::sync::Arc; use std::{ collections::VecDeque, task::{Context, Poll, Waker}, }; -use futures::future::BoxFuture; -use futures::FutureExt; use tracing::{instrument, Instrument, Span}; use fluence_keypair::KeyPair; diff --git a/aquamarine/src/command.rs b/aquamarine/src/command.rs index 5d7b516116..7a54372896 100644 --- a/aquamarine/src/command.rs +++ b/aquamarine/src/command.rs @@ -14,9 +14,10 @@ * limitations under the License. */ +use std::collections::HashMap; + use particle_execution::ServiceFunction; use particle_protocol::ExtendedParticle; -use std::collections::HashMap; pub enum Command { Ingest { diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index b1605759e3..f387c8485b 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -14,16 +14,17 @@ * limitations under the License. */ -use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; -use fluence_libp2p::PeerId; use futures::{FutureExt, StreamExt}; -use particle_protocol::ExtendedParticle; -use peer_metrics::DispatcherMetrics; use prometheus_client::registry::Registry; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{instrument, Instrument}; +use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; +use fluence_libp2p::PeerId; +use particle_protocol::ExtendedParticle; +use peer_metrics::DispatcherMetrics; + use crate::effectors::Effectors; use crate::tasks::Tasks; diff --git a/particle-protocol/src/particle.rs b/particle-protocol/src/particle.rs index 098c82173d..6137aba243 100644 --- a/particle-protocol/src/particle.rs +++ b/particle-protocol/src/particle.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::time::Duration; use derivative::Derivative; -use fluence_keypair::{KeyPair, PublicKey, Signature}; use libp2p::PeerId; use serde::{Deserialize, Serialize}; use tracing::Span; @@ -28,6 +27,7 @@ use crate::error::ParticleError; use crate::error::ParticleError::{ DecodingError, InvalidKeypair, SignatureVerificationFailed, SigningFailed, }; +use fluence_keypair::{KeyPair, PublicKey, Signature}; use fluence_libp2p::{peerid_serializer, RandomPeerId}; use json_utils::base64_serde; use now_millis::now_ms; diff --git a/sorcerer/src/script_executor.rs b/sorcerer/src/script_executor.rs index f43268a6a5..a5db753be0 100644 --- a/sorcerer/src/script_executor.rs +++ b/sorcerer/src/script_executor.rs @@ -13,19 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use fluence_libp2p::PeerId; use std::sync::Arc; use tracing::{instrument, Span}; use crate::error::SorcererError::{ParticleSigningFailed, ScopeKeypairMissing}; +use crate::Sorcerer; +use fluence_libp2p::PeerId; use now_millis::now_ms; use particle_args::JError; use particle_protocol::{ExtendedParticle, Particle}; use spell_event_bus::api::{TriggerEvent, TriggerInfoAqua}; use spell_service_api::CallParams; -use crate::Sorcerer; - impl Sorcerer { fn get_spell_counter(&self, spell_id: String, worker_id: PeerId) -> Result { let params = CallParams::local(spell_id, worker_id, self.spell_script_particle_ttl); From d362b3abc7c965797ddf971ae552de60ce43de4b Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 13 Dec 2023 19:43:40 +0300 Subject: [PATCH 33/33] wip --- aquamarine/src/actor.rs | 7 ++++++- aquamarine/src/aquamarine.rs | 2 +- nox/src/dispatcher.rs | 8 ++++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/aquamarine/src/actor.rs b/aquamarine/src/actor.rs index a4142e0797..31b46e519f 100644 --- a/aquamarine/src/actor.rs +++ b/aquamarine/src/actor.rs @@ -147,7 +147,12 @@ where fn poll_avm_future(&mut self, cx: &mut Context<'_>) -> Option>> { if let Some(Poll::Ready(res)) = self.future.as_mut().map(|f| f.poll_unpin(cx)) { let (reusables, effects, stats, parent_span) = res; - let span = tracing::info_span!(parent: parent_span.as_ref(), "Actor::poll_avm_future::future_ready", particle_id= self.particle.id, deal_id = self.deal_id); + let span = tracing::info_span!( + parent: parent_span.as_ref(), + "Actor::poll_avm_future::future_ready", + particle_id= self.particle.id, + deal_id = self.deal_id + ); let _span_guard = span.enter(); self.future.take(); diff --git a/aquamarine/src/aquamarine.rs b/aquamarine/src/aquamarine.rs index 264082279c..7c9322a593 100644 --- a/aquamarine/src/aquamarine.rs +++ b/aquamarine/src/aquamarine.rs @@ -105,7 +105,7 @@ impl AquamarineBackend { match self.inlet.poll_recv(cx) { Poll::Ready(Some(Ingest { particle, function })) => { wake = true; - let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine:poll::ingest"); + let span = tracing::info_span!(parent: particle.span.as_ref(), "Aquamarine::poll::ingest"); let _guard = span.entered(); // set new particle to be executed // every particle that comes from the connection pool first executed on the host peer id diff --git a/nox/src/dispatcher.rs b/nox/src/dispatcher.rs index f387c8485b..8bbfef5ec0 100644 --- a/nox/src/dispatcher.rs +++ b/nox/src/dispatcher.rs @@ -22,7 +22,7 @@ use tracing::{instrument, Instrument}; use aquamarine::{AquamarineApi, AquamarineApiError, RoutingEffects}; use fluence_libp2p::PeerId; -use particle_protocol::ExtendedParticle; +use particle_protocol::{ExtendedParticle, Particle}; use peer_metrics::DispatcherMetrics; use crate::effectors::Effectors; @@ -98,12 +98,12 @@ impl Dispatcher { let async_span = tracing::info_span!("Dispatcher::process_particles::async"); let aquamarine = aquamarine.clone(); let metrics = metrics.clone(); - let particle = ext_particle.as_ref(); + let particle: &Particle = ext_particle.as_ref(); if particle.is_expired() { - let particle_id = particle.id; + let particle_id = &particle.id.as_str(); if let Some(m) = metrics { - m.particle_expired(particle_id.as_str()); + m.particle_expired(particle_id); } tracing::info!(target: "expired", particle_id = particle_id, "Particle is expired"); return async {}.boxed();