Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: successive exec of flushdb may cause delete old db fail #2790

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,15 @@ class FlushallCmd : public Cmd {
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void FlushAllWithoutLock();
bool FlushAllWithoutLock();
void DoBinlog() override;

private:
void DoInitial() override;
void DoWithoutLock(std::shared_ptr<DB> db);
bool DoWithoutLock(std::shared_ptr<DB> db);
void Clear() override { flushall_succeed_ = false; }

bool flushall_succeed_{false};
};

class FlushdbCmd : public Cmd {
Expand All @@ -204,14 +208,19 @@ class FlushdbCmd : public Cmd {
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllDBsWithoutLock();
std::string GetFlushDBname() { return db_name_; }
void DoBinlog() override;
bool DoWithoutLock();

private:
std::string db_name_;
void DoInitial() override;
void Clear() override { db_name_.clear(); }
void DoWithoutLock();
void Clear() override {
db_name_.clear();
flush_succeed_ = false;
}

bool flush_succeed_{false};
std::string db_name_;
};

class ClientCmd : public Cmd {
Expand Down
105 changes: 68 additions & 37 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ void SelectCmd::Do() {
}

void FlushallCmd::DoInitial() {
flushall_succeed_ = false;
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushall);
return;
Expand All @@ -510,13 +511,17 @@ void FlushallCmd::Do() {
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->DBLock();
}
FlushAllWithoutLock();
flushall_succeed_ = FlushAllWithoutLock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->DBUnlock();
}
g_pika_rm->DBUnlock();
if (res_.ok()) {
if (flushall_succeed_) {
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther,
"Flushall failed, maybe only some of the dbs successfully flushed while some not, check WARNING/ERROR log to know "
"more, you can try again moment later");
}
}

Expand All @@ -525,37 +530,52 @@ void FlushallCmd::DoThroughDB() {
}

void FlushallCmd::DoUpdateCache(std::shared_ptr<DB> db) {
if (!flushall_succeed_) {
//flushdb failed, also don't clear the cache
return;
}
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_mode()) {
g_pika_server->ClearCacheDbAsync(db);
}
}

void FlushallCmd::FlushAllWithoutLock() {
bool FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_ptr<DB> db = db_item.second;
DBInfo p_info(db->GetDBName());
if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) {
res_.SetRes(CmdRes::kErrOther, "DB not found");
return;
LOG(ERROR) << p_info.db_name_ << " not found when flushall db";
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
DoWithoutLock(db);
}
if (res_.ok()) {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
res_.SetRes(CmdRes::kOk);
bool success = DoWithoutLock(db);
if (!success) { return false; }
}
return true;
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
bool FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db->FlushDBWithoutLock();
DoUpdateCache(db);
LOG(ERROR) << "Flushall, but DB not found";
return false;
}
bool success = db->FlushDBWithoutLock();
if (!success) {
// if the db is not flushed, return before clear the cache
return success;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}
DoUpdateCache(db);

return true;
}
void FlushallCmd::DoBinlog() {
if (flushall_succeed_) {
Cmd::DoBinlog();
}
}

void FlushdbCmd::DoInitial() {
flush_succeed_ = false;
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushdb);
return;
Expand All @@ -570,16 +590,20 @@ void FlushdbCmd::DoInitial() {

void FlushdbCmd::Do() {
if (!db_) {
res_.SetRes(CmdRes::kInvalidDB);
res_.SetRes(CmdRes::kInvalidDB, "DB not found while flushdb");
return;
}
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
return;
}
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
flush_succeed_ = DoWithoutLock();
if (flush_succeed_) {
res_.SetRes(CmdRes::kOk);
} else {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
res_.SetRes(CmdRes::kErrOther, "flushdb failed, Try again later(check WARNING/ERROR log to know more)");
}
}

Expand All @@ -588,31 +612,38 @@ void FlushdbCmd::DoThroughDB() {
}

void FlushdbCmd::DoUpdateCache() {
if (!flush_succeed_) {
//if flushdb failed, also do not clear the cache
return;
}
// clear cache
if (g_pika_conf->cache_mode() != PIKA_CACHE_NONE) {
g_pika_server->ClearCacheDbAsync(db_);
}
}

void FlushdbCmd::FlushAllDBsWithoutLock() {
bool FlushdbCmd::DoWithoutLock() {
if (!db_) {
LOG(ERROR) << "Flushdb, but DB not found";
return false;
}
DBInfo p_info(db_->GetDBName());
if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) {
res_.SetRes(CmdRes::kErrOther, "DB not found");
return;
LOG(ERROR) << "DB not found when flushing " << db_->GetDBName();
return false;
}
DoWithoutLock();
if (db_name_ != "all") {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
return false;
}

return db_->FlushDBWithoutLock();
}

void FlushdbCmd::DoWithoutLock() {
if (!db_) {
LOG(INFO) << "Flushdb, but DB not found";
} else {
if (db_name_ == "all") {
db_->FlushDBWithoutLock();
} else {
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
}
void FlushdbCmd::DoBinlog() {
if (flush_succeed_) {
Cmd::DoBinlog();
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,23 @@ bool DB::FlushDBWithoutLock() {
if (dbpath[dbpath.length() - 1] == '/') {
dbpath.erase(dbpath.length() - 1);
}
dbpath.append("_deleting/");
pstd::RenameFile(db_path_, dbpath);

std::string delete_suffix("_deleting_");
delete_suffix.append(std::to_string(NowMicros()));
delete_suffix.append("/");
dbpath.append(delete_suffix);
auto rename_success = pstd::RenameFile(db_path_, dbpath);
storage_ = std::make_shared<storage::Storage>(g_pika_conf->db_instance_num(),
g_pika_conf->default_slot_num(), g_pika_conf->classic_mode());
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_);
assert(storage_);
assert(s.ok());
if (rename_success == -1) {
//the storage_->Open actually opened old RocksDB instance, so flushdb failed
LOG(WARNING) << db_name_ << " FlushDB failed due to rename old db_path_ failed";
return false;
}
LOG(INFO) << db_name_ << " Open new db success";

g_pika_server->PurgeDir(dbpath);
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void DoPurgeDir(void* arg) {
LOG(INFO) << "Delete dir: " << *path << " done";
}


PikaServer::PikaServer()
: exit_(false),
slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()),
Expand Down Expand Up @@ -794,6 +795,7 @@ void PikaServer::PurgeDir(const std::string& path) {
PurgeDirTaskSchedule(&DoPurgeDir, static_cast<void*>(dir_path));
}


void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
purge_thread_.StartThread();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void ExecCmd::Do() {
client_conn->SetAllTxnFailed();
} else if (cmd->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(cmd);
flushdb->FlushAllDBsWithoutLock();
flushdb->DoWithoutLock();
if (cmd->res().ok()) {
cmd->res().SetRes(CmdRes::kOk);
}
Expand Down
Loading