Skip to content

Commit

Permalink
Merge pull request facebook#21 from petermattis/pmattis/check-errors
Browse files Browse the repository at this point in the history
check return status for Sync() and Append() calls to avoid corruption
  • Loading branch information
petermattis authored Nov 30, 2018
2 parents 7410b3e + b4db6b4 commit 94691be
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 17 deletions.
5 changes: 5 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ Status DBImpl::FlushWAL(bool sync) {
if (!s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
s.ToString().c_str());
// In case there is a filesystem error, we should set it globally to prevent
// future writes
WriteStatusCheck(s);
// Whether syncing or not, we should abort the rest of the function upon error
return s;
}
if (!sync) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class DBImpl : public DB {
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override;
virtual Status FlushWAL(bool sync) override;
bool TEST_WALBufferIsEmpty();
virtual Status SyncWAL() override;

virtual SequenceNumber GetLatestSequenceNumber() const override;
Expand Down Expand Up @@ -874,7 +875,7 @@ class DBImpl : public DB {
size_t seq_inc);

// Used by WriteImpl to update bg_error_ if paranoid check is enabled.
void WriteCallbackStatusCheck(const Status& status);
void WriteStatusCheck(const Status& status);

// Used by WriteImpl to update bg_error_ in case of memtable insert error.
void MemTableInsertStatusCheck(const Status& memtable_insert_status);
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!s.ok()) {
break;
}
}
if (s.ok()) {
s = directories_.GetWalDir()->Fsync();
Expand Down
6 changes: 6 additions & 0 deletions db/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() {
SwitchWAL(&write_context);
}

// Test-only helper: asks the writer of the last entry in logs_ whether its
// buffer is empty. An InstrumentedMutexLock on log_write_mutex_ is held for
// the duration of the query.
bool DBImpl::TEST_WALBufferIsEmpty() {
  InstrumentedMutexLock guard(&log_write_mutex_);
  return logs_.back().writer->TEST_BufferIsEmpty();
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,7 +1071,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
new_log_number,
new log::Writer(
std::move(file_writer), new_log_number,
impl->immutable_db_options_.recycle_log_file_num > 0));
impl->immutable_db_options_.recycle_log_file_num > 0,
impl->immutable_db_options_.manual_wal_flush));
}

// set column family handles
Expand Down Expand Up @@ -1187,6 +1188,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
LogFlush(impl->immutable_db_options_.info_log);
assert(impl->TEST_WALBufferIsEmpty());
// If the assert above fails then we need to FlushWAL before returning
// control back to the user.
if (!persist_options_status.ok()) {
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_START(write_pre_and_post_process_time);

if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status);
WriteStatusCheck(status);
}

if (need_log_sync) {
Expand Down Expand Up @@ -462,7 +462,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
}

if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(w.status);
WriteStatusCheck(w.status);
}

if (need_log_sync) {
Expand Down Expand Up @@ -623,7 +623,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
PERF_TIMER_START(write_pre_and_post_process_time);

if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status);
WriteStatusCheck(status);
}
if (status.ok()) {
for (auto* writer : write_group) {
Expand All @@ -647,7 +647,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
return status;
}

void DBImpl::WriteCallbackStatusCheck(const Status& status) {
void DBImpl::WriteStatusCheck(const Status& status) {
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (immutable_db_options_.paranoid_checks && !status.ok() &&
Expand Down
49 changes: 48 additions & 1 deletion db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
std::atomic<int> leader_count{0};
std::vector<port::Thread> threads;
mock_env->SetFilesystemActive(false);

// Wait until all threads linked to write threads, to make sure
// all threads join the same batch group.
SyncPoint::GetInstance()->SetCallBack(
Expand All @@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
threads.push_back(port::Thread(
[&](int index) {
// All threads should fail.
ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
auto res = Put("key" + ToString(index), "value");
if (options.manual_wal_flush) {
ASSERT_TRUE(res.ok());
// we should see fs error when we do the flush
res = dbfull()->FlushWAL(false);
}
ASSERT_FALSE(res.ok());
},
i));
}
Expand All @@ -80,6 +87,46 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
Close();
}

// Checks that the WAL buffer state after a Put matches options.manual_wal_flush
// and that FlushWAL(false) always leaves the buffer empty. The same sequence
// is run against two WALs.
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // Pass 0 uses the 1st WAL, created during open; pass 1 uses the 2nd WAL,
  // created by TEST_SwitchWAL().
  for (int pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      dbfull()->TEST_SwitchWAL();
    }
    ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
    // Right after the Put, the buffer must be non-empty exactly when
    // manual_wal_flush is enabled.
    ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
    // An explicit flush must succeed and drain the buffer in either mode.
    ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
    ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  }
}

// Verifies that a failed WAL write makes this and later writes fail: the
// filesystem is disabled for the first Put only, yet both iterations must end
// with a non-OK status.
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    const bool first_put = (i == 0);
    // Forcibly fail the WAL write for the first Put only. The second Put runs
    // with the filesystem active again and is expected to fail due to
    // read-only mode, even when manual_wal_flush is set.
    mock_env->SetFilesystemActive(!first_put);
    Status res = Put("key" + ToString(i), "value");
    if (first_put && options.manual_wal_flush) {
      // With manual_wal_flush the first Put itself succeeds; the filesystem
      // error only shows up once the WAL is flushed.
      ASSERT_TRUE(res.ok());
      res = dbfull()->FlushWAL(false);
    }
    ASSERT_FALSE(res.ok());
  }
  // Close the DB before mock_env is destroyed.
  Close();
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
Expand Down
10 changes: 7 additions & 3 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ Status Writer::AddRecord(const Slice& slice) {
// Fill the trailer (literal below relies on kHeaderSize and
// kRecyclableHeaderSize being <= 11)
assert(header_size <= 11);
dest_->Append(
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
static_cast<size_t>(leftover)));
s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
static_cast<size_t>(leftover)));
if (!s.ok()) {
break;
}
}
block_offset_ = 0;
}
Expand Down Expand Up @@ -90,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) {
return s;
}

// Test-only hook: forwards the buffer-empty query to dest_.
bool Writer::TEST_BufferIsEmpty() {
  return dest_->TEST_BufferIsEmpty();
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes

Expand Down
2 changes: 2 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Writer {

Status WriteBuffer();

bool TEST_BufferIsEmpty();

private:
unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
Expand Down
2 changes: 2 additions & 0 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
immutable_db_options.allow_ingest_behind;
options.preserve_deletes =
immutable_db_options.preserve_deletes;
options.two_write_queues = immutable_db_options.two_write_queues;
options.manual_wal_flush = immutable_db_options.manual_wal_flush;

return options;
}
Expand Down
2 changes: 2 additions & 0 deletions util/file_reader_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class WritableFileWriter {

// Forwards to writable_file_->use_direct_io().
bool use_direct_io() {
  return writable_file_->use_direct_io();
}

// Test-only hook: returns true when buf_.CurrentSize() is zero.
bool TEST_BufferIsEmpty() {
  return buf_.CurrentSize() == 0;
}

private:
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
Expand Down
3 changes: 1 addition & 2 deletions util/file_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ Status CopyFile(Env* env, const std::string& source,
}
size -= slice.size();
}
dest_writer->Sync(use_fsync);
return Status::OK();
return dest_writer->Sync(use_fsync);
}

// Utility function to create a file with the provided contents
Expand Down
5 changes: 0 additions & 5 deletions utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2606,7 +2606,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) {
TEST_P(TransactionTest, ColumnFamiliesTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
Status s;

Expand Down Expand Up @@ -3146,7 +3145,6 @@ TEST_P(TransactionTest, Rollback) {
TEST_P(TransactionTest, LockLimitTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
Status s;

Expand Down Expand Up @@ -3252,7 +3250,6 @@ TEST_P(TransactionTest, LockLimitTest) {
TEST_P(TransactionTest, IteratorTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
Status s;

Expand Down Expand Up @@ -3433,7 +3430,6 @@ TEST_P(TransactionTest, DisableIndexingTest) {
TEST_P(TransactionTest, SavepointTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options;
string value;
Status s;

Expand Down Expand Up @@ -4688,7 +4684,6 @@ TEST_P(TransactionTest, ClearSnapshotTest) {
TEST_P(TransactionTest, ToggleAutoCompactionTest) {
Status s;

TransactionOptions txn_options;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;

Expand Down

0 comments on commit 94691be

Please sign in to comment.