Skip to content

Commit

Permalink
feat(http): Added http healthcheck endpoint (#1725)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored Jul 28, 2023
1 parent 1b25178 commit ba2c9d1
Show file tree
Hide file tree
Showing 26 changed files with 862 additions and 41 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"crates/peer-metrics",
"crates/spell-event-bus",
"crates/key-manager",
"crates/health",
"sorcerer",
"crates/nox-tests",
"nox",
Expand Down Expand Up @@ -86,6 +87,7 @@ script-storage = { path = "script-storage" }
spell-storage = { path = "spell-storage" }
particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }

# spell
fluence-spell-dtos = "=0.5.17"
Expand Down
1 change: 1 addition & 0 deletions aquamarine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ anyhow = "1.0.72"
eyre = { workspace = true }
bytesize = "1.2.0"
async-trait = "0.1.72"
health = { workspace = true }
10 changes: 9 additions & 1 deletion aquamarine/src/aquamarine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use fluence_libp2p::PeerId;
use health::HealthCheckRegistry;
use key_manager::KeyManager;
use particle_execution::{ParticleFunctionStatic, ServiceFunction};
use particle_protocol::Particle;
Expand All @@ -46,19 +47,26 @@ pub struct AquamarineBackend<RT: AquaRuntime, F> {
}

impl<RT: AquaRuntime, F: ParticleFunctionStatic> AquamarineBackend<RT, F> {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: VmPoolConfig,
runtime_config: RT::Config,
builtins: F,
out: EffectsChannel,
plumber_metrics: Option<ParticleExecutorMetrics>,
vm_pool_metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
key_manager: KeyManager,
) -> (Self, AquamarineApi) {
// TODO: make `100` configurable
let (outlet, inlet) = mpsc::channel(100);
let sender = AquamarineApi::new(outlet, config.execution_timeout);
let vm_pool = VmPool::new(config.pool_size, runtime_config, vm_pool_metrics);
let vm_pool = VmPool::new(
config.pool_size,
runtime_config,
vm_pool_metrics,
health_registry,
);
let host_peer_id = key_manager.get_host_peer_id();
let plumber = Plumber::new(vm_pool, builtins, plumber_metrics, key_manager);
let this = Self {
Expand Down
97 changes: 97 additions & 0 deletions aquamarine/src/health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use health::HealthCheck;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Clone)]
pub struct VMPoolHealth {
expected_count: usize,
current_count: Arc<AtomicUsize>,
}

impl VMPoolHealth {
pub fn new(expected_count: usize) -> Self {
Self {
expected_count,
current_count: Arc::new(AtomicUsize::new(0)),
}
}

pub fn increment_count(&self) {
self.current_count.fetch_add(1, Ordering::Release);
}
}

impl HealthCheck for VMPoolHealth {
fn status(&self) -> eyre::Result<()> {
let current = self.current_count.load(Ordering::Acquire);
if self.expected_count != current {
return Err(eyre::eyre!(
"VM pool isn't full. Current: {}, Expected: {}",
current,
self.expected_count
));
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::thread;

#[test]
fn test_vm_pool_health_empty() {
let pool_health = VMPoolHealth::new(0);
let status = pool_health.status();
assert!(status.is_ok());
}

#[test]
fn test_vm_pool_health_partial() {
let pool_health = VMPoolHealth::new(5);
pool_health.increment_count();
pool_health.increment_count();
pool_health.increment_count();

let status = pool_health.status();
assert!(status.is_err());
}

#[test]
fn test_vm_pool_health_full() {
let pool_health = VMPoolHealth::new(3);
pool_health.increment_count();
pool_health.increment_count();
pool_health.increment_count();

let status = pool_health.status();
assert!(status.is_ok());
}

#[test]
fn test_vm_pool_health_concurrent_access() {
let pool_health = VMPoolHealth::new(100);
let mut handles = vec![];

// Simulate concurrent access by spawning multiple threads.
for _ in 0..50 {
let pool_health_clone = pool_health.clone();
let handle = thread::spawn(move || {
for _ in 0..2 {
pool_health_clone.increment_count();
}
});
handles.push(handle);
}

// Wait for all threads to finish.
for handle in handles {
handle.join().unwrap();
}

let status = pool_health.status();
assert!(status.is_ok());
}
}
1 change: 1 addition & 0 deletions aquamarine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod command;
mod config;
mod deadline;
mod error;
mod health;
mod invoke;
mod log;
mod particle_data_store;
Expand Down
2 changes: 1 addition & 1 deletion aquamarine/src/plumber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ mod tests {

fn plumber() -> Plumber<VMMock, Arc<MockF>> {
// Pool is of size 1 so it's easier to control tests
let vm_pool = VmPool::new(1, (), None);
let vm_pool = VmPool::new(1, (), None, None);
let builtin_mock = Arc::new(MockF);
let key_manager = KeyManager::new(
"keypair".into(),
Expand Down
18 changes: 17 additions & 1 deletion aquamarine/src/vm_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
use std::task::{Context, Poll};

use futures::{future::BoxFuture, FutureExt};
use health::HealthCheckRegistry;

use peer_metrics::VmPoolMetrics;

use crate::aqua_runtime::AquaRuntime;
use crate::health::VMPoolHealth;

type RuntimeF<RT> = BoxFuture<'static, Result<RT, <RT as AquaRuntime>::Error>>;

Expand All @@ -39,6 +41,7 @@ pub struct VmPool<RT: AquaRuntime> {
runtime_config: RT::Config,
pool_size: usize,
metrics: Option<VmPoolMetrics>,
health: Option<VMPoolHealth>,
}

impl<RT: AquaRuntime> VmPool<RT> {
Expand All @@ -47,13 +50,21 @@ impl<RT: AquaRuntime> VmPool<RT> {
pool_size: usize,
runtime_config: RT::Config,
metrics: Option<VmPoolMetrics>,
health_registry: Option<&mut HealthCheckRegistry>,
) -> Self {
let health = health_registry.map(|registry| {
let health = VMPoolHealth::new(pool_size);
registry.register("vm_pool", health.clone());
health
});

let mut this = Self {
runtimes: Vec::with_capacity(pool_size),
creating_runtimes: None,
runtime_config,
pool_size,
metrics,
health,
};

this.runtimes.resize_with(pool_size, || None);
Expand Down Expand Up @@ -161,7 +172,12 @@ impl<RT: AquaRuntime> VmPool<RT> {

// Put created vm to self.vms
match vm {
Ok(vm) => vms[id] = Some(vm),
Ok(vm) => {
vms[id] = Some(vm);
if let Some(h) = self.health.as_ref() {
h.increment_count()
}
}
Err(err) => log::error!("Failed to create vm: {:?}", err), // TODO: don't panic
}

Expand Down
10 changes: 10 additions & 0 deletions crates/health/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "health"
version = "0.1.0"
authors = ["Fluence Labs"]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
eyre = { workspace = true }
Loading

0 comments on commit ba2c9d1

Please sign in to comment.