Skip to content

Commit

Permalink
Merge pull request #1837 from rejectedsoftware/remove_libasync_destro…
Browse files Browse the repository at this point in the history
…y_threads

Remove explicit thread destruction code in the libasync driver.
merged-on-behalf-of: Sönke Ludwig <[email protected]>
Conflicts:
	source/vibe/core/drivers/libasync.d
  • Loading branch information
dlang-bot authored and s-ludwig committed Sep 2, 2017
1 parent ffe1ce0 commit c762300
Showing 1 changed file with 39 additions and 45 deletions.
84 changes: 39 additions & 45 deletions source/vibe/core/drivers/libasync.d
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ version (Windows) import core.sys.windows.winsock2;
private __gshared EventLoop gs_evLoop;
private EventLoop s_evLoop;
private DriverCore s_driverCore;
private shared int s_refCount; // will destroy async threads when 0

version(Windows) extern(C) {
FILE* _wfopen(const(wchar)* filename, in wchar* mode);
Expand Down Expand Up @@ -87,9 +86,8 @@ final class LibasyncDriver : EventDriver {
this(DriverCore core) nothrow
{
assert(!isControlThread, "Libasync driver created in control thread");
try {
try {
import core.atomic : atomicOp;
s_refCount.atomicOp!"+="(1);
if (!gs_mutex) {
import core.sync.mutex;
gs_mutex = new core.sync.mutex.Mutex;
Expand All @@ -113,7 +111,7 @@ final class LibasyncDriver : EventDriver {
s_evLoop = getThreadEventLoop();
if (!gs_evLoop)
gs_evLoop = s_evLoop;

m_exitSignal = new shared AsyncSignal(getEventLoop());
m_exitSignal.run({
m_break = true;
Expand All @@ -131,10 +129,6 @@ final class LibasyncDriver : EventDriver {
logTrace("Deleting event driver");
m_break = true;
getEventLoop().exit();
if (s_refCount.atomicOp!"-="(1) == 0) {
import libasync.threads : destroyAsyncThreads;
destroyAsyncThreads();
}
}

int runEventLoop()
Expand Down Expand Up @@ -195,7 +189,7 @@ final class LibasyncDriver : EventDriver {
is_ipv6 = isIPv6.yes;
else
is_ipv6 = isIPv6.no;

import std.regex : regex, Captures, Regex, matchFirst, ctRegex;
import std.traits : ReturnType;

Expand All @@ -218,7 +212,7 @@ final class LibasyncDriver : EventDriver {
{
use_dns = true;
}

NetworkAddress ret;

if (use_dns) {
Expand All @@ -234,12 +228,12 @@ final class LibasyncDriver : EventDriver {
getDriverCore().resumeTask(waiter);
}
}

DNSCallback* cb = FreeListObjectAlloc!DNSCallback.alloc();
cb.waiter = Task.getThis();
cb.address = &ret;
cb.finished = &done;

// todo: remove the shared attribute to avoid GC?
shared AsyncDNS dns = new shared AsyncDNS(getEventLoop());
scope(exit) dns.destroy();
Expand All @@ -254,10 +248,10 @@ final class LibasyncDriver : EventDriver {
assert(ret.family != 0);
logTrace("Async resolved address %s", ret.toString());
FreeListObjectAlloc!DNSCallback.free(cb);

if (ret.family == 0)
ret.family = family;

return ret;
}
else {
Expand Down Expand Up @@ -389,15 +383,15 @@ final class LibasyncDriver : EventDriver {
auto now = Clock.currTime(UTC());
// event loop timer will need to be rescheduled because we'll process everything until now
m_nextSched = SysTime.max;

m_timers.consumeTimeouts(now, (timer, periodic, ref data) {
Task owner = data.owner;
auto callback = data.callback;

logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null);

if (!periodic) releaseTimer(timer);

if (owner && owner.running && owner != Task.getThis()) {
if (Task.getThis == Task.init) getDriverCore().resumeTask(owner);
else getDriverCore().yieldAndResumeTask(owner);
Expand All @@ -411,11 +405,11 @@ final class LibasyncDriver : EventDriver {
private void rescheduleTimerEvent(SysTime now)
{
logTrace("Rescheduling timer event %s", Task.getThis());

// don't bother scheduling, the timers will be processed before leaving for the event loop
if (m_nextSched <= Clock.currTime(UTC()))
return;

bool first;
auto next = m_timers.getFirstTimeout();
Duration dur;
Expand Down Expand Up @@ -466,7 +460,7 @@ final class LibasyncFileStream : FileStream {
Task m_task;
Exception m_ex;
shared AsyncFile m_impl;

bool m_started;
bool m_truncated;
bool m_finished;
Expand All @@ -493,21 +487,21 @@ final class LibasyncFileStream : FileStream {
fclose(f);
m_truncated = true;
}
}
}
m_path = path;
m_mode = mode;

m_impl = new shared AsyncFile(getEventLoop());
m_impl.onReady(&handler);

m_started = true;
}

~this()
{
try close(); catch {}
}

@property Path path() const { return m_path; }
@property bool isOpen() const { return m_started; }
@property ulong size() const { return m_size; }
Expand All @@ -532,7 +526,7 @@ final class LibasyncFileStream : FileStream {
getDriverCore().yieldAndResumeTask(m_task, new Exception("The file was closed during an operation"));
else if (m_task != Task() && Task.getThis() == Task())
getDriverCore().resumeTask(m_task, new Exception("The file was closed during an operation"));

}

@property bool empty() const { assert(this.readable); return m_offset >= m_size; }
Expand All @@ -559,16 +553,16 @@ final class LibasyncFileStream : FileStream {
m_finished = false;
enforce(dst.length <= leastSize);
enforce(m_impl.read(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to read data from disk: " ~ m_impl.error);

if (!m_finished) {
acquire();
scope(exit) release();
getDriverCore().yieldForEvent();
}
m_finished = false;

if (m_ex) throw m_ex;

m_offset += dst.length;
assert(m_impl.offset == m_offset, "Incoherent offset returned from file reader: " ~ m_offset.to!string ~ "B assumed but the implementation is at: " ~ m_impl.offset.to!string ~ "B");
}
Expand All @@ -577,31 +571,31 @@ final class LibasyncFileStream : FileStream {
void write(in ubyte[] bytes_)
{
assert(this.writable, "To write to a file, it must be opened in a write-enabled mode.");

shared const(ubyte)[] bytes = cast(shared const(ubyte)[]) bytes_;

bool truncate_if_exists;
if (!m_truncated && m_mode == FileMode.createTrunc) {
truncate_if_exists = true;
m_truncated = true;
m_size = 0;
}
m_finished = false;

if (m_mode == FileMode.append)
enforce(m_impl.append(m_path.toNativeString(), cast(shared ubyte[]) bytes, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error);
else
enforce(m_impl.write(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error);

if (!m_finished) {
acquire();
scope(exit) release();
getDriverCore().yieldForEvent();
}
m_finished = false;

if (m_ex) throw m_ex;

if (m_mode == FileMode.append) {
m_size += bytes.length;
}
Expand All @@ -622,7 +616,7 @@ final class LibasyncFileStream : FileStream {
void flush()
{
assert(this.writable, "To write to a file, it must be opened in a write-enabled mode.");

}

void finalize()
Expand All @@ -642,11 +636,11 @@ final class LibasyncFileStream : FileStream {
assert(Task.getThis() == Task() || m_task == Task(), "Acquiring FileStream that is already owned.");
m_task = Task.getThis();
}

private void handler() {
// This may be called by the event loop if read/write > 64kb and another thread was delegated
Exception ex;

if (m_impl.status.code != Status.OK)
ex = new Exception(m_impl.error);
m_finished = true;
Expand Down Expand Up @@ -1201,7 +1195,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
m_settings.reader.acquire();
auto _driver = getEventDriver();
auto tm = _driver.createTimer(null);
scope(exit) {
scope(exit) {
_driver.stopTimer(tm);
_driver.releaseTimer(tm);
m_settings.reader.release();
Expand Down Expand Up @@ -1337,16 +1331,16 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
return false; // cancel slices and revert to the fixed ring buffer
}

if (m_slice.length > 0) {
//logDebug("post-assign m_slice ");
if (m_slice.length > 0) {
//logDebug("post-assign m_slice ");
m_slice = m_slice.ptr[0 .. m_slice.length + ret];
}
else {
//logDebug("using m_buffer");
m_slice = m_buffer[0 .. ret];
}
return true;
}
}
logTrace("TryReadBuf exit with %d bytes in m_slice, %d bytes in m_readBuffer ", m_slice.length, m_readBuffer.length);

return false;
Expand All @@ -1363,7 +1357,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
assert(!m_slice);

logTrace("OnRead with %s", m_readBuffer.freeSpace);

while( m_readBuffer.freeSpace > 0 ) {
ubyte[] dst = m_readBuffer.peekDst();
assert(dst.length <= int.max);
Expand All @@ -1374,19 +1368,19 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
if( ret > 0 ){
logTrace("received bytes: %s", ret);
m_readBuffer.putN(ret);
}
}
read_more = ret == dst.length;
// ret == 0! let's look for some errors
if (read_more) {
if (m_readBuffer.freeSpace == 0)
if (m_readBuffer.freeSpace == 0)
m_readBuffer.capacity = m_readBuffer.capacity*2;
dst = m_readBuffer.peekDst();
}
} while( read_more );
if (conn.status.code == Status.ASYNC) {
m_mustRecv = false; // we'll have to wait
break; // the kernel's buffer is empty
}
}
// ret == 0! let's look for some errors
else if (conn.status.code == Status.ASYNC) {
m_mustRecv = false; // we'll have to wait
Expand Down

0 comments on commit c762300

Please sign in to comment.