diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f4074db5a1..96d1c42302 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1236,7 +1236,7 @@ private class VibeDriverCore : DriverCore { nothrow @safe { assert(ctask.thread is () @trusted { return Thread.getThis(); } (), "Resuming task in foreign thread."); assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD"); - assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to resumed."); + assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to be resumed."); if( event_exception ){ extrap(); @@ -1287,6 +1287,14 @@ private class VibeDriverCore : DriverCore { } else m_ignoreIdleForGC = false; } + bool isScheduledForResume(Task t) + { + if (t == Task.init) return false; + if (!t.running) return false; + auto cf = cast(CoreTask)t.fiber; + return cf.m_queue !is null; + } + private void resumeYieldedTasks() { for (auto limit = s_yieldedTasks.length; limit > 0 && !s_yieldedTasks.empty; limit--) { diff --git a/source/vibe/core/driver.d b/source/vibe/core/driver.d index 948001ae42..d415a04b53 100644 --- a/source/vibe/core/driver.d +++ b/source/vibe/core/driver.d @@ -223,6 +223,8 @@ interface DriverCore { fully processed. */ void notifyIdle(); + + bool isScheduledForResume(Task t); } diff --git a/source/vibe/core/drivers/libevent2_tcp.d b/source/vibe/core/drivers/libevent2_tcp.d index 58c835094f..388b5f0f26 100644 --- a/source/vibe/core/drivers/libevent2_tcp.d +++ b/source/vibe/core/drivers/libevent2_tcp.d @@ -423,6 +423,7 @@ package final class Libevent2TCPConnection : TCPConnection { cleanup(); throw new Exception(format("Connection error while %s TCPConnection.", write ? "writing to" : "reading from")); } + if (m_ctx.state == ConnectionState.activeClose) throw new Exception("Connection was actively closed."); enforce (!write || m_ctx.state == ConnectionState.open, "Remote hung up while writing to TCPConnection."); if (!write && m_ctx.state == ConnectionState.passiveClose) { auto buf = bufferevent_get_input(m_ctx.event); @@ -663,7 +664,7 @@ package nothrow extern(C) auto f = ctx.readOwner; try { - if (f && f.running && ctx.state != ConnectionState.activeClose) + if (f && f.running && !ctx.core.isScheduledForResume(f)) ctx.core.resumeTask(f); } catch (UncaughtException e) { logWarn("Got exception when resuming task onSocketRead: %s", e.msg); @@ -677,7 +678,7 @@ package nothrow extern(C) assert(ctx.magic__ == TCPContext.MAGIC); assert(ctx.event is buf_event, "Write event on bufferevent that does not match the TCPContext"); logTrace("socket %d write event (%s)!", ctx.socketfd, ctx.shutdown); - if (ctx.writeOwner != Task.init && ctx.writeOwner.running) { + if (ctx.writeOwner != Task.init && ctx.writeOwner.running && !ctx.core.isScheduledForResume(ctx.writeOwner)) { bufferevent_flush(buf_event, EV_WRITE, bufferevent_flush_mode.BEV_FLUSH); ctx.core.resumeTask(ctx.writeOwner); } @@ -700,7 +701,7 @@ package nothrow extern(C) string errorMessage; if (status & BEV_EVENT_EOF) { - logDebug("Connection was closed (fd %d).", ctx.socketfd); + logDebug("Connection was closed by remote peer (fd %d).", ctx.socketfd); if (ctx.state != ConnectionState.activeClose) ctx.state = ConnectionState.passiveClose; evbuffer* buf = bufferevent_get_input(buf_event); @@ -723,7 +724,7 @@ package nothrow extern(C) ctx.core.eventException = ex; - if (ctx.readOwner && ctx.readOwner.running && ctx.state != ConnectionState.activeClose) { + if (ctx.readOwner && ctx.readOwner.running && !ctx.core.isScheduledForResume(ctx.readOwner)) { logTrace("resuming corresponding task%s...", ex is null ? "" : " with exception"); if (ctx.readOwner.fiber.state == Fiber.State.EXEC) ctx.exception = ex; else ctx.core.resumeTask(ctx.readOwner, ex);