diff --git a/redis/vibe/db/redis/redis.d b/redis/vibe/db/redis/redis.d index e9f65da519..7adf8e8ab1 100644 --- a/redis/vibe/db/redis/redis.d +++ b/redis/vibe/db/redis/redis.d @@ -651,6 +651,7 @@ final class RedisSubscriberImpl { Task m_listener; Task m_listenerHelper; Task m_waiter; + Task m_stopWaiter; InterruptibleRecursiveTaskMutex m_mutex; InterruptibleTaskMutex m_connMutex; } @@ -684,9 +685,37 @@ final class RedisSubscriberImpl { m_connMutex = new InterruptibleTaskMutex; } + // FIXME: instead of waiting here, the class must be reference counted + // and destructions needs to be defered until everything is stopped ~this() { logTrace("~this"); - bstop(); + waitForStop(); + } + + // Task will block until the listener is finished + private void waitForStop() + { + logTrace("waitForStop"); + if (!m_listening) return; + + void impl() @safe { + m_mutex.performLocked!({ + m_stopWaiter = Task.getThis(); + }); + if (!m_listening) return; // verify again in case the mutex was locked by bstop + scope(exit) { + m_mutex.performLocked!({ + m_stopWaiter = Task(); + }); + } + bool stopped; + do { + () @trusted { receive((Action act) { if (act == Action.STOP) stopped = true; }); } (); + } while (!stopped); + + enforce(stopped, "Failed to wait for Redis listener to stop"); + } + inTask(&impl); } /// Stop listening and yield until the operation is complete. @@ -1112,6 +1141,8 @@ final class RedisSubscriberImpl { if (m_waiter != Task()) () @trusted { m_waiter.send(Action.STOP); } (); + if (m_stopWaiter != Task()) + () @trusted { m_stopWaiter.send(Action.STOP); } (); m_listenerHelper = Task(); m_listener = Task(); diff --git a/tests/redis/source/app.d b/tests/redis/source/app.d index c8584ce34b..869e8ea0a2 100644 --- a/tests/redis/source/app.d +++ b/tests/redis/source/app.d @@ -13,7 +13,7 @@ void runTest() { //setLogLevel(LogLevel.trace); /* open a redis server locally to run these tests - * Windows download link: https://github.com/MSOpenTech/redis/tree/2.8/bin/release + * Windows download link: https://github.com/MSOpenTech/redis/releases * Linux: use "yum install redis" on RHEL or "apt-get install redis" on Debian-like */ RedisClient redis; @@ -96,6 +96,7 @@ void runTest() { RedisSubscriber scoped = redis.createSubscriber(); sub = scoped; + sleep(50.msecs); } import std.datetime;