Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make pika compactible with redis-sentinel #2854

Merged
6 changes: 3 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

# master on port 9221, slave on port 9231, all with 2 db
- name: Start codis, pika master and pika slave
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down Expand Up @@ -197,7 +197,7 @@ jobs:
- name: Unit Test
working-directory: ${{ github.workspace }}
run: |
./pikatests.sh all
./pikatests.sh all clean

- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down
6 changes: 4 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ class ClientCmd : public Cmd {
Cmd* Clone() override { return new ClientCmd(*this); }

private:
std::string operation_, info_;
const static std::string KILLTYPE_NORMAL;
const static std::string KILLTYPE_PUBSUB;

std::string operation_, info_, kill_type_;
void DoInitial() override;
};

Expand All @@ -262,7 +265,6 @@ class InfoCmd : public Cmd {
kInfoCommandStats,
kInfoCache
};

InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
Expand Down
2 changes: 1 addition & 1 deletion include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PikaDispatchThread {
int max_conn_rbuf_size);
~PikaDispatchThread();
int StartThread();

void StopThread();
uint64_t ThreadClientList(std::vector<ClientInfo>* clients);

bool ClientKill(const std::string& ip_port);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class PikaReplBgWorker {
explicit PikaReplBgWorker(int queue_size);
int StartThread();
int StopThread();
int TaskQueueSize() {
int pri_size, qu_size;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
bg_thread_.QueueSize(&pri_size, &qu_size);
return pri_size + qu_size;
}
void Schedule(net::TaskFunc func, void* arg);
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
Expand Down
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ class PikaServer : public pstd::noncopyable {
void ClientKillAll();
int ClientKill(const std::string& ip_port);
int64_t ClientList(std::vector<ClientInfo>* clients = nullptr);
void ClientKillPubSub();
void ClientKillAllNormal();

/*
* Monitor used
Expand Down
4 changes: 3 additions & 1 deletion src/net/include/net_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ class PubSubThread : public Thread {
bool IsReady(int fd);
int ClientPubSubChannelSize(const std::shared_ptr<NetConn>& conn);
int ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& conn);
void NotifyCloseAllConns();

private:
void RemoveConn(const std::shared_ptr<NetConn>& conn);
void CloseConn(const std::shared_ptr<NetConn>& conn);

void CloseAllConns();
int ClientChannelSize(const std::shared_ptr<NetConn>& conn);

int msg_pfd_[2];
bool should_exit_;

mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<ConnHandle>> conns_;
std::atomic<bool> close_all_conn_sig_{false};

pstd::Mutex pub_mutex_;
pstd::CondVar receiver_rsignal_;
Expand Down
31 changes: 30 additions & 1 deletion src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,34 @@ void PubSubThread::RemoveConn(const std::shared_ptr<NetConn>& conn) {
}

void PubSubThread::CloseConn(const std::shared_ptr<NetConn>& conn) {
CloseFd(conn);
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
{
std::lock_guard l(rwlock_);
conns_.erase(conn->fd());
}
}

void PubSubThread::CloseAllConns() {
{
std::lock_guard l(channel_mutex_);
pubsub_channel_.clear();
}
{
std::lock_guard l(pattern_mutex_);
pubsub_pattern_.clear();
}
{
std::lock_guard l(rwlock_);
for (auto& pair : conns_) {
net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
CloseFd(pair.second->conn);
}
std::map<int, std::shared_ptr<ConnHandle>> tmp;
conns_.swap(tmp);
}
}

int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
// TODO(LIBA-S): change the Publish Mode to Asynchronous
std::lock_guard lk(pub_mutex_);
Expand Down Expand Up @@ -414,6 +434,12 @@ void* PubSubThread::ThreadMain() {
char triger[1];

while (!should_stop()) {

if (close_all_conn_sig_.load()) {
close_all_conn_sig_.store(false);
CloseAllConns();
}

nfds = net_multiplexer_->NetPoll(NET_CRON_INTERVAL);
for (int i = 0; i < nfds; i++) {
pfe = (net_multiplexer_->FiredEvents()) + i;
Expand Down Expand Up @@ -585,4 +611,7 @@ void PubSubThread::Cleanup() {
}
conns_.clear();
}
void PubSubThread::NotifyCloseAllConns() {
close_all_conn_sig_.store(true);
}
}; // namespace net
28 changes: 26 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,15 @@ void ClientCmd::DoInitial() {
res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle])");
return;
}
} else if ((strcasecmp(argv_[1].data(), "kill") == 0) && argv_.size() == 3) {
} else if (argv_.size() == 3 && (strcasecmp(argv_[1].data(), "kill") == 0)) {
info_ = argv_[2];
} else if (argv_.size() == 4 &&
(strcasecmp(argv_[1].data(), "kill") == 0) &&
(strcasecmp(argv_[2].data(), "type") == 0) &&
((strcasecmp(argv_[3].data(), KILLTYPE_NORMAL.data()) == 0) || (strcasecmp(argv_[3].data(), KILLTYPE_PUBSUB.data()) == 0))) {
//kill all if user wanna kill a type
info_ = "type";
kill_type_ = argv_[3];
} else {
res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle]| KILL ip:port)");
return;
Expand Down Expand Up @@ -773,6 +780,16 @@ void ClientCmd::Do() {
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "all") == 0)) {
g_pika_server->ClientKillAll();
res_.SetRes(CmdRes::kOk);
} else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "type") == 0)) {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
if (kill_type_ == KILLTYPE_NORMAL) {
g_pika_server->ClientKillAllNormal();
res_.SetRes(CmdRes::kOk);
} else if (kill_type_ == KILLTYPE_PUBSUB) {
g_pika_server->ClientKillPubSub();
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, "kill type is unknown");
}
} else if (g_pika_server->ClientKill(info_) == 1) {
res_.SetRes(CmdRes::kOk);
} else {
Expand Down Expand Up @@ -827,6 +844,10 @@ const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";


const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
Expand Down Expand Up @@ -1238,6 +1259,7 @@ void InfoCmd::InfoReplication(std::string& info) {
Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
uint64_t slave_repl_offset = 0;
std::string safety_purge;
std::shared_ptr<SyncMasterDB> master_db = nullptr;
for (const auto& t_item : g_pika_server->dbs_) {
Expand All @@ -1249,11 +1271,13 @@ void InfoCmd::InfoReplication(std::string& info) {
continue;
}
master_db->Logger()->GetProducerStatus(&filenum, &offset);
slave_repl_offset += static_cast<uint64_t>(filenum) * static_cast<uint64_t>(g_pika_conf->binlog_file_size());
slave_repl_offset += offset;
tmp_stream << db_name << ":binlog_offset=" << filenum << " " << offset;
s = master_db->GetSafetyPurgeBinlog(&safety_purge);
tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
}

tmp_stream << "slave_repl_offset:" << slave_repl_offset << "\r\n";
info.append(tmp_stream.str());
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
}
}

void PikaDispatchThread::StopThread() {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
thread_rep_->StopThread();
}

bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
ip = g_pika_server->host();
Expand Down
17 changes: 17 additions & 0 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ int PikaReplClient::Stop() {
for (auto & binlog_worker : write_binlog_workers_) {
binlog_worker->StopThread();
}

// write DB task is async task, we must wait all writeDB task done and then to exit
// or some data will be loss
bool all_write_db_task_done = true;
do {
for (auto &db_worker: write_db_workers_) {
if (db_worker->TaskQueueSize() != 0) {
all_write_db_task_done = false;
std::this_thread::sleep_for(std::chrono::microseconds(300));
break;
} else {
all_write_db_task_done = true;
}
}
//if there are unfinished async write db task, just continue to wait
} while (!all_write_db_task_done);

for (auto &db_worker: write_db_workers_) {
db_worker->StopThread();
}
Expand Down
31 changes: 19 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,18 @@ PikaServer::PikaServer()

acl_ = std::make_unique<::Acl>();
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
purge_thread_.set_thread_name("PikaServer::purge_thread_");
bgslots_cleanup_thread_.set_thread_name("PikaServer::bgslots_cleanup_thread_");
common_bg_thread_.set_thread_name("PikaServer::common_bg_thread_");
key_scan_thread_.set_thread_name("PikaServer::key_scan_thread_");
}

PikaServer::~PikaServer() {
rsync_server_->Stop();
// DispatchThread will use queue of worker thread,
// so we need to delete dispatch before worker.
// DispatchThread will use queue of worker thread
// so we need to Stop dispatch before worker.
pika_dispatch_thread_->StopThread();
pika_client_processor_->Stop();
pika_slow_cmd_thread_pool_->stop_thread_pool();
pika_admin_cmd_thread_pool_->stop_thread_pool();
Expand Down Expand Up @@ -779,13 +785,11 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() {
}

void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
bgsave_thread_.set_thread_name("BGSaveTask");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(func, arg);
}

void PikaServer::PurgelogsTaskSchedule(net::TaskFunc func, void* arg) {
purge_thread_.set_thread_name("PurgelogsTask");
purge_thread_.StartThread();
purge_thread_.Schedule(func, arg);
}
Expand All @@ -797,7 +801,6 @@ void PikaServer::PurgeDir(const std::string& path) {


void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
purge_thread_.StartThread();
purge_thread_.Schedule(function, arg);
}
Expand Down Expand Up @@ -849,12 +852,21 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d
}

void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
key_scan_thread_.set_thread_name("KeyScanTask");
key_scan_thread_.StartThread();
key_scan_thread_.Schedule(func, arg);
}

void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); }
void PikaServer::ClientKillAll() {
pika_dispatch_thread_->ClientKillAll();
pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillPubSub() { pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillAllNormal() {
pika_dispatch_thread_->ClientKillAll();
}

int PikaServer::ClientKill(const std::string& ip_port) {
if (pika_dispatch_thread_->ClientKill(ip_port)) {
Expand Down Expand Up @@ -1575,7 +1587,6 @@ void PikaServer::Bgslotsreload(const std::shared_ptr<DB>& db) {
LOG(INFO) << "Start slot reloading";

// Start new thread if needed
bgsave_thread_.set_thread_name("SlotsReload");
bgsave_thread_.StartThread();
bgsave_thread_.Schedule(&DoBgslotsreload, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1643,7 +1654,6 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
LOG(INFO) << "Start slot cleanup, slots: " << slotsStr << std::endl;

// Start new thread if needed
bgslots_cleanup_thread_.set_thread_name("SlotsCleanup");
bgslots_cleanup_thread_.StartThread();
bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
}
Expand Down Expand Up @@ -1748,7 +1758,6 @@ void DoBgslotscleanup(void* arg) {
void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) {
if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus()
|| PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {
common_bg_thread_.set_thread_name("ResetCacheTask");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand All @@ -1772,7 +1781,6 @@ void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}
common_bg_thread_.set_thread_name("CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down Expand Up @@ -1840,7 +1848,6 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
}
common_bg_thread_.set_thread_name("V2CacheClearThread");
common_bg_thread_.StartThread();
BGCacheTaskArg *arg = new BGCacheTaskArg();
arg->db = db;
Expand Down
Loading