From f54f337907552f9b2d10ec861f11e316a59e1f63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Fri, 26 Feb 2016 21:32:04 +0100 Subject: [PATCH] Explictly avoid double-resumption of tasks during Libevent2TCPConnection.close(). The previous fix for #1376 resulted in a possible task starvation when the peer reset the connection before the outbound buffer was drained. The new approach now always resumes the waiting task exactly once, no matter how many events happen and no matter in which order. (cherry picked from commit 350130a2b05cfbf12764ffaa6a86403eacf172d9) --- source/vibe/core/core.d | 10 +++++++++- source/vibe/core/driver.d | 2 ++ source/vibe/core/drivers/libevent2_tcp.d | 9 +++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f4074db5a1..96d1c42302 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -1236,7 +1236,7 @@ private class VibeDriverCore : DriverCore { nothrow @safe { assert(ctask.thread is () @trusted { return Thread.getThis(); } (), "Resuming task in foreign thread."); assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD"); - assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to resumed."); + assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to be resumed."); if( event_exception ){ extrap(); @@ -1287,6 +1287,14 @@ private class VibeDriverCore : DriverCore { } else m_ignoreIdleForGC = false; } + bool isScheduledForResume(Task t) + { + if (t == Task.init) return false; + if (!t.running) return false; + auto cf = cast(CoreTask)t.fiber; + return cf.m_queue !is null; + } + private void resumeYieldedTasks() { for (auto limit = s_yieldedTasks.length; limit > 0 && !s_yieldedTasks.empty; limit--) { diff --git a/source/vibe/core/driver.d b/source/vibe/core/driver.d index 948001ae42..d415a04b53 100644 --- a/source/vibe/core/driver.d +++ b/source/vibe/core/driver.d @@ -223,6 +223,8 @@ interface DriverCore { fully processed. */ void notifyIdle(); + + bool isScheduledForResume(Task t); } diff --git a/source/vibe/core/drivers/libevent2_tcp.d b/source/vibe/core/drivers/libevent2_tcp.d index 58c835094f..388b5f0f26 100644 --- a/source/vibe/core/drivers/libevent2_tcp.d +++ b/source/vibe/core/drivers/libevent2_tcp.d @@ -423,6 +423,7 @@ package final class Libevent2TCPConnection : TCPConnection { cleanup(); throw new Exception(format("Connection error while %s TCPConnection.", write ? "writing to" : "reading from")); } + if (m_ctx.state == ConnectionState.activeClose) throw new Exception("Connection was actively closed."); enforce (!write || m_ctx.state == ConnectionState.open, "Remote hung up while writing to TCPConnection."); if (!write && m_ctx.state == ConnectionState.passiveClose) { auto buf = bufferevent_get_input(m_ctx.event); @@ -663,7 +664,7 @@ package nothrow extern(C) auto f = ctx.readOwner; try { - if (f && f.running && ctx.state != ConnectionState.activeClose) + if (f && f.running && !ctx.core.isScheduledForResume(f)) ctx.core.resumeTask(f); } catch (UncaughtException e) { logWarn("Got exception when resuming task onSocketRead: %s", e.msg); @@ -677,7 +678,7 @@ package nothrow extern(C) assert(ctx.magic__ == TCPContext.MAGIC); assert(ctx.event is buf_event, "Write event on bufferevent that does not match the TCPContext"); logTrace("socket %d write event (%s)!", ctx.socketfd, ctx.shutdown); - if (ctx.writeOwner != Task.init && ctx.writeOwner.running) { + if (ctx.writeOwner != Task.init && ctx.writeOwner.running && !ctx.core.isScheduledForResume(ctx.writeOwner)) { bufferevent_flush(buf_event, EV_WRITE, bufferevent_flush_mode.BEV_FLUSH); ctx.core.resumeTask(ctx.writeOwner); } @@ -700,7 +701,7 @@ package nothrow extern(C) string errorMessage; if (status & BEV_EVENT_EOF) { - logDebug("Connection was closed (fd %d).", ctx.socketfd); + logDebug("Connection was closed by remote peer (fd %d).", ctx.socketfd); if (ctx.state != ConnectionState.activeClose) ctx.state = ConnectionState.passiveClose; evbuffer* buf = bufferevent_get_input(buf_event); @@ -723,7 +724,7 @@ package nothrow extern(C) ctx.core.eventException = ex; - if (ctx.readOwner && ctx.readOwner.running && ctx.state != ConnectionState.activeClose) { + if (ctx.readOwner && ctx.readOwner.running && !ctx.core.isScheduledForResume(ctx.readOwner)) { logTrace("resuming corresponding task%s...", ex is null ? "" : " with exception"); if (ctx.readOwner.fiber.state == Fiber.State.EXEC) ctx.exception = ex; else ctx.core.resumeTask(ctx.readOwner, ex);