Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor interface for registering custom label providers #130

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 73 additions & 28 deletions lightswitch-metadata/src/metadata_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,57 @@ use crate::taskname::TaskName;

use anyhow::Result;
use lru::LruCache;
use std::fmt::Display;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use tracing::{error, warn};

#[derive(Debug, Clone, Copy)]
pub struct TaskKey {
pub pid: i32,
pub tid: i32,
}

impl Display for TaskKey {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "pid={}, tid={}", self.pid, self.tid)
}
}

#[derive(Debug, Error)]
pub enum MetadataProviderError {
#[error("Failed to retrieve metadata for task_id={0}, error={1}")]
ErrorRetrievingMetadata(i32, String),
pub enum TaskMetadataProviderError {
#[error("Failed to retrieve metadata for task_key={0}, error={1}")]
ErrorRetrievingMetadata(TaskKey, String),
}

pub trait MetadataProvider {
/// Return a vector of labels for the provided task id.
/// Labels returned by this function will be assumed to apply
/// to all task_ids in the same process/tgid as the provided task_id.
fn get_metadata(&self, task_id: i32) -> Result<Vec<MetadataLabel>, MetadataProviderError>;
pub trait TaskMetadataProvider {
/// Return a vector of labels that apply to the provided task_key.
fn get_metadata(
&self,
task_key: TaskKey,
) -> Result<Vec<MetadataLabel>, TaskMetadataProviderError>;
}
pub type ThreadSafeMetadataProvider = Arc<Mutex<Box<dyn MetadataProvider + Send>>>;
pub type ThreadSafeTaskMetadataProvider = Arc<Mutex<Box<dyn TaskMetadataProvider + Send>>>;

pub struct GlobalMetadataProvider {
pid_label_cache: LruCache</*pid*/ i32, Vec<MetadataLabel>>,
system_metadata: SystemMetadata,
custom_metadata_providers: Vec<ThreadSafeMetadataProvider>,
#[derive(Debug, Error)]
pub enum SystemMetadataProviderError {
#[error("Failed to retrieve system metadata, error={0}")]
ErrorRetrievingMetadata(String),
}
pub trait SystemMetadataProvider {
/// Return a vector of labels that apply to the current host system.
fn get_metadata(&self) -> Result<Vec<MetadataLabel>, SystemMetadataProviderError>;
}

pub struct TaskKey {
pub pid: i32,
pub tid: i32,
pub type ThreadSafeSystemMetadataProvider = Arc<Mutex<Box<dyn SystemMetadataProvider + Send>>>;

pub struct GlobalMetadataProvider {
pid_label_cache: LruCache</*pid*/ i32, Vec<MetadataLabel>>,
default_system_metadata: SystemMetadata,
custom_system_metadata_providers: Vec<ThreadSafeSystemMetadataProvider>,
custom_task_metadata_providers: Vec<ThreadSafeTaskMetadataProvider>,
}

pub type ThreadSafeGlobalMetadataProvider = Arc<Mutex<GlobalMetadataProvider>>;
Expand All @@ -46,29 +69,51 @@ impl GlobalMetadataProvider {
pub fn new(metadata_cache_size: NonZeroUsize) -> Self {
Self {
pid_label_cache: LruCache::new(metadata_cache_size),
system_metadata: SystemMetadata {},
custom_metadata_providers: Vec::new(),
default_system_metadata: SystemMetadata {},
custom_system_metadata_providers: Vec::new(),
custom_task_metadata_providers: Vec::new(),
}
}

pub fn register_custom_providers(&mut self, providers: Vec<ThreadSafeMetadataProvider>) {
self.custom_metadata_providers.extend(providers);
pub fn register_task_metadata_providers(
&mut self,
providers: Vec<ThreadSafeTaskMetadataProvider>,
) {
self.custom_task_metadata_providers.extend(providers);
}

pub fn register_system_metadata_providers(
&mut self,
providers: Vec<ThreadSafeSystemMetadataProvider>,
) {
self.custom_system_metadata_providers.extend(providers);
}

fn get_labels(&mut self, pid: i32) -> Vec<MetadataLabel> {
fn get_labels(&mut self, task_key: TaskKey) -> Vec<MetadataLabel> {
let mut labels = self
.system_metadata
.default_system_metadata
.get_metadata()
.map_err(|err| warn!("{}", err))
.unwrap_or_default();

for provider in &self.custom_metadata_providers {
match provider.lock().unwrap().get_metadata(pid) {
Ok(custom_labels) => {
labels.extend(custom_labels.into_iter());
for provider in &self.custom_system_metadata_providers {
match provider.lock().unwrap().get_metadata() {
Ok(custom_system_labels) => {
labels.extend(custom_system_labels.into_iter());
}
Err(err) => {
warn!("Failed to retrieve custom system metadata, error = {}", err);
}
}
}

for provider in &self.custom_task_metadata_providers {
match provider.lock().unwrap().get_metadata(task_key) {
Ok(custom_task_labels) => {
labels.extend(custom_task_labels.into_iter());
}
Err(err) => {
warn!("Failed to retrieve custom metadata, error = {}", err);
warn!("Failed to retrieve custom task metadata, error = {}", err);
}
}
}
Expand All @@ -88,7 +133,7 @@ impl GlobalMetadataProvider {
if let Some(cached_labels) = self.pid_label_cache.get(&pid) {
task_metadata.extend(cached_labels.iter().cloned());
} else {
let labels = self.get_labels(pid);
let labels = self.get_labels(task_key);
self.pid_label_cache.push(pid, labels.clone());
task_metadata.extend(labels);
}
Expand Down