Skip to content

Commit

Permalink
Make ApplyCloudManifestDelta idempotent for old epoch (facebook#223)
Browse files Browse the repository at this point in the history
We should return Status::OK and return additional information through the bool* update_applied argument for ApplyCloudManifestDelta.

Also, now that it's possible to have one filenum showing more than once in CLOUDMANFIEST with different epochs, we should only check filenum is sorted in ascending order when loading CM from file.
  • Loading branch information
seckcoder authored Nov 16, 2022
1 parent b58b8c5 commit 1e7a016
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 31 deletions.
14 changes: 9 additions & 5 deletions cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,12 @@ Status CloudEnvImpl::RollNewEpoch(const std::string& local_dbname) {
st = RollNewCookie(local_dbname, newCookie, cloudManifestDelta);
if (st.ok()) {
// Apply the delta to our in-memory state, too.
st = ApplyCloudManifestDelta(cloudManifestDelta);
bool updateApplied = true;
st = ApplyCloudManifestDelta(cloudManifestDelta, &updateApplied);
// We know for sure that <maxFileNumber, newEpoch> hasn't been applied
// in current CLOUDMANFIEST yet since maxFileNumber >= filenumber in
// CLOUDMANIFEST and epoch is generated randomly
assert(updateApplied);
}

return st;
Expand Down Expand Up @@ -1996,10 +2001,9 @@ size_t CloudEnvImpl::TEST_NumScheduledJobs() const {
return scheduler_->TEST_NumScheduledJobs();
};

Status CloudEnvImpl::ApplyCloudManifestDelta(const CloudManifestDelta& delta) {
if (!cloud_manifest_->AddEpoch(delta.file_num, delta.epoch)) {
return Status::InvalidArgument("Delta already applied in cloud manifest");
}
Status CloudEnvImpl::ApplyCloudManifestDelta(const CloudManifestDelta& delta,
bool* delta_applied) {
*delta_applied = cloud_manifest_->AddEpoch(delta.file_num, delta.epoch);
return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion cloud/cloud_env_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ class CloudEnvImpl : public CloudEnv {

// Apply cloud manifest delta to in-memory cloud manifest. Does not change the
// on-disk state.
Status ApplyCloudManifestDelta(const CloudManifestDelta& delta) override;
Status ApplyCloudManifestDelta(const CloudManifestDelta& delta,
bool* delta_applied) override;

// See comments in the parent class
Status RollNewCookie(const std::string& local_dbname,
Expand Down
13 changes: 11 additions & 2 deletions cloud/cloud_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ Status CloudManifest::LoadFromLog(std::unique_ptr<SequentialFileReader> log,
return Status::Corruption(
"Records read does not match the number of expected records");
}
if (!std::is_sorted(pastEpochs.begin(), pastEpochs.end())) {
// is sorted by filenum in ascending order
if (!std::is_sorted(pastEpochs.begin(), pastEpochs.end(),
[](auto& e1, auto& e2) { return e1.first < e2.first; })) {
return Status::Corruption("Cloud manifest records not sorted");
}
manifest->reset(
Expand Down Expand Up @@ -136,7 +138,7 @@ std::unique_ptr<CloudManifest> CloudManifest::clone() const {
// varint)
//
// Header comes first, and is followed with number_of_records Records.
Status CloudManifest::WriteToLog(std::unique_ptr<WritableFileWriter> log) {
Status CloudManifest::WriteToLog(std::unique_ptr<WritableFileWriter> log) const {
Status status;
log::Writer writer(std::move(log), 0, false);
std::string record;
Expand Down Expand Up @@ -194,6 +196,7 @@ bool CloudManifest::AddEpoch(uint64_t startFileNumber, std::string epochId) {
}
nxtEpoch = &(rit->second);
}

pastEpochs_.emplace_back(startFileNumber, std::move(currentEpoch_));
currentEpoch_ = std::move(epochId);
return true;
Expand All @@ -218,6 +221,12 @@ std::string CloudManifest::GetCurrentEpoch() {
return currentEpoch_;
}

std::vector<std::pair<uint64_t, std::string>>
CloudManifest::TEST_GetPastEpochs() const {
ReadLock lck(&mutex_);
return pastEpochs_;
}

std::string CloudManifest::ToString(bool include_past_epochs) {
ReadLock lck(&mutex_);
std::ostringstream oss;
Expand Down
3 changes: 2 additions & 1 deletion cloud/cloud_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CloudManifest {

std::unique_ptr<CloudManifest> clone() const;

Status WriteToLog(std::unique_ptr<WritableFileWriter> log);
Status WriteToLog(std::unique_ptr<WritableFileWriter> log) const;

// Add an epoch that starts with startFileNumber and is identified by epochId.
// GetEpoch(startFileNumber) == epochId
Expand All @@ -61,6 +61,7 @@ class CloudManifest {

std::string GetCurrentEpoch();
std::string ToString(bool include_past_epochs=false);
std::vector<std::pair<uint64_t, std::string>> TEST_GetPastEpochs() const;

private:
CloudManifest(std::vector<std::pair<uint64_t, std::string>> pastEpochs,
Expand Down
70 changes: 55 additions & 15 deletions cloud/cloud_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@ class CloudManifestTest : public testing::Test {
}

protected:
Status DumpToRandomFile(const CloudManifest* manifest, std::string *filepath) {
Random rnd(301);
std::string filename = "CLOUDMANIFEST" + rnd.RandomString(7);
*filepath = tmp_dir_ + "/" + filename;
std::unique_ptr<WritableFileWriter> writer;
Status st = WritableFileWriter::Create(env_->GetFileSystem(), *filepath,
FileOptions(), &writer, nullptr);
if (!st.ok()) {
return st;
}

st = manifest->WriteToLog(std::move(writer));
return st;
}

Status LoadFromFile(const std::string& filepath,
std::unique_ptr<CloudManifest>* manifest) {
std::unique_ptr<SequentialFileReader> reader;
Status st = SequentialFileReader::Create(env_->GetFileSystem(), filepath,
FileOptions(), &reader, nullptr);
if (!st.ok()) {
return st;
}
st = CloudManifest::LoadFromLog(std::move(reader), manifest);
return st;
}

std::string tmp_dir_;
Env* env_;
};
Expand Down Expand Up @@ -45,22 +72,10 @@ TEST_F(CloudManifestTest, BasicTest) {
ASSERT_EQ(manifest->GetEpoch(40), "fourthEpoch");
ASSERT_EQ(manifest->GetEpoch(41), "fourthEpoch");

// serialize and deserialize
auto tmpfile = tmp_dir_ + "/cloudmanifest";
{
std::unique_ptr<WritableFileWriter> writer;
ASSERT_OK(WritableFileWriter::Create(env_->GetFileSystem(), tmpfile,
FileOptions(), &writer, nullptr));
ASSERT_OK(manifest->WriteToLog(std::move(writer)));
}

std::string filepath;
ASSERT_OK(DumpToRandomFile(manifest.get(), &filepath));
manifest.reset();
{
std::unique_ptr<SequentialFileReader> reader;
ASSERT_OK(SequentialFileReader::Create(
env_->GetFileSystem(), tmpfile, FileOptions(), &reader, nullptr));
ASSERT_OK(CloudManifest::LoadFromLog(std::move(reader), &manifest));
}
ASSERT_OK(LoadFromFile(filepath, &manifest));
}
}
}
Expand All @@ -73,8 +88,33 @@ TEST_F(CloudManifestTest, IdempotencyTest) {
EXPECT_FALSE(manifest->AddEpoch(9, "epoch3"));
// same file number, same epoch
EXPECT_FALSE(manifest->AddEpoch(10, "epoch2"));

EXPECT_EQ(manifest->GetCurrentEpoch(), "epoch2");

// same file number, different epoch
EXPECT_TRUE(manifest->AddEpoch(10, "epoch3"));
EXPECT_EQ(manifest->GetCurrentEpoch(), "epoch3");

// idempotency for old cm delta
EXPECT_FALSE(manifest->AddEpoch(10, "epoch2"));
EXPECT_EQ(manifest->GetCurrentEpoch(), "epoch3");

EXPECT_TRUE(manifest->AddEpoch(11, "epoch4"));

EXPECT_EQ(manifest->GetCurrentEpoch(), "epoch4");

std::vector<std::pair<uint64_t, std::string>> pastEpochs{{10, "epoch1"},
{10, "epoch2"},
{11, "epoch3"}};
EXPECT_EQ(manifest->TEST_GetPastEpochs(), pastEpochs);

EXPECT_EQ(manifest->GetEpoch(9), "epoch1");
EXPECT_EQ(manifest->GetEpoch(10), "epoch3");

std::string filepath;
ASSERT_OK(DumpToRandomFile(manifest.get(), &filepath));
manifest.reset();
ASSERT_OK(LoadFromFile(filepath, &manifest));
}

} // namespace ROCKSDB_NAMESPACE
Expand Down
17 changes: 13 additions & 4 deletions cloud/db_cloud_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ class CloudTest : public testing::Test {
if (!st.ok()) {
return st;
}
st = aenv_->ApplyCloudManifestDelta(delta);
bool applied = false;
st = aenv_->ApplyCloudManifestDelta(delta, &applied);
assert(applied);
if (!st.ok()) {
return st;
}
Expand Down Expand Up @@ -2063,7 +2065,9 @@ TEST_F(CloudTest, LiveFilesConsistentAfterApplyCloudManifestDeltaTest) {
std::string new_epoch = "dca7f3e19212c4b3";
auto delta = CloudManifestDelta{GetDBImpl()->GetNextFileNumber(), new_epoch};
ASSERT_OK(GetCloudEnvImpl()->RollNewCookie(dbname_, new_cookie, delta));
ASSERT_OK(GetCloudEnvImpl()->ApplyCloudManifestDelta(delta));
bool applied = false;
ASSERT_OK(GetCloudEnvImpl()->ApplyCloudManifestDelta(delta, &applied));
ASSERT_TRUE(applied);

std::vector<std::string> live_sst_files2;
std::string manifest_file2;
Expand All @@ -2090,7 +2094,9 @@ TEST_F(CloudTest, WriteAfterUpdateCloudManifestArePersistedInNewEpoch) {

auto delta = CloudManifestDelta{GetDBImpl()->GetNextFileNumber(), new_epoch};
ASSERT_OK(GetCloudEnvImpl()->RollNewCookie(dbname_, new_cookie, delta));
ASSERT_OK(GetCloudEnvImpl()->ApplyCloudManifestDelta(delta));
bool applied = false;
ASSERT_OK(GetCloudEnvImpl()->ApplyCloudManifestDelta(delta, &applied));
ASSERT_TRUE(applied);
GetDBImpl()->NewManifestOnNextUpdate();

// following writes are not visible for old cookie
Expand Down Expand Up @@ -3194,7 +3200,10 @@ TEST_F(CloudTest, ReplayCloudManifestDeltaTest) {

// replay the deltas one more time
for (const auto& delta : deltas) {
EXPECT_TRUE(ApplyCMDeltaToCloudDB(delta).IsInvalidArgument());
EXPECT_TRUE(aenv_->RollNewCookie(dbname_, delta.epoch, delta).IsInvalidArgument());
bool applied = false;
ASSERT_OK(aenv_->ApplyCloudManifestDelta(delta, &applied));
EXPECT_FALSE(applied);
// current epoch not changed
EXPECT_EQ(GetCloudEnvImpl()->GetCloudManifest()->GetCurrentEpoch(),
currentEpoch);
Expand Down
7 changes: 4 additions & 3 deletions include/rocksdb/cloud/cloud_env_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,10 @@ class CloudEnv : public Env {
// Apply cloud manifest delta to in-memory cloud manifest. Does not change the
// on-disk state.
//
// Return InvalidArgument status if the delta has been applied in current
// CloudManifest
virtual Status ApplyCloudManifestDelta(const CloudManifestDelta& delta) = 0;
// If delta has already been applied in cloud manifest, delta_applied would be
// `false`
virtual Status ApplyCloudManifestDelta(const CloudManifestDelta& delta,
bool* delta_applied) = 0;

// This function does several things:
// * Writes CLOUDMANIFEST-cookie file based on existing in-memory
Expand Down

0 comments on commit 1e7a016

Please sign in to comment.