Skip to content

Commit

Permalink
DOSE-714 DataObject::get_block() fails unwrap due to agent restart/resume issue (openzfs#498)
Browse files Browse the repository at this point in the history

When we replay zio's, we reissue both reads and writes. The agent
collects all the writes and then merges them together with the
"recovered objects", which are part of the currently-syncing txg.
However, the reads are processed immediately, as they are received.  If
one of these reads is for a block that is in a "recovered object", we
don't realize this, and instead attempt to read from the last known
object in the pool (i.e. the last object that was written as part of the
last synced txg). Since the block is not in that object, this unwrap
fails.

The solution is to not process the reads until the "recovered
objects" have been processed.
  • Loading branch information
ahrens authored Oct 8, 2021
1 parent c23188b commit 1e3ffa1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
7 changes: 6 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/data_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ impl DataObjectPhys {
}

/// Returns the payload bytes for `block`.
///
/// # Panics
///
/// Panics if `block` is not present in this object, printing the block id,
/// the object id, and the full object state so the failure is diagnosable
/// (rather than a context-free `unwrap` panic).
pub fn get_block(&self, block: BlockId) -> &[u8] {
    self.blocks
        .get(&block)
        .unwrap_or_else(|| panic!("{:?} not found in {:?}: {:?}", block, self.object, self))
}

pub fn blocks_len(&self) -> u32 {
Expand Down
5 changes: 3 additions & 2 deletions cmd/zfs_object_agent/zettaobject/src/object_block_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl ObjectBlockMap {

pub fn block_to_object(&self, block: BlockId) -> ObjectId {
let state = self.state.read().unwrap();
assert_lt!(block, state.next_block);
state
.map
.range((Unbounded, Included(block)))
Expand Down Expand Up @@ -207,9 +208,9 @@ impl ObjectBlockMap {
state.map.is_empty()
}

pub fn for_each<CB>(&self, mut f: CB)
pub fn for_each<F>(&self, mut f: F)
where
CB: FnMut(&ObjectBlockMapEntry),
F: FnMut(&ObjectBlockMapEntry),
{
let state = self.state.read().unwrap();
for ent in state.map.iter() {
Expand Down
30 changes: 28 additions & 2 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use std::time::Duration;
use std::time::{Instant, SystemTime};
use stream_reduce::Reduce;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use util::get_tunable;
Expand Down Expand Up @@ -370,6 +371,7 @@ pub struct PoolState {
object_block_map: ObjectBlockMap,
zettacache: Option<ZettaCache>,
pub shared_state: Arc<PoolSharedState>,
resuming: watch::Receiver<bool>,
_heartbeat_guard: Option<HeartbeatGuard>, // Used for RAII
}

Expand Down Expand Up @@ -497,6 +499,7 @@ struct PoolSyncingState {
cleanup_handle: Option<JoinHandle<()>>,
delete_objects_handle: Option<JoinHandle<()>>,
features: HashMap<FeatureFlag, u64>,
resuming: watch::Sender<bool>,
}

type SyncTask =
Expand Down Expand Up @@ -797,6 +800,7 @@ impl Pool {
cache: Option<ZettaCache>,
heartbeat_guard: Option<HeartbeatGuard>,
readonly: bool,
resuming: bool,
) -> Result<(Pool, UberblockPhys, BlockId), PoolOpenError> {
let phys = UberblockPhys::get(object_access, pool_phys.guid, txg).await?;

Expand Down Expand Up @@ -830,6 +834,7 @@ impl Pool {
),
});
}
let (tx, rx) = watch::channel(resuming);
let pool = Pool {
state: Arc::new(PoolState {
shared_state: shared_state.clone(),
Expand All @@ -853,9 +858,11 @@ impl Pool {
cleanup_handle: None,
delete_objects_handle: None,
features: phys.features.iter().cloned().collect(),
resuming: tx,
})),
zettacache: cache,
object_block_map,
resuming: rx,
_heartbeat_guard: heartbeat_guard,
}),
};
Expand Down Expand Up @@ -884,7 +891,7 @@ impl Pool {
txg: Option<Txg>,
cache: Option<ZettaCache>,
id: Uuid,
resume: bool,
resuming: bool,
) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
let phys = PoolPhys::get(object_access, guid).await?;
if phys.last_txg.0 == 0 {
Expand Down Expand Up @@ -919,6 +926,8 @@ impl Pool {
),
});

let (tx, rx) = watch::channel(false);

let mut pool = Pool {
state: Arc::new(PoolState {
shared_state: shared_state.clone(),
Expand All @@ -942,9 +951,11 @@ impl Pool {
cleanup_handle: None,
delete_objects_handle: None,
features: Default::default(),
resuming: tx,
})),
zettacache: cache,
object_block_map,
resuming: rx,
_heartbeat_guard: if !object_access.readonly() {
Some(heartbeat::start_heartbeat(object_access.clone(), id).await)
} else {
Expand All @@ -969,6 +980,7 @@ impl Pool {
None
},
object_access.readonly(),
resuming,
)
.await?;

Expand Down Expand Up @@ -1003,7 +1015,7 @@ impl Pool {
join3(
pool.state.clone().cleanup_log_objects(),
pool.state.clone().cleanup_uberblock_objects(last_txg),
if resume {
if resuming {
Either::Left(future::ready(()))
} else {
Either::Right(state.cleanup_data_objects())
Expand Down Expand Up @@ -1189,6 +1201,10 @@ impl Pool {
);
Self::initiate_flush_object_impl(state, syncing_state);

assert!(*syncing_state.resuming.borrow());
// This unwrap is safe because there's a receiver: state.resuming
syncing_state.resuming.send(false).unwrap();

info!("resume: completed");
})
}
Expand Down Expand Up @@ -1561,6 +1577,16 @@ impl Pool {
}

async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec<u8> {
// If we are in the middle of resuming, wait for that to complete before
// processing this read. This is needed because we may be reading from
// a block that hasn't yet been added to the ObjectBlockMap.
if *self.state.resuming.borrow() {
let mut resuming = self.state.resuming.clone();
while *resuming.borrow_and_update() {
resuming.changed().await.unwrap();
}
}

let object = self.state.object_block_map.block_to_object(block);
let shared_state = self.state.shared_state.clone();

Expand Down

0 comments on commit 1e3ffa1

Please sign in to comment.