diff --git a/src/common/Threading/DelayExecutor.cpp b/src/common/Threading/DelayExecutor.cpp deleted file mode 100644 index f53b5300cf659f..00000000000000 --- a/src/common/Threading/DelayExecutor.cpp +++ /dev/null @@ -1,121 +0,0 @@ -#include -#include -#include "Threading.h" -#include "DelayExecutor.h" - -DelayExecutor* DelayExecutor::instance() -{ - static DelayExecutor instance; - return &instance; -} - -DelayExecutor::DelayExecutor() - : pre_svc_hook_(0), post_svc_hook_(0), activated_(false), mqueue_(1 * 1024 * 1024, 1 * 1024 * 1024), queue_(&mqueue_) -{ -} - -DelayExecutor::~DelayExecutor() -{ - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - deactivate(); -} - -int DelayExecutor::deactivate() -{ - if (!activated()) - return -1; - - activated(false); - queue_.queue()->deactivate(); - wait(); - - return 0; -} - -int DelayExecutor::svc() -{ - if (pre_svc_hook_) - pre_svc_hook_->call(); - - for (;;) - { - ACE_Method_Request* rq = queue_.dequeue(); - - if (!rq) - break; - - rq->call(); - delete rq; - } - - if (post_svc_hook_) - post_svc_hook_->call(); - - return 0; -} - -int DelayExecutor::start(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook) -{ - if (activated()) - return -1; - - if (num_threads < 1) - return -1; - - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - pre_svc_hook_ = pre_svc_hook; - post_svc_hook_ = post_svc_hook; - - queue_.queue()->activate(); - - // pussywizard: - //acore::ThreadPriority tp; - //int _priority = tp.getPriority(acore::Priority_Highest); - //if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE, num_threads, 0, _priority) == -1) - // return -1; - - if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) - return -1; - - activated(true); - - return true; -} - -int DelayExecutor::execute(ACE_Method_Request* new_req) -{ - if (new_req == NULL) - return -1; - - // pussywizard: NULL as param for enqueue - wait until the action is possible! - // new tasks are added to the queue during map update (schedule_update in MapInstanced::Update) - // the queue can be momentarily blocked by map threads constantly waiting for tasks (for (;;) { queue_.dequeue();... } in DelayExecutor::svc()) - // so just wait a moment, don't drop the task xDddd - if (queue_.enqueue(new_req, /*(ACE_Time_Value*)&ACE_Time_Value::zero*/ NULL) == -1) - { - delete new_req; - ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1); - } - - return 0; -} - -bool DelayExecutor::activated() -{ - return activated_; -} - -void DelayExecutor::activated(bool s) -{ - activated_ = s; -} diff --git a/src/common/Threading/DelayExecutor.h b/src/common/Threading/DelayExecutor.h deleted file mode 100644 index 46269c837a6f9d..00000000000000 --- a/src/common/Threading/DelayExecutor.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _M_DELAY_EXECUTOR_H -#define _M_DELAY_EXECUTOR_H - -#include -#include -#include - -class DelayExecutor : protected ACE_Task_Base -{ -public: - - DelayExecutor(); - virtual ~DelayExecutor(); - - static DelayExecutor* instance(); - - int execute(ACE_Method_Request* new_req); - - int start(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL); - - int deactivate(); - - bool activated(); - - virtual int svc(); - -private: - - ACE_Method_Request* pre_svc_hook_; - ACE_Method_Request* post_svc_hook_; - bool activated_; - ACE_Message_Queue mqueue_; - ACE_Activation_Queue queue_; - - - void activated(bool s); -}; - -#endif // _M_DELAY_EXECUTOR_H diff --git a/src/server/game/Maps/Map.cpp b/src/server/game/Maps/Map.cpp index 9e62efba69d5c8..62a8689c4e6461 100644 --- a/src/server/game/Maps/Map.cpp +++ b/src/server/game/Maps/Map.cpp @@ -708,11 +708,9 @@ void Map::VisitNearbyCellsOf(WorldObject* obj, TypeContainerVisitoroutDebug(LOG_FILTER_POOLSYS, "%u", mapId); // pussywizard: for crashlogs - if (t_diff) _dynamicTree.update(t_diff); + /// update worldsessions for existing players for (m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter) { @@ -837,8 +835,6 @@ void Map::Update(const uint32 t_diff, const uint32 s_diff, bool /*thread*/) sScriptMgr->OnMapUpdate(this, t_diff); BuildAndSendUpdateForObjects(); // pussywizard - - sLog->outDebug(LOG_FILTER_POOLSYS, "%u", mapId); // pussywizard: for crashlogs } void Map::HandleDelayedVisibility() diff --git a/src/server/game/Maps/MapManager.cpp b/src/server/game/Maps/MapManager.cpp index 16e62e802936a0..785f2211644c74 100644 --- a/src/server/game/Maps/MapManager.cpp +++ b/src/server/game/Maps/MapManager.cpp @@ -52,9 +52,9 @@ void MapManager::Initialize() { int num_threads(sWorld->getIntConfig(CONFIG_NUMTHREADS)); - // Start mtmaps if needed. - if (num_threads > 0 && m_updater.activate(num_threads) == -1) - abort(); + // Start mtmaps if needed + if (num_threads > 0) + m_updater.activate(num_threads); } void MapManager::InitializeVisibilityDistanceInfo() diff --git a/src/server/game/Maps/MapUpdater.cpp b/src/server/game/Maps/MapUpdater.cpp index e1f01d902cd409..93ad4d5b42c538 100644 --- a/src/server/game/Maps/MapUpdater.cpp +++ b/src/server/game/Maps/MapUpdater.cpp @@ -1,88 +1,62 @@ +/* + * Copyright (C) 2016+ AzerothCore , released under GNU GPL v2 license: https://github.com/azerothcore/azerothcore-wotlk/blob/master/LICENSE-GPL2 + * Copyright (C) 2008-2020 TrinityCore + * Copyright (C) 2005-2009 MaNGOS + */ + #include "MapUpdater.h" -#include "DelayExecutor.h" #include "Map.h" -#include "DatabaseEnv.h" #include "LFGMgr.h" #include "AvgDiffTracker.h" -#include -#include - -class WDBThreadStartReq1 : public ACE_Method_Request +class UpdateRequest { public: + UpdateRequest() = default; + virtual ~UpdateRequest() = default; - WDBThreadStartReq1() - { - } - - virtual int call() - { - return 0; - } + virtual void call() = 0; }; -class WDBThreadEndReq1 : public ACE_Method_Request +class MapUpdateRequest : public UpdateRequest { public: - - WDBThreadEndReq1() + MapUpdateRequest(Map& m, MapUpdater& u, uint32 d, uint32 sd) + : m_map(m), m_updater(u), m_diff(d), s_diff(sd) { } - virtual int call() + void call() override { - return 0; + m_map.Update(m_diff, s_diff); + m_updater.update_finished(); } -}; - -class MapUpdateRequest : public ACE_Method_Request -{ private: - Map& m_map; MapUpdater& m_updater; uint32 m_diff; uint32 s_diff; - -public: - - MapUpdateRequest(Map& m, MapUpdater& u, uint32 d, uint32 sd) - : m_map(m), m_updater(u), m_diff(d), s_diff(sd) - { - } - - virtual int call() - { - m_map.Update (m_diff, s_diff); - m_updater.update_finished(); - return 0; - } }; -class LFGUpdateRequest : public ACE_Method_Request +class LFGUpdateRequest : public UpdateRequest { -private: - - MapUpdater& m_updater; - uint32 m_diff; - public: LFGUpdateRequest(MapUpdater& u, uint32 d) : m_updater(u), m_diff(d) {} - virtual int call() + void call() { uint32 startTime = getMSTime(); sLFGMgr->Update(m_diff, 1); uint32 totalTime = getMSTimeDiff(startTime, getMSTime()); lfgDiffTracker.Update(totalTime); m_updater.update_finished(); - return 0; } +private: + MapUpdater& m_updater; + uint32 m_diff; }; -MapUpdater::MapUpdater(): - m_executor(), m_mutex(), m_condition(m_mutex), pending_requests(0) +MapUpdater::MapUpdater(): pending_requests(0) { } @@ -91,80 +65,82 @@ MapUpdater::~MapUpdater() deactivate(); } -int MapUpdater::activate(size_t num_threads) +void MapUpdater::activate(size_t num_threads) { - return m_executor.start((int)num_threads, new WDBThreadStartReq1, new WDBThreadEndReq1); + for (size_t i = 0; i < num_threads; ++i) + { + _workerThreads.push_back(std::thread(&MapUpdater::WorkerThread, this)); + } } -int MapUpdater::deactivate() +void MapUpdater::deactivate() { + _cancelationToken = true; + wait(); - return m_executor.deactivate(); + _queue.Cancel(); + + for (auto& thread : _workerThreads) + { + thread.join(); + } } -int MapUpdater::wait() +void MapUpdater::wait() { - ACORE_GUARD(ACE_Thread_Mutex, m_mutex); + std::unique_lock guard(_lock); while (pending_requests > 0) - m_condition.wait(); + _condition.wait(guard); - return 0; + guard.unlock(); } -int MapUpdater::schedule_update(Map& map, uint32 diff, uint32 s_diff) +void MapUpdater::schedule_update(Map& map, uint32 diff, uint32 s_diff) { - ACORE_GUARD(ACE_Thread_Mutex, m_mutex); + std::lock_guard guard(_lock); ++pending_requests; - if (m_executor.execute(new MapUpdateRequest(map, *this, diff, s_diff)) == -1) - { - ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule Map Update"))); - - --pending_requests; - return -1; - } - - return 0; + _queue.Push(new MapUpdateRequest(map, *this, diff, s_diff)); } -int MapUpdater::schedule_lfg_update(uint32 diff) +void MapUpdater::schedule_lfg_update(uint32 diff) { - ACORE_GUARD(ACE_Thread_Mutex, m_mutex); + std::lock_guard guard(_lock); ++pending_requests; - if (m_executor.execute(new LFGUpdateRequest(*this, diff)) == -1) - { - ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule LFG Update"))); - - --pending_requests; - return -1; - } - - return 0; + _queue.Push(new LFGUpdateRequest(*this, diff)); } bool MapUpdater::activated() { - return m_executor.activated(); + return _workerThreads.size() > 0; } void MapUpdater::update_finished() { - ACORE_GUARD(ACE_Thread_Mutex, m_mutex); - - if (pending_requests == 0) - { - ACE_ERROR((LM_ERROR, ACE_TEXT("(%t)\n"), ACE_TEXT("MapUpdater::update_finished BUG, report to devs"))); - sLog->outMisc("WOOT! pending_requests == 0 before decrement!"); - m_condition.broadcast(); - return; - } + std::lock_guard lock(_lock); --pending_requests; - m_condition.broadcast(); + _condition.notify_all(); } + +void MapUpdater::WorkerThread() +{ + while (1) + { + UpdateRequest* request = nullptr; + + _queue.WaitAndPop(request); + if (_cancelationToken) + return; + + request->call(); + + delete request; + } +} \ No newline at end of file diff --git a/src/server/game/Maps/MapUpdater.h b/src/server/game/Maps/MapUpdater.h index fd18333a5a6d67..35771546cfc725 100644 --- a/src/server/game/Maps/MapUpdater.h +++ b/src/server/game/Maps/MapUpdater.h @@ -1,43 +1,40 @@ #ifndef _MAP_UPDATER_H_INCLUDED #define _MAP_UPDATER_H_INCLUDED -#include -#include - -#include "DelayExecutor.h" -#include "World.h" +#include "Define.h" +#include "PCQueue.h" +#include +#include +#include class Map; +class UpdateRequest; class MapUpdater { public: - MapUpdater(); virtual ~MapUpdater(); - friend class MapUpdateRequest; - friend class LFGUpdateRequest; - - int schedule_update(Map& map, uint32 diff, uint32 s_diff); - int schedule_lfg_update(uint32 diff); - - int wait(); - - int activate(size_t num_threads); - - int deactivate(); - + void schedule_update(Map& map, uint32 diff, uint32 s_diff); + void schedule_lfg_update(uint32 diff); + void wait(); + void activate(size_t num_threads); + void deactivate(); bool activated(); + void update_finished(); private: + void WorkerThread(); - DelayExecutor m_executor; - ACE_Thread_Mutex m_mutex; - ACE_Condition_Thread_Mutex m_condition; - size_t pending_requests; + ProducerConsumerQueue _queue; - void update_finished(); + std::vector _workerThreads; + std::atomic _cancelationToken; + + std::mutex _lock; + std::condition_variable _condition; + size_t pending_requests; }; #endif //_MAP_UPDATER_H_INCLUDED