Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): measure call time and wait time separately #1858

Merged
merged 4 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions aquamarine/src/particle_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ use crate::log::builtin_log_fn;
#[derive(Clone, Debug)]
/// Performance statistics about executed function call
pub struct SingleCallStat {
/// If execution happened, then how much time it took
pub run_time: Option<Duration>,
/// If execution happened, then how much time it took to execute the call
pub call_time: Option<Duration>,
/// If execution happened, then how much time call waited to be scheduled on blocking pool
pub wait_time: Option<Duration>,
pub success: bool,
/// Whether function call was to builtin functions (like op noop) or to services
pub kind: FunctionKind,
Expand Down Expand Up @@ -139,7 +141,8 @@ impl<F: ParticleFunctionStatic> Functions<F> {
call_id,
result,
stat: SingleCallStat {
run_time: None,
call_time: None,
wait_time: None,
success: false,
kind: FunctionKind::NotHappened,
},
Expand All @@ -155,19 +158,21 @@ impl<F: ParticleFunctionStatic> Functions<F> {
json!(&args.function_args)
);
let service_id = args.service_id.clone();
let start = Instant::now();

let params = self.particle.clone();
let builtins = self.builtins.clone();
let particle_function = self.particle_function.clone();
let span = tracing::span!(tracing::Level::INFO, "Function");
let schedule_wait_start = Instant::now();
let result = tokio::task::Builder::new()
.name(&format!(
"Call function {}:{}",
args.service_id, args.function_name
))
.spawn_blocking(|| {
.spawn_blocking(move || {
Handle::current().block_on(async move {
// How much time it took to start execution on blocking pool
let schedule_wait_time = schedule_wait_start.elapsed();
let outcome = builtins.call(args, params).await;
// record whether call was handled by builtin or not. needed for stats.
let mut call_kind = FunctionKind::Service;
Expand All @@ -187,14 +192,17 @@ impl<F: ParticleFunctionStatic> Functions<F> {
// Builtins were called, return their outcome
outcome => outcome,
};
(outcome, call_kind)
// How much time it took to execute the call
// TODO: Time for ParticleFunction includes lock time, which is not good. Low priority cuz ParticleFunctions are barely used.
let call_time = schedule_wait_start.elapsed() - schedule_wait_time;
(outcome, call_kind, call_time, schedule_wait_time)
})
})
.expect("Could not spawn task");

async move {
let (result, call_kind) = result.await.expect("Could not 'Call function' join");
let elapsed = start.elapsed();
let (result, call_kind, call_time, wait_time) =
result.await.expect("Could not 'Call function' join");

let result = match result {
FunctionOutcome::NotDefined { args, .. } => Err(JError::new(format!(
Expand All @@ -211,15 +219,16 @@ impl<F: ParticleFunctionStatic> Functions<F> {
particle_id = particle_id,
"Failed host call {} ({}): {}",
log_args,
pretty(elapsed),
pretty(call_time),
err
)
} else {
builtin_log_fn(&service_id, &log_args, pretty(elapsed), particle_id);
builtin_log_fn(&service_id, &log_args, pretty(call_time), particle_id);
};

let stats = SingleCallStat {
run_time: Some(elapsed),
call_time: Some(call_time),
wait_time: Some(wait_time),
success: result.is_ok(),
kind: call_kind,
};
Expand Down
6 changes: 3 additions & 3 deletions aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ impl<RT: AquaRuntime, F: ParticleFunctionStatic> Plumber<RT, F> {

let time = stat.interpretation_time.as_secs_f64();
m.interpretation_time_sec.observe(time);
m.total_actors_mailbox.set(mailbox_size as i64);
m.alive_actors.set(self.actors.len() as i64);
}
m.total_actors_mailbox.set(mailbox_size as i64);
m.alive_actors.set(self.actors.len() as i64);

for stat in &stats {
m.service_call(stat.success, stat.kind, stat.run_time)
m.service_call(stat.success, stat.kind, stat.call_time)
}
});

Expand Down
1 change: 1 addition & 0 deletions crates/peer-metrics/src/services_metrics/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl Stats {
ServiceCallStats::Success {
memory_delta_bytes,
call_time_sec,
lock_wait_time_sec: _,
timestamp,
} => {
self.memory_deltas_bytes.update(
Expand Down
15 changes: 12 additions & 3 deletions crates/peer-metrics/src/services_metrics/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ pub struct ServicesMetricsExternal {
pub modules_in_services_count: Histogram,

/// Service call time
pub call_time_msec: Family<ServiceTypeLabel, Histogram>,
pub call_time_sec: Family<ServiceTypeLabel, Histogram>,
pub lock_wait_time_sec: Family<ServiceTypeLabel, Histogram>,
pub call_success_count: Family<ServiceTypeLabel, Counter>,
pub call_failed_count: Family<ServiceTypeLabel, Counter>,

Expand Down Expand Up @@ -182,13 +183,20 @@ impl ServicesMetricsExternal {
"number of modules per services",
);

let call_time_msec: Family<_, _> = register(
let call_time_sec: Family<_, _> = register(
sub_registry,
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"call_time_msec",
Copy link
Member Author

@folex folex Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why it's called msec. @kmd-fl wdyt?

"how long it took to execute a call",
);

let lock_wait_time_sec: Family<_, _> = register(
sub_registry,
Family::new_with_constructor(|| Histogram::new(execution_time_buckets())),
"lock_wait_time_sec",
"how long a service waited for Mutex",
);

let memory_metrics = ServicesMemoryMetrics {
mem_max_bytes,
mem_max_per_module_bytes,
Expand Down Expand Up @@ -217,7 +225,8 @@ impl ServicesMetricsExternal {
removal_count,
creation_failure_count,
modules_in_services_count,
call_time_msec,
call_time_sec,
lock_wait_time_sec,
call_success_count,
call_failed_count,
memory_metrics,
Expand Down
1 change: 1 addition & 0 deletions crates/peer-metrics/src/services_metrics/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub enum ServiceCallStats {
Success {
memory_delta_bytes: f64,
call_time_sec: f64,
lock_wait_time_sec: f64,
timestamp: u64,
},
Fail {
Expand Down
18 changes: 12 additions & 6 deletions crates/peer-metrics/src/services_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ServicesMetrics {
service_type: ServiceType::Builtin,
};
external
.call_time_msec
.call_time_sec
.get_or_create(&label)
.observe(call_time);
if is_ok {
Expand All @@ -99,11 +99,17 @@ impl ServicesMetrics {
) {
self.observe_external(|external| {
let label = ServiceTypeLabel { service_type };
if let Success { call_time_sec, .. } = &stats {
external
.call_time_msec
.get_or_create(&label)
.observe(*call_time_sec);
if let Success {
call_time_sec,
lock_wait_time_sec,
..
} = &stats
{
let call_time_metric = external.call_time_sec.get_or_create(&label);
call_time_metric.observe(*call_time_sec);

let lock_time_metric = external.lock_wait_time_sec.get_or_create(&label);
lock_time_metric.observe(*lock_wait_time_sec);
}
external.call_success_count.get_or_create(&label).inc();
self.observe_service_mem(service_id.clone(), label.service_type, memory);
Expand Down
7 changes: 5 additions & 2 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ impl ParticleAppServices {
particle: ParticleParams,
create_vault: bool,
) -> FunctionOutcome {
let call_time_start = Instant::now();
let services = self.services.read();
let aliases = self.aliases.read();
let worker_id = particle.host_id;
Expand Down Expand Up @@ -509,10 +508,12 @@ impl ParticleAppServices {
};
let function_name = function_args.function_name;

let lock_acquire_start = Instant::now();
let mut service = service.lock();
let old_memory = service.module_memory_stats();
let old_mem_usage = ServicesMetricsBuiltin::get_used_memory(&old_memory);
// TODO: set execution timeout https://github.com/fluencelabs/fluence/issues/1212
let call_time_start = Instant::now();
let result = service
.call(
function_name.clone(),
Expand All @@ -539,15 +540,17 @@ impl ParticleAppServices {
ServiceError::Engine(e)
})?;

let call_time_sec = call_time_start.elapsed().as_secs_f64();
if let Some(metrics) = self.metrics.as_ref() {
let call_time_sec = call_time_start.elapsed().as_secs_f64();
let lock_wait_time_sec = lock_acquire_start.elapsed().as_secs_f64();
let new_memory = service.module_memory_stats();
let new_memory_usage = ServicesMetricsBuiltin::get_used_memory(&new_memory);

let memory_delta_bytes = new_memory_usage - old_mem_usage;
let stats = ServiceCallStats::Success {
memory_delta_bytes: memory_delta_bytes as f64,
call_time_sec,
lock_wait_time_sec,
timestamp,
};

Expand Down