Skip to content

Commit

Permalink
Explictly avoid double-resumption of tasks during Libevent2TCPConnect…
Browse files Browse the repository at this point in the history
…ion.close().

The previous fix for #1376 resulted in a possible task starvation when the peer reset the connection before the outbound buffer was drained. The new approach now always resumes the waiting task exactly once, no matter how many events happen and no matter in which order.

(cherry picked from commit 350130a)
  • Loading branch information
s-ludwig committed Feb 27, 2016
1 parent 613d159 commit f54f337
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
10 changes: 9 additions & 1 deletion source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ private class VibeDriverCore : DriverCore {
nothrow @safe {
assert(ctask.thread is () @trusted { return Thread.getThis(); } (), "Resuming task in foreign thread.");
assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD");
assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to resumed.");
assert(ctask.m_queue is null, "Manually resuming task that is already scheduled to be resumed.");

if( event_exception ){
extrap();
Expand Down Expand Up @@ -1287,6 +1287,14 @@ private class VibeDriverCore : DriverCore {
} else m_ignoreIdleForGC = false;
}

bool isScheduledForResume(Task t)
{
if (t == Task.init) return false;
if (!t.running) return false;
auto cf = cast(CoreTask)t.fiber;
return cf.m_queue !is null;
}

private void resumeYieldedTasks()
{
for (auto limit = s_yieldedTasks.length; limit > 0 && !s_yieldedTasks.empty; limit--) {
Expand Down
2 changes: 2 additions & 0 deletions source/vibe/core/driver.d
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ interface DriverCore {
fully processed.
*/
void notifyIdle();

bool isScheduledForResume(Task t);
}


Expand Down
9 changes: 5 additions & 4 deletions source/vibe/core/drivers/libevent2_tcp.d
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ package final class Libevent2TCPConnection : TCPConnection {
cleanup();
throw new Exception(format("Connection error while %s TCPConnection.", write ? "writing to" : "reading from"));
}
if (m_ctx.state == ConnectionState.activeClose) throw new Exception("Connection was actively closed.");
enforce (!write || m_ctx.state == ConnectionState.open, "Remote hung up while writing to TCPConnection.");
if (!write && m_ctx.state == ConnectionState.passiveClose) {
auto buf = bufferevent_get_input(m_ctx.event);
Expand Down Expand Up @@ -663,7 +664,7 @@ package nothrow extern(C)

auto f = ctx.readOwner;
try {
if (f && f.running && ctx.state != ConnectionState.activeClose)
if (f && f.running && !ctx.core.isScheduledForResume(f))
ctx.core.resumeTask(f);
} catch (UncaughtException e) {
logWarn("Got exception when resuming task onSocketRead: %s", e.msg);
Expand All @@ -677,7 +678,7 @@ package nothrow extern(C)
assert(ctx.magic__ == TCPContext.MAGIC);
assert(ctx.event is buf_event, "Write event on bufferevent that does not match the TCPContext");
logTrace("socket %d write event (%s)!", ctx.socketfd, ctx.shutdown);
if (ctx.writeOwner != Task.init && ctx.writeOwner.running) {
if (ctx.writeOwner != Task.init && ctx.writeOwner.running && !ctx.core.isScheduledForResume(ctx.writeOwner)) {
bufferevent_flush(buf_event, EV_WRITE, bufferevent_flush_mode.BEV_FLUSH);
ctx.core.resumeTask(ctx.writeOwner);
}
Expand All @@ -700,7 +701,7 @@ package nothrow extern(C)

string errorMessage;
if (status & BEV_EVENT_EOF) {
logDebug("Connection was closed (fd %d).", ctx.socketfd);
logDebug("Connection was closed by remote peer (fd %d).", ctx.socketfd);
if (ctx.state != ConnectionState.activeClose)
ctx.state = ConnectionState.passiveClose;
evbuffer* buf = bufferevent_get_input(buf_event);
Expand All @@ -723,7 +724,7 @@ package nothrow extern(C)

ctx.core.eventException = ex;

if (ctx.readOwner && ctx.readOwner.running && ctx.state != ConnectionState.activeClose) {
if (ctx.readOwner && ctx.readOwner.running && !ctx.core.isScheduledForResume(ctx.readOwner)) {
logTrace("resuming corresponding task%s...", ex is null ? "" : " with exception");
if (ctx.readOwner.fiber.state == Fiber.State.EXEC) ctx.exception = ex;
else ctx.core.resumeTask(ctx.readOwner, ex);
Expand Down

0 comments on commit f54f337

Please sign in to comment.