From b483615ab2bd4e71155ffdade6b741583807816a Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Mon, 1 May 2023 19:13:15 +0800 Subject: [PATCH] Made the in-memory cache use a LRU strategy --- lib/wasi/src/bin_factory/module_cache.rs | 2 +- lib/wasi/src/runtime/resolver/builtin.rs | 2 +- lib/wasi/src/runtime/resolver/cache.rs | 179 +++++++++++++++++++++-- lib/wasi/src/runtime/resolver/types.rs | 51 ++++++- 4 files changed, 217 insertions(+), 17 deletions(-) diff --git a/lib/wasi/src/bin_factory/module_cache.rs b/lib/wasi/src/bin_factory/module_cache.rs index aa0e49a62b5..553de1301c1 100644 --- a/lib/wasi/src/bin_factory/module_cache.rs +++ b/lib/wasi/src/bin_factory/module_cache.rs @@ -132,7 +132,7 @@ impl ModuleCache { let ident = webc.parse().context("Unable to parse the package name")?; let http_client = runtime.http_client().context("No http client available")?; let resolver = runtime.package_resolver(); - let fut = resolver.resolve_package(ident, http_client); + let fut = resolver.resolve_package(&ident, http_client); match runtime.task_manager().block_on(fut) { Ok(mut data) => { diff --git a/lib/wasi/src/runtime/resolver/builtin.rs b/lib/wasi/src/runtime/resolver/builtin.rs index 8a9840a909a..2ab423c5107 100644 --- a/lib/wasi/src/runtime/resolver/builtin.rs +++ b/lib/wasi/src/runtime/resolver/builtin.rs @@ -51,7 +51,7 @@ impl BuiltinResolver { impl PackageResolver for BuiltinResolver { async fn resolve_package( &self, - pkg: WebcIdentifier, + pkg: &WebcIdentifier, client: &(dyn HttpClient + Send + Sync), ) -> Result { crate::wapm::fetch_webc( diff --git a/lib/wasi/src/runtime/resolver/cache.rs b/lib/wasi/src/runtime/resolver/cache.rs index ca05709f0cb..b7a617bd3b4 100644 --- a/lib/wasi/src/runtime/resolver/cache.rs +++ b/lib/wasi/src/runtime/resolver/cache.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::RwLock}; +use std::{sync::Mutex, time::Instant}; use crate::{ bin_factory::BinaryPackage, @@ -6,18 +6,20 @@ use crate::{ runtime::resolver::{PackageResolver, ResolverError, WebcIdentifier}, }; -/// A simple in-memory caching layer. +/// A resolver that wraps a [`PackageResolver`] in an in-memory LRU cache. #[derive(Debug)] pub struct InMemoryCache { resolver: R, - packages: RwLock>, + packages: Mutex>, + config: CacheConfig, } impl InMemoryCache { - pub fn new(resolver: R) -> Self { + pub fn new(resolver: R, config: CacheConfig) -> Self { InMemoryCache { resolver, - packages: RwLock::new(HashMap::new()), + packages: Mutex::new(Vec::new()), + config, } } @@ -32,6 +34,33 @@ impl InMemoryCache { pub fn into_inner(self) -> R { self.resolver } + + fn lookup(&self, ident: &WebcIdentifier) -> Option { + let mut packages = self.packages.lock().unwrap(); + + let now = Instant::now(); + let entry = packages.iter_mut().find(|entry| entry.ident == *ident)?; + entry.last_touched = now; + let pkg = entry.pkg.clone(); + + self.config.prune(&mut packages, now); + + Some(pkg) + } + + fn save(&self, ident: &WebcIdentifier, pkg: BinaryPackage) { + let mut packages = self.packages.lock().unwrap(); + packages.insert( + 0, + CacheEntry { + last_touched: Instant::now(), + ident: ident.clone(), + pkg, + }, + ); + + self.config.prune(&mut packages, Instant::now()); + } } #[async_trait::async_trait] @@ -41,17 +70,17 @@ where { async fn resolve_package( &self, - ident: WebcIdentifier, + ident: &WebcIdentifier, client: &(dyn HttpClient + Send + Sync), ) -> Result { - if let Some(cached) = self.packages.read().unwrap().get(&ident).cloned() { + if let Some(cached) = self.lookup(&ident) { // Cache hit! tracing::debug!(package=?ident, "The resolved package was already cached"); return Ok(cached); } // the slow path - let pkg = self.resolver.resolve_package(ident.clone(), client).await?; + let pkg = self.resolver.resolve_package(ident, client).await?; tracing::debug!( request.name = ident.full_name.as_str(), @@ -60,8 +89,140 @@ where resolved.version = pkg.version.as_str(), "Adding resolved package to the cache", ); - self.packages.write().unwrap().insert(ident, pkg.clone()); + self.save(ident, pkg.clone()); Ok(pkg) } } + +#[derive(Debug, Clone)] +struct CacheEntry { + last_touched: Instant, + ident: WebcIdentifier, + pkg: BinaryPackage, +} + +impl CacheEntry { + fn approximate_memory_usage(&self) -> u64 { + self.pkg.file_system_memory_footprint + self.pkg.module_memory_footprint + } +} + +/// Configuration for the [`InMemoryCache`]. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub struct CacheConfig { + /// The maximum amount of data that should be held in memory before dropping + /// cached artefacts. + pub max_memory_usage: Option, +} + +impl CacheConfig { + fn prune(&self, packages: &mut Vec, now: Instant) { + // Note that we run this function while the cache lock is held, so we + // should prefer faster cache invalidation strategies over more accurate + // ones. It's also important to not block. + + packages.sort_by_key(|entry| now.duration_since(entry.last_touched)); + + if let Some(limit) = self.max_memory_usage { + let mut memory_used = 0; + + // Note: retain()'s closure is guaranteed to be run on each entry in + // order. + packages.retain(|entry| { + memory_used += entry.approximate_memory_usage(); + memory_used < limit + }); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + + #[derive(Debug, Default)] + struct DummyResolver { + calls: Mutex>, + } + + #[async_trait::async_trait] + impl PackageResolver for DummyResolver { + async fn resolve_package( + &self, + ident: &WebcIdentifier, + _client: &(dyn HttpClient + Send + Sync), + ) -> Result { + self.calls.lock().unwrap().push(ident.clone()); + Err(ResolverError::UnknownPackage(ident.clone())) + } + } + + fn dummy_pkg(name: impl Into) -> BinaryPackage { + BinaryPackage { + package_name: name.into(), + version: "0.0.0".to_string(), + when_cached: None, + entry: None, + hash: Arc::default(), + webc_fs: None, + commands: Arc::default(), + uses: Vec::new(), + module_memory_footprint: 0, + file_system_memory_footprint: 0, + } + } + + #[derive(Debug)] + struct DummyHttpClient; + + impl HttpClient for DummyHttpClient { + fn request( + &self, + _request: crate::http::HttpRequest, + ) -> futures::future::BoxFuture<'_, Result> + { + todo!() + } + } + + #[tokio::test] + async fn cache_hit() { + let resolver = DummyResolver::default(); + let cache = InMemoryCache::new(resolver, CacheConfig::default()); + let ident: WebcIdentifier = "python/python".parse().unwrap(); + cache.packages.lock().unwrap().push(CacheEntry { + last_touched: Instant::now(), + ident: ident.clone(), + pkg: dummy_pkg("python/python"), + }); + + let pkg = cache + .resolve_package(&ident, &DummyHttpClient) + .await + .unwrap(); + + assert_eq!(pkg.version, "0.0.0"); + } + + #[tokio::test] + async fn cache_miss() { + let resolver = DummyResolver::default(); + let cache = InMemoryCache::new(resolver, CacheConfig::default()); + let ident: WebcIdentifier = "python/python".parse().unwrap(); + assert!(cache.packages.lock().unwrap().is_empty()); + + let expected_err = cache + .resolve_package(&ident, &DummyHttpClient) + .await + .unwrap_err(); + + assert!(matches!(expected_err, ResolverError::UnknownPackage(_))); + // there should have been one call to the wrapped resolver + let calls = cache.get_ref().calls.lock().unwrap(); + assert_eq!(&*calls, &[ident]); + } +} diff --git a/lib/wasi/src/runtime/resolver/types.rs b/lib/wasi/src/runtime/resolver/types.rs index 7d0472b1113..b49b0366634 100644 --- a/lib/wasi/src/runtime/resolver/types.rs +++ b/lib/wasi/src/runtime/resolver/types.rs @@ -1,21 +1,38 @@ -use std::{collections::BTreeMap, ops::Deref, path::PathBuf, str::FromStr}; +use std::{collections::BTreeMap, fmt::Display, ops::Deref, path::PathBuf, str::FromStr}; -use crate::{bin_factory::BinaryPackage, http::HttpClient, runtime::resolver::InMemoryCache}; +use crate::{ + bin_factory::BinaryPackage, + http::HttpClient, + runtime::resolver::{cache::CacheConfig, InMemoryCache}, +}; #[async_trait::async_trait] pub trait PackageResolver: std::fmt::Debug + Send + Sync { /// Resolve a package, loading all dependencies. async fn resolve_package( &self, - pkg: WebcIdentifier, + pkg: &WebcIdentifier, client: &(dyn HttpClient + Send + Sync), ) -> Result; + /// Wrap the [`PackageResolver`] in an in-memory LRU cache. + /// + /// This is just a shortcut for calling + /// [`PackageResolver::with_cache_and_config()`] using + /// [`CacheConfig::default()`]. fn with_cache(self) -> InMemoryCache where Self: Sized, { - InMemoryCache::new(self) + self.with_cache_and_config(CacheConfig::default()) + } + + /// Wrap the [`PackageResolver`] in an in-memory LRU cache. + fn with_cache_and_config(self, cfg: CacheConfig) -> InMemoryCache + where + Self: Sized, + { + InMemoryCache::new(self, cfg) } } @@ -28,7 +45,7 @@ where /// Resolve a package, loading all dependencies. async fn resolve_package( &self, - pkg: WebcIdentifier, + pkg: &WebcIdentifier, client: &(dyn HttpClient + Send + Sync), ) -> Result { (**self).resolve_package(pkg, client).await @@ -40,7 +57,7 @@ pub struct WebcIdentifier { /// The package's full name (i.e. `wasmer/wapm2pirita`). pub full_name: String, pub locator: Locator, - /// A version constraint. + /// A semver-compliant version constraint. pub version: String, } @@ -70,6 +87,26 @@ impl FromStr for WebcIdentifier { } } +impl Display for WebcIdentifier { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let WebcIdentifier { + full_name, + locator, + version, + } = self; + + write!(f, "{full_name}@{version}")?; + + match locator { + Locator::Registry => {} + Locator::Local(path) => write!(f, " ({})", path.display())?, + Locator::Url(url) => write!(f, " ({url})")?, + } + + Ok(()) + } +} + #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum Locator { /// The current registry. @@ -82,6 +119,8 @@ pub enum Locator { #[derive(Debug, thiserror::Error)] pub enum ResolverError { + #[error("Unknown package, {_0}")] + UnknownPackage(WebcIdentifier), #[error(transparent)] Other(Box), }