diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml
index 7d42a8b6c0..fc86272a92 100644
--- a/.github/workflows/pika.yml
+++ b/.github/workflows/pika.yml
@@ -63,7 +63,7 @@ jobs:
 
       - name: Unit Test
         working-directory: ${{ github.workspace }}
-        run: ./pikatests.sh all
+        run: ./pikatests.sh all clean
 
       # master on port 9221, slave on port 9231, all with 2 db
       - name: Start codis, pika master and pika slave
@@ -135,7 +135,7 @@ jobs:
 
       - name: Unit Test
         working-directory: ${{ github.workspace }}
-        run: ./pikatests.sh all
+        run: ./pikatests.sh all clean
 
       - name: Start codis, pika master and pika slave
         working-directory: ${{ github.workspace }}/build
@@ -197,7 +197,7 @@ jobs:
       - name: Unit Test
         working-directory: ${{ github.workspace }}
         run: |
-          ./pikatests.sh all
+          ./pikatests.sh all clean
 
       - name: Start codis, pika master and pika slave
         working-directory: ${{ github.workspace }}/build
diff --git a/include/pika_admin.h b/include/pika_admin.h
index 7ae269e559..1b1aa1bad3 100644
--- a/include/pika_admin.h
+++ b/include/pika_admin.h
@@ -238,7 +238,10 @@ class ClientCmd : public Cmd {
   Cmd* Clone() override { return new ClientCmd(*this); }
 
  private:
-  std::string operation_, info_;
+  const static std::string KILLTYPE_NORMAL;
+  const static std::string KILLTYPE_PUBSUB;
+
+  std::string operation_, info_, kill_type_;
   void DoInitial() override;
 };
 
@@ -262,7 +265,6 @@ class InfoCmd : public Cmd {
     kInfoCommandStats,
     kInfoCache
   };
-
   InfoCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag) {}
   void Do() override;
   void Split(const HintKeys& hint_keys) override {};
diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h
index 4d48bf60bc..01a6fe96b0 100644
--- a/include/pika_dispatch_thread.h
+++ b/include/pika_dispatch_thread.h
@@ -14,7 +14,7 @@ class PikaDispatchThread {
                      int max_conn_rbuf_size);
   ~PikaDispatchThread();
   int StartThread();
-
+  void StopThread();
   uint64_t ThreadClientList(std::vector<ClientInfo>* clients);
   bool ClientKill(const std::string& ip_port);
diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h
index dcf59c4b94..dd62622fb9 100644
--- a/include/pika_repl_bgworker.h
+++ b/include/pika_repl_bgworker.h
@@ -24,6 +24,12 @@ class PikaReplBgWorker {
   explicit PikaReplBgWorker(int queue_size);
   int StartThread();
   int StopThread();
+  int TaskQueueSize() {
+    int pri_size = 0;
+    int qu_size = 0;
+    bg_thread_.QueueSize(&pri_size, &qu_size);
+    return pri_size + qu_size;
+  }
   void Schedule(net::TaskFunc func, void* arg);
   void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
   static void HandleBGWorkerWriteBinlog(void* arg);
diff --git a/include/pika_server.h b/include/pika_server.h
index aa5f8dbc53..68643660fe 100644
--- a/include/pika_server.h
+++ b/include/pika_server.h
@@ -222,6 +222,8 @@ class PikaServer : public pstd::noncopyable {
   void ClientKillAll();
   int ClientKill(const std::string& ip_port);
   int64_t ClientList(std::vector<ClientInfo>* clients = nullptr);
+  void ClientKillPubSub();
+  void ClientKillAllNormal();
 
   /*
    * Monitor used
diff --git a/src/net/include/net_pubsub.h b/src/net/include/net_pubsub.h
index 86541f771c..51b76268aa 100644
--- a/src/net/include/net_pubsub.h
+++ b/src/net/include/net_pubsub.h
@@ -77,11 +77,12 @@ class PubSubThread : public Thread {
   bool IsReady(int fd);
   int ClientPubSubChannelSize(const std::shared_ptr<NetConn>& conn);
   int ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& conn);
+  void NotifyCloseAllConns();
 
  private:
   void RemoveConn(const std::shared_ptr<NetConn>& conn);
   void CloseConn(const std::shared_ptr<NetConn>& conn);
-
+  void CloseAllConns();
   int ClientChannelSize(const std::shared_ptr<NetConn>& conn);
 
   int msg_pfd_[2];
@@ -89,6 +90,7 @@ class PubSubThread : public Thread {
 
   mutable pstd::RWMutex rwlock_; /* For external statistics */
   std::map<int, std::shared_ptr<ConnHandle>> conns_;
+  std::atomic<bool> close_all_conn_sig_{false};
 
   pstd::Mutex pub_mutex_;
   pstd::CondVar receiver_rsignal_;
diff --git a/src/net/src/net_pubsub.cc b/src/net/src/net_pubsub.cc
index 6354d16790..110144ba14 100644
--- a/src/net/src/net_pubsub.cc
+++ b/src/net/src/net_pubsub.cc
@@ -151,14 +151,34 @@ void PubSubThread::RemoveConn(const std::shared_ptr<NetConn>& conn) {
 }
 
 void PubSubThread::CloseConn(const std::shared_ptr<NetConn>& conn) {
-  CloseFd(conn);
   net_multiplexer_->NetDelEvent(conn->fd(), 0);
+  CloseFd(conn);
   {
     std::lock_guard l(rwlock_);
     conns_.erase(conn->fd());
   }
 }
 
+void PubSubThread::CloseAllConns() {
+  {
+    std::lock_guard l(channel_mutex_);
+    pubsub_channel_.clear();
+  }
+  {
+    std::lock_guard l(pattern_mutex_);
+    pubsub_pattern_.clear();
+  }
+  {
+    std::lock_guard l(rwlock_);
+    for (auto& pair : conns_) {
+      net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
+      CloseFd(pair.second->conn);
+    }
+    std::map<int, std::shared_ptr<ConnHandle>> tmp;
+    conns_.swap(tmp);
+  }
+}
+
 int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
   // TODO(LIBA-S): change the Publish Mode to Asynchronous
   std::lock_guard lk(pub_mutex_);
@@ -414,6 +434,12 @@ void* PubSubThread::ThreadMain() {
   char triger[1];
 
   while (!should_stop()) {
+
+    if (close_all_conn_sig_.load()) {
+      close_all_conn_sig_.store(false);
+      CloseAllConns();
+    }
+
     nfds = net_multiplexer_->NetPoll(NET_CRON_INTERVAL);
     for (int i = 0; i < nfds; i++) {
       pfe = (net_multiplexer_->FiredEvents()) + i;
@@ -585,4 +611,7 @@ void PubSubThread::Cleanup() {
   }
   conns_.clear();
 }
+void PubSubThread::NotifyCloseAllConns() {
+  close_all_conn_sig_.store(true);
+}
 };  // namespace net
diff --git a/src/pika_admin.cc b/src/pika_admin.cc
index e6753e4385..a6226917f8 100644
--- a/src/pika_admin.cc
+++ b/src/pika_admin.cc
@@ -722,8 +722,15 @@ void ClientCmd::DoInitial() {
       res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle])");
       return;
     }
-  } else if ((strcasecmp(argv_[1].data(), "kill") == 0) && argv_.size() == 3) {
+  } else if (argv_.size() == 3 && (strcasecmp(argv_[1].data(), "kill") == 0)) {
     info_ = argv_[2];
+  } else if (argv_.size() == 4 &&
+             (strcasecmp(argv_[1].data(), "kill") == 0) &&
+             (strcasecmp(argv_[2].data(), "type") == 0) &&
+             ((strcasecmp(argv_[3].data(), KILLTYPE_NORMAL.data()) == 0) || (strcasecmp(argv_[3].data(), KILLTYPE_PUBSUB.data()) == 0))) {
+    // kill all connections of the requested type
+    info_ = "type";
+    kill_type_ = argv_[3];
   } else {
     res_.SetRes(CmdRes::kErrOther, "Syntax error, try CLIENT (LIST [order by [addr|idle]| KILL ip:port)");
     return;
@@ -773,6 +780,16 @@ void ClientCmd::Do() {
   } else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "all") == 0)) {
     g_pika_server->ClientKillAll();
     res_.SetRes(CmdRes::kOk);
+  } else if ((strcasecmp(operation_.data(), "kill") == 0) && (strcasecmp(info_.data(), "type") == 0)) {
+    if (kill_type_ == KILLTYPE_NORMAL) {
+      g_pika_server->ClientKillAllNormal();
+      res_.SetRes(CmdRes::kOk);
+    } else if (kill_type_ == KILLTYPE_PUBSUB) {
+      g_pika_server->ClientKillPubSub();
+      res_.SetRes(CmdRes::kOk);
+    } else {
+      res_.SetRes(CmdRes::kErrOther, "kill type is unknown");
+    }
   } else if (g_pika_server->ClientKill(info_) == 1) {
     res_.SetRes(CmdRes::kOk);
   } else {
@@ -827,6 +844,10 @@ const std::string InfoCmd::kDebugSection = "debug";
 const std::string InfoCmd::kCommandStatsSection = "commandstats";
 const std::string InfoCmd::kCacheSection = "cache";
+
+const std::string ClientCmd::KILLTYPE_NORMAL = "normal";
+const std::string ClientCmd::KILLTYPE_PUBSUB = "pubsub";
+
 void InfoCmd::Execute() {
   std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
   Do();
@@ -1238,6 +1259,7 @@ void InfoCmd::InfoReplication(std::string& info) {
   Status s;
   uint32_t filenum = 0;
   uint64_t offset = 0;
+  uint64_t slave_repl_offset = 0;
   std::string safety_purge;
   std::shared_ptr<DB> master_db = nullptr;
   for (const auto& t_item : g_pika_server->dbs_) {
@@ -1249,11 +1271,13 @@ void InfoCmd::InfoReplication(std::string& info) {
       continue;
     }
     master_db->Logger()->GetProducerStatus(&filenum, &offset);
+    slave_repl_offset += static_cast<uint64_t>(filenum) * static_cast<uint64_t>(g_pika_conf->binlog_file_size());
+    slave_repl_offset += offset;
     tmp_stream << db_name << ":binlog_offset=" << filenum << " " << offset;
     s = master_db->GetSafetyPurgeBinlog(&safety_purge);
     tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
   }
-
+  tmp_stream << "slave_repl_offset:" << slave_repl_offset << "\r\n";
   info.append(tmp_stream.str());
 }
diff --git a/src/pika_dispatch_thread.cc b/src/pika_dispatch_thread.cc
index ae94deaf7e..bc892e23e4 100644
--- a/src/pika_dispatch_thread.cc
+++ b/src/pika_dispatch_thread.cc
@@ -59,6 +59,10 @@ void PikaDispatchThread::UnAuthUserAndKillClient(const std::set<std::string>& us
   }
 }
 
+void PikaDispatchThread::StopThread() {
+  thread_rep_->StopThread();
+}
+
 bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
   if (ip == "127.0.0.1") {
     ip = g_pika_server->host();
diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc
index 77e3a60f78..117b5adb8c 100644
--- a/src/pika_repl_client.cc
+++ b/src/pika_repl_client.cc
@@ -77,6 +77,23 @@ int PikaReplClient::Stop() {
   for (auto & binlog_worker : write_binlog_workers_) {
     binlog_worker->StopThread();
   }
+
+  // WriteDB tasks are asynchronous: wait until all of them have finished before exiting,
+  // otherwise some data may be lost.
+  bool all_write_db_task_done = true;
+  do {
+    for (auto &db_worker: write_db_workers_) {
+      if (db_worker->TaskQueueSize() != 0) {
+        all_write_db_task_done = false;
+        std::this_thread::sleep_for(std::chrono::microseconds(300));
+        break;
+      } else {
+        all_write_db_task_done = true;
+      }
+    }
+    // if any async WriteDB task is still pending, keep waiting
+  } while (!all_write_db_task_done);
+
   for (auto &db_worker: write_db_workers_) {
     db_worker->StopThread();
   }
diff --git a/src/pika_server.cc b/src/pika_server.cc
index 9ef2681833..7b8995ea24 100644
--- a/src/pika_server.cc
+++ b/src/pika_server.cc
@@ -104,12 +104,18 @@ PikaServer::PikaServer()
   acl_ = std::make_unique<::Acl>();
 
   SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
+  bgsave_thread_.set_thread_name("PikaServer::bgsave_thread_");
+  purge_thread_.set_thread_name("PikaServer::purge_thread_");
+  bgslots_cleanup_thread_.set_thread_name("PikaServer::bgslots_cleanup_thread_");
+  common_bg_thread_.set_thread_name("PikaServer::common_bg_thread_");
+  key_scan_thread_.set_thread_name("PikaServer::key_scan_thread_");
 }
 
 PikaServer::~PikaServer() {
   rsync_server_->Stop();
-  // DispatchThread will use queue of worker thread,
-  // so we need to delete dispatch before worker.
+  // DispatchThread uses the worker threads' queues,
+  // so the dispatch thread must be stopped before the workers.
+  pika_dispatch_thread_->StopThread();
   pika_client_processor_->Stop();
   pika_slow_cmd_thread_pool_->stop_thread_pool();
   pika_admin_cmd_thread_pool_->stop_thread_pool();
@@ -779,13 +785,11 @@ size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() {
 }
 
 void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) {
-  bgsave_thread_.set_thread_name("BGSaveTask");
   bgsave_thread_.StartThread();
   bgsave_thread_.Schedule(func, arg);
 }
 
 void PikaServer::PurgelogsTaskSchedule(net::TaskFunc func, void* arg) {
-  purge_thread_.set_thread_name("PurgelogsTask");
   purge_thread_.StartThread();
   purge_thread_.Schedule(func, arg);
 }
@@ -797,7 +801,6 @@ void PikaServer::PurgeDir(const std::string& path) {
 
 void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
-  purge_thread_.set_thread_name("PurgeDirTask");
   purge_thread_.StartThread();
   purge_thread_.Schedule(function, arg);
 }
@@ -849,12 +852,21 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& d
 }
 
 void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
-  key_scan_thread_.set_thread_name("KeyScanTask");
   key_scan_thread_.StartThread();
   key_scan_thread_.Schedule(func, arg);
 }
 
-void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); }
+void PikaServer::ClientKillAll() {
+  pika_dispatch_thread_->ClientKillAll();
+  pika_pubsub_thread_->NotifyCloseAllConns();
+}
+
+void PikaServer::ClientKillPubSub() {
+  pika_pubsub_thread_->NotifyCloseAllConns();
+}
+
+void PikaServer::ClientKillAllNormal() {
+  pika_dispatch_thread_->ClientKillAll();
+}
 
 int PikaServer::ClientKill(const std::string& ip_port) {
   if (pika_dispatch_thread_->ClientKill(ip_port)) {
@@ -1575,7 +1587,6 @@ void PikaServer::Bgslotsreload(const std::shared_ptr<DB>& db) {
   LOG(INFO) << "Start slot reloading";
 
   // Start new thread if needed
-  bgsave_thread_.set_thread_name("SlotsReload");
   bgsave_thread_.StartThread();
   bgsave_thread_.Schedule(&DoBgslotsreload, static_cast<void*>(this));
 }
@@ -1643,7 +1654,6 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
   LOG(INFO) << "Start slot cleanup, slots: " << slotsStr << std::endl;
 
   // Start new thread if needed
-  bgslots_cleanup_thread_.set_thread_name("SlotsCleanup");
   bgslots_cleanup_thread_.StartThread();
   bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
 }
@@ -1748,7 +1758,6 @@ void DoBgslotscleanup(void* arg)
 
 void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cache::CacheConfig *cache_cfg) {
   if (PIKA_CACHE_STATUS_OK == db->cache()->CacheStatus() || PIKA_CACHE_STATUS_NONE == db->cache()->CacheStatus()) {
-    common_bg_thread_.set_thread_name("ResetCacheTask");
     common_bg_thread_.StartThread();
     BGCacheTaskArg *arg = new BGCacheTaskArg();
     arg->db = db;
@@ -1772,7 +1781,6 @@ void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
     LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
     return;
   }
-  common_bg_thread_.set_thread_name("CacheClearThread");
   common_bg_thread_.StartThread();
   BGCacheTaskArg *arg = new BGCacheTaskArg();
   arg->db = db;
@@ -1840,7 +1848,6 @@ void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
     LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
     return;
   }
-  common_bg_thread_.set_thread_name("V2CacheClearThread");
   common_bg_thread_.StartThread();
   BGCacheTaskArg *arg = new BGCacheTaskArg();
   arg->db = db;