From 3a1595c04bc428e538825878000326213f3bc824 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Tue, 9 May 2023 13:49:19 -0700 Subject: [PATCH] DLPX-85889 zoa panic: UpdateAtime has no corresponding entry in the index run (#845) The zettacache index cache is updated as part of merging the PendingChanges into the on-disk index. The merge task sends the updates to the checkpoint task, as part of a `MergeProgress` message. The index cache updates are then made from a spawned blocking (CPU-bound) task. The updates are completed (waited for) before the next checkpoint completes. During the merge, it's expected that lookups can see IndexEntry's from the old index, either from reading the old index itself, or from the index entry cache. These stale entries are "corrected" by either `PendingChanges::update()`'s call to `Remap::remap()`, or `MergeState::entry_disposition()`'s check of `PendingChanges::freeing()`. When the `MergeMessage::Complete` is received it calls `Locked::rotate_index()` which deletes the old on-disk index, and calls `PendingChanges::set_remap(None)` and `Locked::merge.take()`. This ends the stale entry "corrections" mentioned above, which are no longer necessary because we can no longer see stale entries from the old on-disk index. The problem occurs when the `MergeMessage::Complete` is received and processed before the spawned blocking task completes. In this case, we end the stale entry "corrections", but we can still see stale entries from the index cache. This PR addresses the problem by waiting for the index cache updates to complete before processing the `MergeMessage::Complete`. The problem was introduced by #808. --- .../zettacache/src/zettacache/merge.rs | 4 +- .../zettacache/src/zettacache/mod.rs | 72 +++++++++++-------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs index 6b695b84df54..72831665fabc 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs @@ -266,7 +266,9 @@ impl<'a> Progress<'a> { state.stats.track_bytes(Evictions, size); } // The index cache does not need to be updated, because the entry will continue - // to fail validation (`Locked::validate()`) based on its atime. + // to fail validation (`Locked::validate()`) based on its atime, or if it was + // removed due to remapping to None, it has already been added to + // `cache_updates`. } EntryDisposition::RemoveFreed => { if let Some(extent) = entry.value.extent() { diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs index 7596928ea3d1..16a2db86cb9d 100644 --- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs +++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs @@ -928,6 +928,7 @@ impl Inner { let mut free_count = 0; let mut cache_updates_count = 0; let mut handles = Vec::new(); + let mut handle_duration = Duration::ZERO; let cache_update_duration = Arc::new(std::sync::Mutex::new(Duration::ZERO)); let locked_held = Arc::new(std::sync::Mutex::new(Duration::ZERO)); // we have a channel to an active merge task, check it for messages @@ -985,39 +986,36 @@ impl Inner { })); // Spawn a new "blocking" (CPU-bound) task to process the index cache - // updates. Because this is CPU bound, if we didn't use a blocking - // task, it could starve the async executor thread. + // updates. Because this takes lots of CPU, if we didn't use a + // blocking task, it could starve the async executor thread, + // impacting overall agent performance. let inner = self.clone(); let duration = cache_update_duration.clone(); handles.push(tokio::task::spawn_blocking(move || { - // Here is where we populate the index cache to contain any new - // keys that may have been inserted or updated, as well as ensure - // any existing keys are consistent w.r.t. a remap (i.e. ensuring - // the cache references the key's new/remapped location on disk). + // Populate the index cache to contain any new keys that may have + // been inserted, as well as ensure any existing keys are + // consistent w.r.t. a remap (i.e. ensuring the cache references + // the key's new/remapped location on disk). // - // Note that keys that are already in the cache will have their - // associated values updated, and keys that are not will be added - // (with their values). Also note that any keys with no - // associated values (ghost keys) will be removed from the index. - // This is important for keys which may have been "valid", but - // were evicted because they could not be remapped. + // Keys that are already in the cache will have their associated + // values updated, and keys that are not will be added (with + // their values). Any keys with no associated values (ghost keys) + // will be removed from the index. This is important for keys + // which may have been "valid", but were evicted because they + // could not be remapped. let start = Instant::now(); - with_alloctag("Locked::index_cache", || { - for entry in progress.cache_updates.into_iter() { - match entry.value.location() { - // It's possible the key wasn't already in the cache, so - // this may add or update the key. - Some(_) => { - inner.index_cache.insert(entry.key, entry.value); - } - // It's possible the key isn't in the cache; .remove() - // doesn't fail in that case. - None => { - inner.index_cache.remove(&entry.key); - } - }; - } - }); + for entry in progress.cache_updates.into_iter() { + match entry.value.location() { + Some(_) => { + // insert or update the value + inner.index_cache.insert(entry.key, entry.value); + } + None => { + // remove if present + inner.index_cache.remove(&entry.key); + } + }; + } *(duration.lock().unwrap()) += start.elapsed(); })); @@ -1034,6 +1032,17 @@ impl Inner { } // merge task complete, replace the current index with the new index Some(MergeMessage::Complete(new_index)) => { + // Index cache updates must take effect before the indices are + // rotated. Rotation clears the MergeState and the PendingChange's + // Remap, so we can't be allowed to see any stale locations in the + // index cache, because they couldn't be corrected by + // `MergeState::entry_disposition()`'s freeing() check and + // `PendingChanges::update()`'s call to `Remap::remap()`. + let handle_start = Instant::now(); + for handle in handles.drain(..) { + handle.await.unwrap(); + } + handle_duration += handle_start.elapsed(); let mut indices = self.indices.write().await; let mut locked = lock_non_send_measured!(&self.locked).await; locked.block_allocator.finish_evacuation(new_index.id()); @@ -1051,19 +1060,20 @@ impl Inner { } } - // Frees and index cache updates must take effect before the checkpoint is - // written, so we don't try to read from old (freed) locations. + // Frees must take effect before the checkpoint is written, so we don't leak + // space if we crash. let handle_start = Instant::now(); for handle in handles { handle.await.unwrap(); } + handle_duration += handle_start.elapsed(); debug!( "processed {msg_count} merge messages with {free_count} frees and \ {cache_updates_count} cache updates in {}ms (state lock held for {}ms, cache updated in {} CPU-ms, waited {}ms)", begin.elapsed().as_millis(), locked_held.lock().unwrap().as_millis(), cache_update_duration.lock().unwrap().as_millis(), - handle_start.elapsed().as_millis(), + handle_duration.as_millis(), ); } else { loop {