From a66ad5a0e9ccebd279fbbca12464a613e28bb611 Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Thu, 19 Apr 2018 14:00:52 -0700 Subject: [PATCH 1/4] check return status for Sync() and Append() calls to avoid corruption Summary: Right now in `SyncClosedLogs`, `CopyFile`, and `AddRecord`, where `Sync` and `Append` are invoked in a loop, the error statuses are not checked. This could lead to potential corruption as later calls will overwrite the error status. Closes https://github.com/facebook/rocksdb/pull/3740 Differential Revision: D7678848 Pulled By: miasantreble fbshipit-source-id: 4b0b412975989dfe80348f73217b9c4122a4bd77 --- db/db_impl_compaction_flush.cc | 3 +++ db/log_writer.cc | 8 +++++--- util/file_util.cc | 3 +-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index c1faccb2909..dfe64f933cc 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -50,6 +50,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, log->get_log_number()); s = log->file()->Sync(immutable_db_options_.use_fsync); + if (!s.ok()) { + break; + } } if (s.ok()) { s = directories_.GetWalDir()->Fsync(); diff --git a/db/log_writer.cc b/db/log_writer.cc index a767f19160c..999f9c58022 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -57,9 +57,11 @@ Status Writer::AddRecord(const Slice& slice) { // Fill the trailer (literal below relies on kHeaderSize and // kRecyclableHeaderSize being <= 11) assert(header_size <= 11); - dest_->Append( - Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", - static_cast(leftover))); + s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + static_cast(leftover))); + if (!s.ok()) { + break; + } } block_offset_ = 0; } diff --git a/util/file_util.cc b/util/file_util.cc index 8a1adf2bd78..446a9200492 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -62,8 +62,7 @@ Status 
CopyFile(Env* env, const std::string& source, } size -= slice.size(); } - dest_writer->Sync(use_fsync); - return Status::OK(); + return dest_writer->Sync(use_fsync); } // Utility function to create a file with the provided contents From fb75bc2c343fb0311088ae634271bc2a59792a6d Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 22 Mar 2018 15:56:52 -0700 Subject: [PATCH 2/4] Rename function for handling WAL write error Summary: It was misnamed. It actually updates `bg_error_` if `PreprocessWrite()` or `WriteToWAL()` fail, not related to the user callback. Closes https://github.com/facebook/rocksdb/pull/3485 Differential Revision: D6955787 Pulled By: ajkr fbshipit-source-id: bd7afc3fdb7a52830c021cbfc25fcbc3ab7d5e10 --- db/db_impl.h | 2 +- db/db_impl_write.cc | 8 ++++---- db/db_write_test.cc | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 4c51d9fe52f..2010cae643e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -874,7 +874,7 @@ class DBImpl : public DB { size_t seq_inc); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. - void WriteCallbackStatusCheck(const Status& status); + void WriteStatusCheck(const Status& status); // Used by WriteImpl to update bg_error_ in case of memtable insert error. 
void MemTableInsertStatusCheck(const Status& memtable_insert_status); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 9f5a80faad3..c8bdf31ef3c 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -339,7 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(status); + WriteStatusCheck(status); } if (need_log_sync) { @@ -462,7 +462,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(w.status); + WriteStatusCheck(w.status); } if (need_log_sync) { @@ -623,7 +623,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(status); + WriteStatusCheck(status); } if (status.ok()) { for (auto* writer : write_group) { @@ -647,7 +647,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, return status; } -void DBImpl::WriteCallbackStatusCheck(const Status& status) { +void DBImpl::WriteStatusCheck(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. if (immutable_db_options_.paranoid_checks && !status.ok() && diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 1a27f470ec7..917aef550e2 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -80,6 +80,22 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { Close(); } +TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { + std::unique_ptr mock_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetOptions(); + options.env = mock_env.get(); + Reopen(options); + for (int i = 0; i < 2; i++) { + // Forcibly fail WAL write for the first Put only. 
Subsequent Puts should + // fail due to read-only mode + mock_env->SetFilesystemActive(i != 0); + ASSERT_FALSE(Put("key" + ToString(i), "value").ok()); + } + // Close before mock_env destruct. + Close(); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, From 45fbaf2f9d9b7659691e37fb410c6abb15a8a32a Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 14 May 2018 10:53:32 -0700 Subject: [PATCH 3/4] Pass manual_wal_flush also to the first wal file Summary: Currently manual_wal_flush if set in the options will be used only for the wal files created during wal switch. The configuration thus does not affect the first wal file. The patch fixes that and also updates the related unit tests. This PR is built on top of https://github.com/facebook/rocksdb/pull/3756 Closes https://github.com/facebook/rocksdb/pull/3824 Differential Revision: D7909153 Pulled By: maysamyabandeh fbshipit-source-id: 024ed99d2555db06bf096c902b998e432bb7b9ce --- db/db_impl.cc | 5 +++++ db/db_impl.h | 1 + db/db_impl_debug.cc | 6 ++++++ db/db_impl_open.cc | 6 +++++- db/db_write_test.cc | 35 +++++++++++++++++++++++++++++++++-- db/log_writer.cc | 2 ++ db/log_writer.h | 2 ++ options/options_helper.cc | 2 ++ util/file_reader_writer.h | 2 ++ 9 files changed, 58 insertions(+), 3 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 67cceadd3a8..fa611f9439f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -701,6 +701,11 @@ Status DBImpl::FlushWAL(bool sync) { if (!s.ok()) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", s.ToString().c_str()); + // In case there is a fs error we should set it globally to prevent the + // future writes + WriteStatusCheck(s); + // whether sync or not, we should abort the rest of function upon error + return s; } if (!sync) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); diff --git a/db/db_impl.h b/db/db_impl.h index 
2010cae643e..6d24d53d0c1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -220,6 +220,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; virtual Status FlushWAL(bool sync) override; + bool TEST_WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 9d87f5c29a2..d4e47d86866 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() { SwitchWAL(&write_context); } +bool DBImpl::TEST_WALBufferIsEmpty() { + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + return cur_log_writer->TEST_BufferIsEmpty(); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 047a17b21fd..6a3863ba748 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1071,7 +1071,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, new_log_number, new log::Writer( std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0)); + impl->immutable_db_options_.recycle_log_file_num > 0, + impl->immutable_db_options_.manual_wal_flush)); } // set column family handles @@ -1187,6 +1188,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->immutable_db_options_.info_log); + assert(impl->TEST_WALBufferIsEmpty()); + // If the assert above fails then we need to FlushWAL before returning + // control back to the user. 
if (!persist_options_status.ok()) { s = Status::IOError( "DB::Open() failed --- Unable to persist Options file", diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 917aef550e2..200397681c8 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { std::atomic leader_count{0}; std::vector threads; mock_env->SetFilesystemActive(false); + // Wait until all threads linked to write threads, to make sure // all threads join the same batch group. SyncPoint::GetInstance()->SetCallBack( @@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { threads.push_back(port::Thread( [&](int index) { // All threads should fail. - ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + auto res = Put("key" + ToString(index), "value"); + if (options.manual_wal_flush) { + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); }, i)); } @@ -80,6 +87,22 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { Close(); } +TEST_P(DBWriteTest, ManualWalFlushInEffect) { + Options options = GetOptions(); + Reopen(options); + // try the 1st WAL created during open + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + // try the 2nd wal created during SwitchWAL + dbfull()->TEST_SwitchWAL(); + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); +} + TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { std::unique_ptr mock_env( new FaultInjectionTestEnv(Env::Default())); @@ -90,7 +113,15 @@ TEST_P(DBWriteTest, 
IOErrorOnWALWriteTriggersReadOnlyMode) { // Forcibly fail WAL write for the first Put only. Subsequent Puts should // fail due to read-only mode mock_env->SetFilesystemActive(i != 0); - ASSERT_FALSE(Put("key" + ToString(i), "value").ok()); + auto res = Put("key" + ToString(i), "value"); + if (options.manual_wal_flush && i == 0) { + // even with manual_wal_flush the 2nd Put should return error because of + // the read-only mode + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); } // Close before mock_env destruct. Close(); diff --git a/db/log_writer.cc b/db/log_writer.cc index 999f9c58022..c31adbec5f0 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -92,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) { return s; } +bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } + Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { assert(n <= 0xffff); // Must fit in two bytes diff --git a/db/log_writer.h b/db/log_writer.h index 143ad2674de..abd7977b94f 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -85,6 +85,8 @@ class Writer { Status WriteBuffer(); + bool TEST_BufferIsEmpty(); + private: unique_ptr dest_; size_t block_offset_; // Current offset in block diff --git a/options/options_helper.cc b/options/options_helper.cc index b93f3454bb5..4235bc635dc 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.allow_ingest_behind; options.preserve_deletes = immutable_db_options.preserve_deletes; + options.two_write_queues = immutable_db_options.two_write_queues; + options.manual_wal_flush = immutable_db_options.manual_wal_flush; return options; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9bc3b9437c3..5870a55cdf0 100644 --- a/util/file_reader_writer.h +++ 
b/util/file_reader_writer.h @@ -187,6 +187,8 @@ class WritableFileWriter { bool use_direct_io() { return writable_file_->use_direct_io(); } + bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode From b4db6b43a9237b32d6bc693e2ab0367cbdb84061 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Fri, 30 Nov 2018 09:33:18 -0500 Subject: [PATCH 4/4] Remove unused variables that the compiler is complaining about --- utilities/transactions/transaction_test.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index bd0f6a4393b..933e13df703 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2606,7 +2606,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { TEST_P(TransactionTest, ColumnFamiliesTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3146,7 +3145,6 @@ TEST_P(TransactionTest, Rollback) { TEST_P(TransactionTest, LockLimitTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3252,7 +3250,6 @@ TEST_P(TransactionTest, LockLimitTest) { TEST_P(TransactionTest, IteratorTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3433,7 +3430,6 @@ TEST_P(TransactionTest, DisableIndexingTest) { TEST_P(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -4688,7 +4684,6 @@ TEST_P(TransactionTest, ClearSnapshotTest) { TEST_P(TransactionTest, ToggleAutoCompactionTest) { Status s; - TransactionOptions txn_options; ColumnFamilyHandle *cfa, *cfb; 
ColumnFamilyOptions cf_options;