diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index c586cd322..9e13a71a5 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -9,12 +9,20 @@ #include +#include +#include +#include + #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "env/mock_env.h" +#include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/utilities/checkpoint.h" #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" +#include "test_util/testutil.h" #include "util/cast_util.h" #include "util/mutexlock.h" @@ -68,6 +76,80 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) { #endif // ROCKSDB_LITE } +// Regression test: a waiting flush triggered by ExportColumnFamily could +// finish before its result was committed to the manifest when another flush +// ran at the same time on the column family. The outcome is timing-dependent, +// so run the test in a loop to reproduce the failure. +TEST_F(DBFlushTest, FlushBeforeWritingManifestWithCheckpoint) { + Options options; + options.disable_auto_compactions = true; + options.max_background_flushes = 3; + options.max_write_buffer_number = 3; + options.create_if_missing = true; + env_->SetBackgroundThreads(3, Env::HIGH); + options.env = env_; + DestroyAndReopen(options); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:AfterScheduleNonExportFlush", + "FlushJob::WriteLevel0Table:flush_started2"}}); + + bool processed = false; + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:flush_started1", [&](void* /*arg*/) { + // Add second write and flush without wait. 
+ if (processed) { + return; + } + + processed = true; + FlushOptions no_wait; + no_wait.wait = false; + no_wait.allow_write_stall = true; + ASSERT_OK(Put("foo", "v")); + ASSERT_OK(dbfull()->Flush(no_wait)); + }); + + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:add_delay", [&](void* /*arg*/) { + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100)); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + rocksdb::ExportImportFilesMetaData* cf_sst_files_metadata = nullptr; + rocksdb::Checkpoint* checkpoint = nullptr; + auto status = rocksdb::Checkpoint::Create(dbfull(), &checkpoint); + ASSERT_OK(status); + // Own the checkpoint so it is released even if an assertion below fails. + std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint); + + // Add first write and trigger an ExportColumnFamily which will trigger a + // flush with wait. + ASSERT_OK(Put("bar", "v")); + uint64_t timeSinceEpochMilliseconds = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + std::string dir_path = + "/tmp/checkpoint1_" + std::to_string(timeSinceEpochMilliseconds); + status = checkpoint->ExportColumnFamily(dbfull()->DefaultColumnFamily(), + dir_path, &cf_sst_files_metadata); + // ExportColumnFamily hands back a heap-allocated metadata object; free it on + // every exit path. + std::unique_ptr<rocksdb::ExportImportFilesMetaData> metadata_guard( + cf_sst_files_metadata); + ASSERT_OK(status); + + int total_files = static_cast<int>(cf_sst_files_metadata->files.size()); + ASSERT_OK(test::DestroyDir(env_, dir_path)); +#ifndef ROCKSDB_LITE + ASSERT_GT(total_files, 0); +#endif // ROCKSDB_LITE + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // Disable this test temporarily on Travis as it fails intermittently. // Github issue: #4151 TEST_F(DBFlushTest, SyncFail) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 1e7261e2f..0a5a04f9f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1336,12 +1336,15 @@ class DBImpl : public DB { // memtable pending flush. // resuming_from_bg_err indicates whether the caller is attempting to resume // from background error. 
+ public: Status WaitForFlushMemTable(ColumnFamilyData* cfd, const uint64_t* flush_memtable_id = nullptr, bool resuming_from_bg_err = false) { return WaitForFlushMemTables({cfd}, {flush_memtable_id}, resuming_from_bg_err); } + + private: // Wait for memtables to be flushed for multiple column families. Status WaitForFlushMemTables( const autovector<ColumnFamilyData*>& cfds, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 7728eab5f..59dd97ade 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1596,6 +1596,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, MaybeScheduleFlushOrCompaction(); } + if (flush_options.wait) { + TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleExportFlush"); + } else { + TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleNonExportFlush"); + } + if (!writes_stopped) { write_thread_.ExitUnbatched(&w); } @@ -1810,16 +1816,30 @@ Status DBImpl::WaitForFlushMemTables( int num = static_cast<int>(cfds.size()); // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); + Status s; // If the caller is trying to resume from bg error, then // error_handler_.IsDBStopped() is true. while (resuming_from_bg_err || !error_handler_.IsDBStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { - return Status::ShutdownInProgress(); + s = Status::ShutdownInProgress(); + return s; } // If an error has occurred during resumption, then no need to wait. + // However, the flush may have failed because of this error, so report + // it to the caller. 
+ if (!error_handler_.GetRecoveryError().ok()) { + s = error_handler_.GetRecoveryError(); break; } + + // If BG work is stopped, there is a BG error that is either + // 1) a soft error that requires no BG work, or 2) not in auto_recovery_ + if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() && + error_handler_.GetBGError().severity() < Status::Severity::kHardError) { + s = error_handler_.GetBGError(); + return s; + } + // Number of column families that have been dropped. int num_dropped = 0; // Number of column families that have finished flush. @@ -1835,7 +1855,8 @@ Status DBImpl::WaitForFlushMemTables( } } if (1 == num_dropped && 1 == num) { - return Status::InvalidArgument("Cannot flush a dropped CF"); + s = Status::ColumnFamilyDropped(); + return s; } // Column families involved in this flush request have either been dropped // or finished flush. Then it's time to finish waiting. @@ -1844,7 +1865,6 @@ Status DBImpl::WaitForFlushMemTables( } bg_cv_.Wait(); } - Status s; // If not resuming from bg error, and an error has caused the DB to stop, // then report the bg error to caller. if (!resuming_from_bg_err && error_handler_.IsDBStopped()) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 716e21e97..3bd09b743 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -348,6 +348,8 @@ Status FlushJob::WriteLevel0Table() { cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber()); + TEST_SYNC_POINT("FlushJob::WriteLevel0Table:flush_started1"); + TEST_SYNC_POINT("FlushJob::WriteLevel0Table:flush_started2"); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); int64_t _current_time = 0; @@ -397,6 +399,8 @@ Status FlushJob::WriteLevel0Table() { } base_->Unref(); + TEST_SYNC_POINT("FlushJob::WriteLevel0Table:add_delay"); + // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. 
+ if (s.ok() && meta_.fd.GetFileSize() > 0) { diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 0639ed2f2..fcc31993c 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -17,6 +17,7 @@ #include +#include "db/db_impl/db_impl.h" #include "db/wal_manager.h" #include "file/file_util.h" #include "file/filename.h" #include "port/port.h" @@ -363,6 +364,13 @@ Status CheckpointImpl::ExportColumnFamily( if (s.ok()) { s = db_->Flush(rocksdb::FlushOptions(), handle); + ROCKS_LOG_INFO(db_options.info_log, + "[%s] export column wait for flush to finish", + cf_name.c_str()); + // Wait for the flush to finish and propagate any failure to the caller. + if (s.ok()) { + s = static_cast<DBImpl*>(db_)->WaitForFlushMemTable(cfh->cfd()); + } } ColumnFamilyMetaData db_metadata;