Skip to content

Commit

Permalink
Made the in-memory cache use a LRU strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-F-Bryan committed May 1, 2023
1 parent 5842881 commit f16a6da
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 17 deletions.
2 changes: 1 addition & 1 deletion lib/wasi/src/bin_factory/module_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ModuleCache {
let ident = webc.parse().context("Unable to parse the package name")?;
let http_client = runtime.http_client().context("No http client available")?;
let resolver = runtime.package_resolver();
let fut = resolver.resolve_package(ident, http_client);
let fut = resolver.resolve_package(&ident, http_client);

match runtime.task_manager().block_on(fut) {
Ok(mut data) => {
Expand Down
2 changes: 1 addition & 1 deletion lib/wasi/src/runtime/resolver/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl BuiltinResolver {
impl PackageResolver for BuiltinResolver {
async fn resolve_package(
&self,
pkg: WebcIdentifier,
pkg: &WebcIdentifier,
client: &(dyn HttpClient + Send + Sync),
) -> Result<BinaryPackage, ResolverError> {
crate::wapm::fetch_webc(
Expand Down
179 changes: 170 additions & 9 deletions lib/wasi/src/runtime/resolver/cache.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::{collections::HashMap, sync::RwLock};
use std::{sync::Mutex, time::Instant};

use crate::{
bin_factory::BinaryPackage,
http::HttpClient,
runtime::resolver::{PackageResolver, ResolverError, WebcIdentifier},
};

/// A simple in-memory caching layer.
/// A resolver that wraps a [`PackageResolver`] in an in-memory LRU cache.
#[derive(Debug)]
pub struct InMemoryCache<R> {
resolver: R,
packages: RwLock<HashMap<WebcIdentifier, BinaryPackage>>,
packages: Mutex<Vec<CacheEntry>>,
config: CacheConfig,
}

impl<R> InMemoryCache<R> {
pub fn new(resolver: R) -> Self {
pub fn new(resolver: R, config: CacheConfig) -> Self {
InMemoryCache {
resolver,
packages: RwLock::new(HashMap::new()),
packages: Mutex::new(Vec::new()),
config,
}
}

Expand All @@ -32,6 +34,33 @@ impl<R> InMemoryCache<R> {
pub fn into_inner(self) -> R {
self.resolver
}

fn lookup(&self, ident: &WebcIdentifier) -> Option<BinaryPackage> {
let mut packages = self.packages.lock().unwrap();

let now = Instant::now();
let entry = packages.iter_mut().find(|entry| entry.ident == *ident)?;
entry.last_touched = now;
let pkg = entry.pkg.clone();

self.config.prune(&mut packages, now);

Some(pkg)
}

fn save(&self, ident: &WebcIdentifier, pkg: BinaryPackage) {
let mut packages = self.packages.lock().unwrap();
packages.insert(
0,
CacheEntry {
last_touched: Instant::now(),
ident: ident.clone(),
pkg,
},
);

self.config.prune(&mut packages, Instant::now());
}
}

#[async_trait::async_trait]
Expand All @@ -41,17 +70,17 @@ where
{
async fn resolve_package(
&self,
ident: WebcIdentifier,
ident: &WebcIdentifier,
client: &(dyn HttpClient + Send + Sync),
) -> Result<BinaryPackage, ResolverError> {
if let Some(cached) = self.packages.read().unwrap().get(&ident).cloned() {
if let Some(cached) = self.lookup(&ident) {
// Cache hit!
tracing::debug!(package=?ident, "The resolved package was already cached");
return Ok(cached);
}

// the slow path
let pkg = self.resolver.resolve_package(ident.clone(), client).await?;
let pkg = self.resolver.resolve_package(ident, client).await?;

tracing::debug!(
request.name = ident.full_name.as_str(),
Expand All @@ -60,8 +89,140 @@ where
resolved.version = pkg.version.as_str(),
"Adding resolved package to the cache",
);
self.packages.write().unwrap().insert(ident, pkg.clone());
self.save(ident, pkg.clone());

Ok(pkg)
}
}

#[derive(Debug, Clone)]
struct CacheEntry {
last_touched: Instant,
ident: WebcIdentifier,
pkg: BinaryPackage,
}

impl CacheEntry {
fn approximate_memory_usage(&self) -> u64 {
self.pkg.file_system_memory_footprint + self.pkg.module_memory_footprint
}
}

/// Configuration for the [`InMemoryCache`].
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct CacheConfig {
/// The maximum amount of data that should be held in memory before dropping
/// cached artefacts.
pub max_memory_usage: Option<u64>,
}

impl CacheConfig {
fn prune(&self, packages: &mut Vec<CacheEntry>, now: Instant) {
// Note that we run this function while the cache lock is held, so we
// should prefer faster cache invalidation strategies over more accurate
// ones. It's also important to not block.

packages.sort_by_key(|entry| now.duration_since(entry.last_touched));

if let Some(limit) = self.max_memory_usage {
let mut memory_used = 0;

// Note: retain()'s closure is guaranteed to be run on each entry in
// order.
packages.retain(|entry| {
memory_used += entry.approximate_memory_usage();
memory_used < limit
});
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;

#[derive(Debug, Default)]
struct DummyResolver {
calls: Mutex<Vec<WebcIdentifier>>,
}

#[async_trait::async_trait]
impl PackageResolver for DummyResolver {
async fn resolve_package(
&self,
ident: &WebcIdentifier,
_client: &(dyn HttpClient + Send + Sync),
) -> Result<BinaryPackage, ResolverError> {
self.calls.lock().unwrap().push(ident.clone());
Err(ResolverError::UnknownPackage(ident.clone()))
}
}

fn dummy_pkg(name: impl Into<String>) -> BinaryPackage {
BinaryPackage {
package_name: name.into(),
version: "0.0.0".to_string(),
when_cached: None,
entry: None,
hash: Arc::default(),
webc_fs: None,
commands: Arc::default(),
uses: Vec::new(),
module_memory_footprint: 0,
file_system_memory_footprint: 0,
}
}

#[derive(Debug)]
struct DummyHttpClient;

impl HttpClient for DummyHttpClient {
fn request(
&self,
_request: crate::http::HttpRequest,
) -> futures::future::BoxFuture<'_, Result<crate::http::HttpResponse, anyhow::Error>>
{
todo!()
}
}

#[tokio::test]
async fn cache_hit() {
let resolver = DummyResolver::default();
let cache = InMemoryCache::new(resolver, CacheConfig::default());
let ident: WebcIdentifier = "python/python".parse().unwrap();
cache.packages.lock().unwrap().push(CacheEntry {
last_touched: Instant::now(),
ident: ident.clone(),
pkg: dummy_pkg("python/python"),
});

let pkg = cache
.resolve_package(&ident, &DummyHttpClient)
.await
.unwrap();

assert_eq!(pkg.version, "0.0.0");
}

#[tokio::test]
async fn cache_miss() {
let resolver = DummyResolver::default();
let cache = InMemoryCache::new(resolver, CacheConfig::default());
let ident: WebcIdentifier = "python/python".parse().unwrap();
assert!(cache.packages.lock().unwrap().is_empty());

let expected_err = cache
.resolve_package(&ident, &DummyHttpClient)
.await
.unwrap_err();

assert!(matches!(expected_err, ResolverError::UnknownPackage(_)));
// there should have been one call to the wrapped resolver
let calls = cache.get_ref().calls.lock().unwrap();
assert_eq!(&*calls, &[ident]);
}
}
51 changes: 45 additions & 6 deletions lib/wasi/src/runtime/resolver/types.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use std::{collections::BTreeMap, ops::Deref, path::PathBuf, str::FromStr};
use std::{collections::BTreeMap, fmt::Display, ops::Deref, path::PathBuf, str::FromStr};

use crate::{bin_factory::BinaryPackage, http::HttpClient, runtime::resolver::InMemoryCache};
use crate::{
bin_factory::BinaryPackage,
http::HttpClient,
runtime::resolver::{cache::CacheConfig, InMemoryCache},
};

#[async_trait::async_trait]
pub trait PackageResolver: std::fmt::Debug + Send + Sync {
/// Resolve a package, loading all dependencies.
async fn resolve_package(
&self,
pkg: WebcIdentifier,
pkg: &WebcIdentifier,
client: &(dyn HttpClient + Send + Sync),
) -> Result<BinaryPackage, ResolverError>;

/// Wrap the [`PackageResolver`] in an in-memory LRU cache.
///
/// This is just a shortcut for calling
/// [`PackageResolver::with_cache_and_config()`] using
/// [`CacheConfig::default()`].
fn with_cache(self) -> InMemoryCache<Self>
where
Self: Sized,
{
InMemoryCache::new(self)
self.with_cache_and_config(CacheConfig::default())
}

/// Wrap the [`PackageResolver`] in an in-memory LRU cache.
fn with_cache_and_config(self, cfg: CacheConfig) -> InMemoryCache<Self>
where
Self: Sized,
{
InMemoryCache::new(self, cfg)
}
}

Expand All @@ -28,7 +45,7 @@ where
/// Resolve a package, loading all dependencies.
async fn resolve_package(
&self,
pkg: WebcIdentifier,
pkg: &WebcIdentifier,
client: &(dyn HttpClient + Send + Sync),
) -> Result<BinaryPackage, ResolverError> {
(**self).resolve_package(pkg, client).await
Expand All @@ -40,7 +57,7 @@ pub struct WebcIdentifier {
/// The package's full name (i.e. `wasmer/wapm2pirita`).
pub full_name: String,
pub locator: Locator,
/// A version constraint.
/// A semver-compliant version constraint.
pub version: String,
}

Expand Down Expand Up @@ -70,6 +87,26 @@ impl FromStr for WebcIdentifier {
}
}

impl Display for WebcIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let WebcIdentifier {
full_name,
locator,
version,
} = self;

write!(f, "{full_name}@{version}")?;

match locator {
Locator::Registry => {}
Locator::Local(path) => write!(f, " ({})", path.display())?,
Locator::Url(url) => write!(f, " ({url})")?,
}

Ok(())
}
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum Locator {
/// The current registry.
Expand All @@ -82,6 +119,8 @@ pub enum Locator {

#[derive(Debug, thiserror::Error)]
pub enum ResolverError {
#[error("Unknown package, {_0}")]
UnknownPackage(WebcIdentifier),
#[error(transparent)]
Other(Box<dyn std::error::Error + Send + Sync>),
}
Expand Down

0 comments on commit f16a6da

Please sign in to comment.