Skip to content

Commit

Permalink
Merge pull request #4574 from systeminit/jhelwig/bug-532-lazily-deser…
Browse files Browse the repository at this point in the history
…ialize

Update LayerDb to lazily deserialize values from cache update messages
  • Loading branch information
jhelwig authored Sep 16, 2024
2 parents eae17ea + b6ad87b commit 7a1a916
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 42 deletions.
36 changes: 6 additions & 30 deletions lib/si-layer-cache/src/db/cache_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,27 +164,19 @@ where
match event.event_kind {
crate::event::LayeredEventKind::CasInsertion => {
if !self.cas_cache.contains(&event.key) {
let memory_value = self
.cas_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.cas_cache
.insert_from_cache_updates(event.key, memory_value, serialized_value)
.insert_from_cache_updates(event.key, serialized_value)
.await?;
}
}
crate::event::LayeredEventKind::EncryptedSecretInsertion => {
if !self.encrypted_secret_cache.contains(&event.key) {
let memory_value = self
.encrypted_secret_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.encrypted_secret_cache
.insert_from_cache_updates(event.key, memory_value, serialized_value)
.insert_from_cache_updates(event.key, serialized_value)
.await?;
}
}
Expand All @@ -194,14 +186,10 @@ where

crate::event::LayeredEventKind::RebaseBatchWrite => {
if !self.rebase_batch_cache.contains(&event.key) {
let memory_value = self
.rebase_batch_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.rebase_batch_cache
.insert_from_cache_updates(event.key, memory_value, serialized_value)
.insert_from_cache_updates(event.key, serialized_value)
.await?;
}
}
Expand All @@ -213,14 +201,10 @@ where

crate::event::LayeredEventKind::SnapshotWrite => {
if !self.snapshot_cache.contains(&event.key) {
let memory_value = self
.snapshot_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.snapshot_cache
.insert_from_cache_updates(event.key, memory_value, serialized_value)
.insert_from_cache_updates(event.key, serialized_value)
.await?;
}
}
Expand All @@ -230,25 +214,17 @@ where
.await?;
}
crate::event::LayeredEventKind::FuncRunWrite => {
let memory_value = self
.func_run_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.func_run_cache
.insert_or_update_from_cache_updates(event.key, memory_value, serialized_value)
.insert_or_update_from_cache_updates(event.key, serialized_value)
.await?;
}
crate::event::LayeredEventKind::FuncRunLogWrite => {
let memory_value = self
.func_run_log_cache
.deserialize_memory_value(event.payload.value.clone())
.await?;
let serialized_value =
Arc::try_unwrap(event.payload.value).unwrap_or_else(|arc| (*arc).clone());
self.func_run_log_cache
.insert_or_update_from_cache_updates(event.key, memory_value, serialized_value)
.insert_or_update_from_cache_updates(event.key, serialized_value)
.await?;
}
}
Expand Down
10 changes: 4 additions & 6 deletions lib/si-layer-cache/src/layer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ where
pub async fn insert_from_cache_updates(
&self,
key: Arc<str>,
memory_value: V,
serialize_value: Vec<u8>,
) -> LayerDbResult<()> {
self.memory_cache.insert(key.clone(), memory_value).await;
self.memory_cache
.insert_raw_bytes(key.clone(), serialize_value.clone())
.await;
self.spawn_disk_cache_write_vec(key.clone(), serialize_value)
.await
}
Expand All @@ -218,12 +219,9 @@ where
pub async fn insert_or_update_from_cache_updates(
&self,
key: Arc<str>,
memory_value: V,
serialize_value: Vec<u8>,
) -> LayerDbResult<()> {
self.insert_or_update(key.clone(), memory_value).await;
self.spawn_disk_cache_write_vec(key.clone(), serialize_value)
.await
self.insert_from_cache_updates(key, serialize_value).await
}

pub async fn evict_from_cache_updates(&self, key: Arc<str>) -> LayerDbResult<()> {
Expand Down
54 changes: 48 additions & 6 deletions lib/si-layer-cache/src/memory_cache.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
use serde::Deserialize;
use std::{sync::Arc, time::Duration};
use telemetry::prelude::*;

use moka::future::Cache;
use serde::{de::DeserializeOwned, Serialize};

use crate::db::serialize;

#[derive(Clone, Debug)]
enum MaybeDeserialized<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
RawBytes(Vec<u8>),
DeserializedValue(V),
}

#[derive(Clone, Debug)]
pub struct MemoryCache<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + Clone + 'static,
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
cache: Cache<Arc<str>, V>,
cache: Cache<Arc<str>, MaybeDeserialized<V>>,
}

impl<V> Default for MemoryCache<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + Clone + 'static,
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
fn default() -> Self {
Self::new(MemoryCacheConfig::default())
Expand All @@ -23,7 +35,7 @@ where

impl<V> MemoryCache<V>
where
V: Serialize + DeserializeOwned + Clone + Send + Sync + Clone + 'static,
V: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
pub fn new(config: MemoryCacheConfig) -> Self {
Self {
Expand All @@ -34,11 +46,41 @@ where
}

pub async fn get(&self, key: &str) -> Option<V> {
self.cache.get(key).await
match self.cache.get(key).await {
Some(MaybeDeserialized::DeserializedValue(value)) => Some(value),
Some(MaybeDeserialized::RawBytes(bytes)) => {
// If we fail to deserialize the raw bytes for some reason, pretend that we never
// had the key in the first place, and also remove it from the cache.
match serialize::from_bytes_async::<V>(&bytes).await {
Ok(deserialized) => {
self.insert(key.into(), deserialized.clone()).await;
Some(deserialized)
}
Err(e) => {
error!(
"Failed to deserialize stored bytes from memory cache for key ({:?}): {}",
key,
e
);
self.remove(key).await;
None
}
}
}
None => None,
}
}

pub async fn insert(&self, key: Arc<str>, value: V) {
self.cache.insert(key, value).await;
self.cache
.insert(key, MaybeDeserialized::DeserializedValue(value))
.await;
}

pub async fn insert_raw_bytes(&self, key: Arc<str>, raw_bytes: Vec<u8>) {
self.cache
.insert(key, MaybeDeserialized::RawBytes(raw_bytes))
.await;
}

pub async fn remove(&self, key: &str) {
Expand Down

0 comments on commit 7a1a916

Please sign in to comment.