Skip to content

Commit

Permalink
Fix to potential data inconsistency bug (#187)
Browse files Browse the repository at this point in the history
* During `LogMgr::sync`, if individual file's flush fails, we should
not update `syncedSeqNum` of that file. Otherwise, there can be a
risk that the manifest might write that incorrect number in another
call stack, and that results in data inconsistency.
  • Loading branch information
greensky00 authored Dec 23, 2024
1 parent 447e138 commit 7789be4
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/log_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Status LogFile::getPrefix(const uint64_t chk,
return Status();
}

Status LogFile::flushMemTable(uint64_t upto) {
Status LogFile::flushMemTable(uint64_t upto, uint64_t& flushed_seq_out) {
touch();
// Skip unnecessary flushing
if (immutable && !fHandle && isSynced()) {
Expand Down Expand Up @@ -509,7 +509,7 @@ Status LogFile::flushMemTable(uint64_t upto) {

RwSerializer rws(fOps, fHandle, true);

TC( mTable->flush(rws, upto) );
TC( mTable->flush(rws, upto, flushed_seq_out) );
TC( mTable->appendFlushMarker(rws) );

TC( fOps->flush(fHandle) );
Expand Down
2 changes: 1 addition & 1 deletion src/log_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class LogFile {
bool allow_flushed_log,
bool allow_tombstone);

Status flushMemTable(uint64_t upto = NOT_INITIALIZED);
Status flushMemTable(uint64_t upto, uint64_t& flushed_seq_out);

Status purgeMemTable();

Expand Down
20 changes: 15 additions & 5 deletions src/log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,8 +1202,20 @@ Status LogMgr::syncInternal(bool call_fsync) {
if (li.empty() || li.ptr->isRemoved()) continue;

uint64_t before_sync = li->file->getSyncedSeqNum();
EP( li->file->flushMemTable( seq_barrier ? seq_barrier : NOT_INITIALIZED ) );
uint64_t after_sync = li->file->getSyncedSeqNum();
uint64_t after_sync = NOT_INITIALIZED;
EP( li->file->flushMemTable( (seq_barrier ? seq_barrier : NOT_INITIALIZED),
after_sync ) );
if (call_fsync) {
EP( li->file->sync() );
}

// WARNING:
// We should update the syncedSeqNum after the sync() operation.
// If sync() fails and we update syncedSeqNum beforehand, there's a
// risk that the manifest might write an incorrect syncedSeqNum in
// another call stack (e.g., addNewLogFile()). This could lead to
// data loss in the event of a crash.
li->file->setSyncedSeqNum(after_sync);
_log_( log_level, myLog, "synced log file %zu, min seq %s, "
"flush seq %s, sync seq %s -> %s, max seq %s",
ii,
Expand All @@ -1212,9 +1224,7 @@ Status LogMgr::syncInternal(bool call_fsync) {
_seq_str( before_sync ).c_str(),
_seq_str( after_sync ).c_str(),
_seq_str( li->file->getMaxSeqNum() ).c_str() );
if (call_fsync) {
EP( li->file->sync() );
}

if (valid_number(after_sync)) {
last_synced_log = ii;
}
Expand Down
4 changes: 2 additions & 2 deletions src/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ Status MemTable::findOffsetOfSeq(SimpleLogger* logger,
}

// MemTable flush: skiplist (memory) -> log file. (disk)
Status MemTable::flush(RwSerializer& rws, uint64_t upto)
Status MemTable::flush(RwSerializer& rws, uint64_t upto, uint64_t& flushed_seq_out)
{
if (minSeqNum == NOT_INITIALIZED) {
// No log in this file. Just do nothing and return OK.
Expand Down Expand Up @@ -1450,7 +1450,7 @@ Status MemTable::flush(RwSerializer& rws, uint64_t upto)
skiplist_get_size(idxBySeq),
start_seqnum, seqnum_upto);

syncedSeqNum = seqnum_upto;
flushed_seq_out = seqnum_upto;
return Status();

} catch (Status s) {
Expand Down
2 changes: 1 addition & 1 deletion src/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class MemTable {
uint64_t& offset_out,
uint64_t* padding_start_pos_out = nullptr);

Status flush(RwSerializer& rws, uint64_t upto = NOT_INITIALIZED);
Status flush(RwSerializer& rws, uint64_t upto, uint64_t& flushed_seq_out);
Status checkpoint(uint64_t& seq_num_out);
Status getLogsToFlush(const uint64_t seq_num,
std::list<Record*>& list_out,
Expand Down

0 comments on commit 7789be4

Please sign in to comment.