diff --git a/src/client.cc b/src/client.cc index 8d37e09f2..cbe72f71f 100644 --- a/src/client.cc +++ b/src/client.cc @@ -625,10 +625,17 @@ void PClient::TransferToSlaveThreads() { auto slave_loop = tcp_connection->SelectSlaveEventLoop(); auto id = tcp_connection->GetUniqueId(); auto event_object = loop->GetEventObject(id); - loop->Unregister(event_object); - event_object->SetUniqueId(-1); - slave_loop->Register(event_object, 0); - tcp_connection->ResetEventLoop(slave_loop); + auto del_conn = [loop, slave_loop, event_object]() { + loop->Unregister(event_object); + event_object->SetUniqueId(-1); + auto tcp_connection = std::dynamic_pointer_cast(event_object); + assert(tcp_connection); + tcp_connection->ResetEventLoop(slave_loop); + + auto add_conn = [slave_loop, event_object]() { slave_loop->Register(event_object, 0); }; + slave_loop->Execute(std::move(add_conn)); + }; + loop->Execute(std::move(del_conn)); } } diff --git a/src/net/event_loop.cc b/src/net/event_loop.cc index 2160a5bce..6c8295989 100644 --- a/src/net/event_loop.cc +++ b/src/net/event_loop.cc @@ -58,15 +58,11 @@ void EventLoop::Run() { } } - { - std::unique_lock guard(object_mutex_); - for (auto& pair : objects_) { - reactor_->Unregister(pair.second.get()); - } - - objects_.clear(); + for (auto& pair : objects_) { + reactor_->Unregister(pair.second.get()); } + objects_.clear(); reactor_.reset(); } @@ -106,7 +102,7 @@ EventLoop* EventLoop::Self() { return g_this_loop; } bool EventLoop::Register(std::shared_ptr obj, int events) { if (!obj) return false; - // assert(InThisLoop()); + assert(InThisLoop()); assert(obj->GetUniqueId() == -1); if (!reactor_) { @@ -115,7 +111,6 @@ bool EventLoop::Register(std::shared_ptr obj, int events) { { // alloc unique id - std::unique_lock guard(object_mutex_); int id = -1; do { id = obj_id_generator_.fetch_add(1) + 1; @@ -139,14 +134,9 @@ bool EventLoop::Modify(std::shared_ptr obj, int events) { assert(InThisLoop()); assert(obj->GetUniqueId() >= 0); - - { #ifdef DEBUG - std::unique_lock guard(object_mutex_); - assert(objects_.count(obj->GetUniqueId()) == 1); + assert(objects_.count(obj->GetUniqueId()) == 1); #endif - } - if (!reactor_) { return false; } @@ -157,18 +147,15 @@ void EventLoop::Unregister(std::shared_ptr obj) { if (!obj) return; int id = obj->GetUniqueId(); - // assert(InThisLoop()); + assert(InThisLoop()); assert(id >= 0); - { - std::unique_lock guard(object_mutex_); - assert(objects_.count(id) == 1); + assert(objects_.count(id) == 1); - if (!reactor_) { - return; - } - reactor_->Unregister(obj.get()); - objects_.erase(id); + if (!reactor_) { + return; } + reactor_->Unregister(obj.get()); + objects_.erase(id); } bool EventLoop::Listen(const char* ip, int port, NewTcpConnectionCallback ccb, EventLoopSelector selector) { @@ -218,13 +205,10 @@ std::shared_ptr EventLoop::ConnectHTTP(const char* ip, int port) { } void EventLoop::Reset() { - { - std::unique_lock guard(object_mutex_); - for (auto& kv : objects_) { - Unregister(kv.second); - } - objects_.clear(); + for (auto& kv : objects_) { + Unregister(kv.second); } + objects_.clear(); { std::unique_lock guard(task_mutex_); @@ -236,12 +220,9 @@ void EventLoop::Reset() { } std::shared_ptr EventLoop::GetEventObject(int id) const { - { - std::unique_lock guard(object_mutex_); - auto it = objects_.find(id); - if (it != objects_.end()) { - return it->second; - } + auto it = objects_.find(id); + if (it != objects_.end()) { + return it->second; } return nullptr; diff --git a/src/net/event_loop.h b/src/net/event_loop.h index 5b10acdb3..da6c683d0 100644 --- a/src/net/event_loop.h +++ b/src/net/event_loop.h @@ -91,7 +91,6 @@ class EventLoop { private: std::unique_ptr reactor_; - mutable std::mutex object_mutex_; std::unordered_map> objects_; std::shared_ptr notifier_;