diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs index 6b695b84df54..72831665fabc 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs @@ -266,7 +266,9 @@ impl<'a> Progress<'a> { state.stats.track_bytes(Evictions, size); } // The index cache does not need to be updated, because the entry will continue - // to fail validation (`Locked::validate()`) based on its atime. + // to fail validation (`Locked::validate()`) based on its atime, or if it was + // removed due to remapping to None, it has already been added to + // `cache_updates`. } EntryDisposition::RemoveFreed => { if let Some(extent) = entry.value.extent() { diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs index 7596928ea3d1..16a2db86cb9d 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs @@ -928,6 +928,7 @@ impl Inner { let mut free_count = 0; let mut cache_updates_count = 0; let mut handles = Vec::new(); + let mut handle_duration = Duration::ZERO; let cache_update_duration = Arc::new(std::sync::Mutex::new(Duration::ZERO)); let locked_held = Arc::new(std::sync::Mutex::new(Duration::ZERO)); // we have a channel to an active merge task, check it for messages @@ -985,39 +986,36 @@ impl Inner { })); // Spawn a new "blocking" (CPU-bound) task to process the index cache - // updates. Because this is CPU bound, if we didn't use a blocking - // task, it could starve the async executor thread. + // updates. Because this takes lots of CPU, if we didn't use a + // blocking task, it could starve the async executor thread, + // impacting overall agent performance. let inner = self.clone(); let duration = cache_update_duration.clone(); handles.push(tokio::task::spawn_blocking(move || { - // Here is where we populate the index cache to contain any new - // keys that may have been inserted or updated, as well as ensure - // any existing keys are consistent w.r.t. a remap (i.e. ensuring - // the cache references the key's new/remapped location on disk). + // Populate the index cache to contain any new keys that may have + // been inserted, as well as ensure any existing keys are + // consistent w.r.t. a remap (i.e. ensuring the cache references + // the key's new/remapped location on disk). // - // Note that keys that are already in the cache will have their - // associated values updated, and keys that are not will be added - // (with their values). Also note that any keys with no - // associated values (ghost keys) will be removed from the index. - // This is important for keys which may have been "valid", but - // were evicted because they could not be remapped. + // Keys that are already in the cache will have their associated + // values updated, and keys that are not will be added (with + // their values). Any keys with no associated values (ghost keys) + // will be removed from the index. This is important for keys + // which may have been "valid", but were evicted because they + // could not be remapped. let start = Instant::now(); - with_alloctag("Locked::index_cache", || { - for entry in progress.cache_updates.into_iter() { - match entry.value.location() { - // It's possible the key wasn't already in the cache, so - // this may add or update the key. - Some(_) => { - inner.index_cache.insert(entry.key, entry.value); - } - // It's possible the key isn't in the cache; .remove() - // doesn't fail in that case. - None => { - inner.index_cache.remove(&entry.key); - } - }; - } - }); + for entry in progress.cache_updates.into_iter() { + match entry.value.location() { + Some(_) => { + // insert or update the value + inner.index_cache.insert(entry.key, entry.value); + } + None => { + // remove if present + inner.index_cache.remove(&entry.key); + } + }; + } *(duration.lock().unwrap()) += start.elapsed(); })); @@ -1034,6 +1032,17 @@ impl Inner { } // merge task complete, replace the current index with the new index Some(MergeMessage::Complete(new_index)) => { + // Index cache updates must take effect before the indices are + // rotated. Rotation clears the MergeState and the PendingChange's + // Remap, so we can't be allowed to see any stale locations in the + // index cache, because they couldn't be corrected by + // `MergeState::entry_disposition()`'s freeing() check and + // `PendingChanges::update()`'s call to `Remap::remap()`. + let handle_start = Instant::now(); + for handle in handles.drain(..) { + handle.await.unwrap(); + } + handle_duration += handle_start.elapsed(); let mut indices = self.indices.write().await; let mut locked = lock_non_send_measured!(&self.locked).await; locked.block_allocator.finish_evacuation(new_index.id()); @@ -1051,19 +1060,20 @@ impl Inner { } } - // Frees and index cache updates must take effect before the checkpoint is - // written, so we don't try to read from old (freed) locations. + // Frees must take effect before the checkpoint is written, so we don't leak + // space if we crash. let handle_start = Instant::now(); for handle in handles { handle.await.unwrap(); } + handle_duration += handle_start.elapsed(); debug!( "processed {msg_count} merge messages with {free_count} frees and \ {cache_updates_count} cache updates in {}ms (state lock held for {}ms, cache updated in {} CPU-ms, waited {}ms)", begin.elapsed().as_millis(), locked_held.lock().unwrap().as_millis(), cache_update_duration.lock().unwrap().as_millis(), - handle_start.elapsed().as_millis(), + handle_duration.as_millis(), ); } else { loop {