Skip to content

Commit

Permalink
fix the problem that BinlogAckEnd smaller than BinlogAckStart(due to …
Browse files Browse the repository at this point in the history
…the Master clean un-relevant WriteQueue when one DB timeout) (OpenAtomFoundation#2666)

Co-authored-by: cjh <[email protected]>
  • Loading branch information
cheniujh and cheniujh authored May 24, 2024
1 parent b970e02 commit e9792ba
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class PikaReplicaManager {

// write_queue related
void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector<WriteTask>& tasks);
void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name);
void DropItemInWriteQueue(const std::string& ip, int port);
int ConsumeWriteQueue();

Expand Down
12 changes: 10 additions & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status SyncMasterDB::ActivateSlaveBinlogSync(const std::string& ip, int port, co
}
//Since we init a new reader, we should drop items in write queue and reset sync_window.
//Or the sent_offset and acked_offset will not match
g_pika_rm->DropItemInWriteQueue(ip, port);
g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName());
slave_ptr->sync_win.Reset();
slave_ptr->b_state = kReadFromFile;
}
Expand Down Expand Up @@ -335,7 +335,7 @@ Status SyncMasterDB::CheckSyncTimeout(uint64_t now) {

for (auto& node : to_del) {
coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port());
g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port());
g_pika_rm->DropItemInOneWriteQueue(node.Ip(), node.Port(), DBName());
LOG(WARNING) << SyncDBInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
}
return Status::OK();
Expand Down Expand Up @@ -645,6 +645,14 @@ int PikaReplicaManager::ConsumeWriteQueue() {
return counter;
}

void PikaReplicaManager::DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
if (write_queues_.find(index) != write_queues_.end()) {
write_queues_[index].erase(db_name);
}
}

void PikaReplicaManager::DropItemInWriteQueue(const std::string& ip, int port) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
Expand Down

0 comments on commit e9792ba

Please sign in to comment.