diff --git a/source/vibe/core/drivers/libasync.d b/source/vibe/core/drivers/libasync.d index 435c962f05..4451a39252 100644 --- a/source/vibe/core/drivers/libasync.d +++ b/source/vibe/core/drivers/libasync.d @@ -44,7 +44,6 @@ version (Windows) import core.sys.windows.winsock2; private __gshared EventLoop gs_evLoop; private EventLoop s_evLoop; private DriverCore s_driverCore; -private shared int s_refCount; // will destroy async threads when 0 version(Windows) extern(C) { FILE* _wfopen(const(wchar)* filename, in wchar* mode); @@ -87,9 +86,8 @@ final class LibasyncDriver : EventDriver { this(DriverCore core) nothrow { assert(!isControlThread, "Libasync driver created in control thread"); - try { + try { import core.atomic : atomicOp; - s_refCount.atomicOp!"+="(1); if (!gs_mutex) { import core.sync.mutex; gs_mutex = new core.sync.mutex.Mutex; @@ -113,7 +111,7 @@ final class LibasyncDriver : EventDriver { s_evLoop = getThreadEventLoop(); if (!gs_evLoop) gs_evLoop = s_evLoop; - + m_exitSignal = new shared AsyncSignal(getEventLoop()); m_exitSignal.run({ m_break = true; @@ -131,10 +129,6 @@ final class LibasyncDriver : EventDriver { logTrace("Deleting event driver"); m_break = true; getEventLoop().exit(); - if (s_refCount.atomicOp!"-="(1) == 0) { - import libasync.threads : destroyAsyncThreads; - destroyAsyncThreads(); - } } int runEventLoop() @@ -195,7 +189,7 @@ final class LibasyncDriver : EventDriver { is_ipv6 = isIPv6.yes; else is_ipv6 = isIPv6.no; - + import std.regex : regex, Captures, Regex, matchFirst, ctRegex; import std.traits : ReturnType; @@ -218,7 +212,7 @@ final class LibasyncDriver : EventDriver { { use_dns = true; } - + NetworkAddress ret; if (use_dns) { @@ -234,12 +228,12 @@ final class LibasyncDriver : EventDriver { getDriverCore().resumeTask(waiter); } } - + DNSCallback* cb = FreeListObjectAlloc!DNSCallback.alloc(); cb.waiter = Task.getThis(); cb.address = &ret; cb.finished = &done; - + // todo: remove the shared attribute to avoid GC? shared AsyncDNS dns = new shared AsyncDNS(getEventLoop()); scope(exit) dns.destroy(); @@ -254,10 +248,10 @@ final class LibasyncDriver : EventDriver { assert(ret.family != 0); logTrace("Async resolved address %s", ret.toString()); FreeListObjectAlloc!DNSCallback.free(cb); - + if (ret.family == 0) ret.family = family; - + return ret; } else { @@ -389,7 +383,7 @@ final class LibasyncDriver : EventDriver { auto now = Clock.currTime(UTC()); // event loop timer will need to be rescheduled because we'll process everything until now m_nextSched = SysTime.max; - + m_timers.consumeTimeouts(now, (timer, periodic, ref data) { Task owner = data.owner; auto callback = data.callback; @@ -397,7 +391,7 @@ final class LibasyncDriver : EventDriver { logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null); if (!periodic) releaseTimer(timer); - + if (owner && owner.running && owner != Task.getThis()) { if (Task.getThis == Task.init) getDriverCore().resumeTask(owner); else getDriverCore().yieldAndResumeTask(owner); @@ -411,11 +405,11 @@ final class LibasyncDriver : EventDriver { private void rescheduleTimerEvent(SysTime now) { logTrace("Rescheduling timer event %s", Task.getThis()); - + // don't bother scheduling, the timers will be processed before leaving for the event loop if (m_nextSched <= Clock.currTime(UTC())) return; - + bool first; auto next = m_timers.getFirstTimeout(); Duration dur; @@ -466,7 +460,7 @@ final class LibasyncFileStream : FileStream { Task m_task; Exception m_ex; shared AsyncFile m_impl; - + bool m_started; bool m_truncated; bool m_finished; @@ -493,21 +487,21 @@ final class LibasyncFileStream : FileStream { fclose(f); m_truncated = true; } - } + } m_path = path; m_mode = mode; - + m_impl = new shared AsyncFile(getEventLoop()); m_impl.onReady(&handler); - + m_started = true; } - + ~this() { try close(); catch {} } - + @property Path path() const { return m_path; } @property bool isOpen() const { return m_started; } @property ulong size() const { return m_size; } @@ -532,7 +526,7 @@ final class LibasyncFileStream : FileStream { getDriverCore().yieldAndResumeTask(m_task, new Exception("The file was closed during an operation")); else if (m_task != Task() && Task.getThis() == Task()) getDriverCore().resumeTask(m_task, new Exception("The file was closed during an operation")); - + } @property bool empty() const { assert(this.readable); return m_offset >= m_size; } @@ -559,16 +553,16 @@ final class LibasyncFileStream : FileStream { m_finished = false; enforce(dst.length <= leastSize); enforce(m_impl.read(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to read data from disk: " ~ m_impl.error); - + if (!m_finished) { acquire(); scope(exit) release(); getDriverCore().yieldForEvent(); } m_finished = false; - + if (m_ex) throw m_ex; - + m_offset += dst.length; assert(m_impl.offset == m_offset, "Incoherent offset returned from file reader: " ~ m_offset.to!string ~ "B assumed but the implementation is at: " ~ m_impl.offset.to!string ~ "B"); } @@ -577,9 +571,9 @@ final class LibasyncFileStream : FileStream { void write(in ubyte[] bytes_) { assert(this.writable, "To write to a file, it must be opened in a write-enabled mode."); - + shared const(ubyte)[] bytes = cast(shared const(ubyte)[]) bytes_; - + bool truncate_if_exists; if (!m_truncated && m_mode == FileMode.createTrunc) { truncate_if_exists = true; @@ -587,21 +581,21 @@ final class LibasyncFileStream : FileStream { m_size = 0; } m_finished = false; - + if (m_mode == FileMode.append) enforce(m_impl.append(m_path.toNativeString(), cast(shared ubyte[]) bytes, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error); else enforce(m_impl.write(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error); - + if (!m_finished) { acquire(); scope(exit) release(); getDriverCore().yieldForEvent(); } m_finished = false; - + if (m_ex) throw m_ex; - + if (m_mode == FileMode.append) { m_size += bytes.length; } @@ -622,7 +616,7 @@ final class LibasyncFileStream : FileStream { void flush() { assert(this.writable, "To write to a file, it must be opened in a write-enabled mode."); - + } void finalize() @@ -642,11 +636,11 @@ final class LibasyncFileStream : FileStream { assert(Task.getThis() == Task() || m_task == Task(), "Acquiring FileStream that is already owned."); m_task = Task.getThis(); } - + private void handler() { // This may be called by the event loop if read/write > 64kb and another thread was delegated Exception ex; - + if (m_impl.status.code != Status.OK) ex = new Exception(m_impl.error); m_finished = true; @@ -1201,7 +1195,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { m_settings.reader.acquire(); auto _driver = getEventDriver(); auto tm = _driver.createTimer(null); - scope(exit) { + scope(exit) { _driver.stopTimer(tm); _driver.releaseTimer(tm); m_settings.reader.release(); @@ -1337,8 +1331,8 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { return false; // cancel slices and revert to the fixed ring buffer } - if (m_slice.length > 0) { - //logDebug("post-assign m_slice "); + if (m_slice.length > 0) { + //logDebug("post-assign m_slice "); m_slice = m_slice.ptr[0 .. m_slice.length + ret]; } else { @@ -1346,7 +1340,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { m_slice = m_buffer[0 .. ret]; } return true; - } + } logTrace("TryReadBuf exit with %d bytes in m_slice, %d bytes in m_readBuffer ", m_slice.length, m_readBuffer.length); return false; @@ -1363,7 +1357,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { assert(!m_slice); logTrace("OnRead with %s", m_readBuffer.freeSpace); - + while( m_readBuffer.freeSpace > 0 ) { ubyte[] dst = m_readBuffer.peekDst(); assert(dst.length <= int.max); @@ -1374,11 +1368,11 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { if( ret > 0 ){ logTrace("received bytes: %s", ret); m_readBuffer.putN(ret); - } + } read_more = ret == dst.length; // ret == 0! let's look for some errors if (read_more) { - if (m_readBuffer.freeSpace == 0) + if (m_readBuffer.freeSpace == 0) m_readBuffer.capacity = m_readBuffer.capacity*2; dst = m_readBuffer.peekDst(); } @@ -1386,7 +1380,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ { if (conn.status.code == Status.ASYNC) { m_mustRecv = false; // we'll have to wait break; // the kernel's buffer is empty - } + } // ret == 0! let's look for some errors else if (conn.status.code == Status.ASYNC) { m_mustRecv = false; // we'll have to wait