Skip to content

Commit

Permalink
1 handle lock when close all pubsub conns
Browse files Browse the repository at this point in the history
2 rename some funs and varibles
  • Loading branch information
cheniujh committed Aug 11, 2024
1 parent a2d7949 commit b45a402
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/net/include/net_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PubSubThread : public Thread {
bool IsReady(int fd);
int ClientPubSubChannelSize(const std::shared_ptr<NetConn>& conn);
int ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& conn);
void NotifyToCloseAllConns();
void NotifyCloseAllConns();

private:
void RemoveConn(const std::shared_ptr<NetConn>& conn);
Expand Down
30 changes: 18 additions & 12 deletions src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,32 @@ void PubSubThread::RemoveConn(const std::shared_ptr<NetConn>& conn) {
}

void PubSubThread::CloseConn(const std::shared_ptr<NetConn>& conn) {
CloseFd(conn);
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
{
std::lock_guard l(rwlock_);
conns_.erase(conn->fd());
}
}

void PubSubThread::CloseAllConns() {
std::lock_guard l(rwlock_);

pubsub_channel_.clear();
pubsub_pattern_.clear();

for (auto& pair : conns_) {
net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
CloseFd(pair.second->conn);
{
std::lock_guard l(channel_mutex_);
pubsub_channel_.clear();
}
{
std::lock_guard l(pattern_mutex_);
pubsub_pattern_.clear();
}
{
std::lock_guard l(rwlock_);
for (auto& pair : conns_) {
net_multiplexer_->NetDelEvent(pair.second->conn->fd(), 0);
CloseFd(pair.second->conn);
}
std::map<int, std::shared_ptr<ConnHandle>> tmp;
conns_.swap(tmp);
}
std::map<int, std::shared_ptr<ConnHandle>> empty_conns;
conns_.swap(empty_conns);
}

int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
Expand Down Expand Up @@ -605,7 +611,7 @@ void PubSubThread::Cleanup() {
}
conns_.clear();
}
void PubSubThread::NotifyToCloseAllConns() {
void PubSubThread::NotifyCloseAllConns() {
close_all_conn_sig_.store(true);
}
}; // namespace net
5 changes: 2 additions & 3 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -858,11 +858,10 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {

void PikaServer::ClientKillAll() {
pika_dispatch_thread_->ClientKillAll();
pika_pubsub_thread_->NotifyToCloseAllConns();
pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillPubSub() {
pika_pubsub_thread_->NotifyToCloseAllConns();
void PikaServer::ClientKillPubSub() { pika_pubsub_thread_->NotifyCloseAllConns();
}

void PikaServer::ClientKillAllNormal() {
Expand Down

0 comments on commit b45a402

Please sign in to comment.