Skip to content

Commit

Permalink
Merge pull request #1204 from etcimon/libasync-timers
Browse files Browse the repository at this point in the history
Fix behavior of a 0 second timer, avoid processing them in a task
  • Loading branch information
s-ludwig committed Sep 21, 2015
2 parents 7f4326f + 4d9872a commit 43f1b7e
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions source/vibe/core/drivers/libasync.d
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ final class LibasyncDriver : EventDriver {
int runEventLoop()
{
while(!m_break && getEventLoop().loop()){
processTimers();
getDriverCore().notifyIdle();
}
m_break = false;
Expand All @@ -142,15 +143,16 @@ final class LibasyncDriver : EventDriver {
int runEventLoopOnce()
{
getEventLoop().loop(0.seconds);
processTimers();
getDriverCore().notifyIdle();
logTrace("runEventLoopOnce exit");
return 0;
}

bool processEvents()
{
processTimers();
getEventLoop().loop(0.seconds);
processTimers();
if (m_break) {
m_break = false;
return false;
Expand Down Expand Up @@ -385,7 +387,7 @@ final class LibasyncDriver : EventDriver {

if (!periodic) releaseTimer(timer);

if (owner && owner.running) {
if (owner && owner.running && owner != Task.getThis()) {
if (Task.getThis == Task.init) getDriverCore().resumeTask(owner);
else getDriverCore().yieldAndResumeTask(owner);
}
Expand All @@ -397,26 +399,20 @@ final class LibasyncDriver : EventDriver {

private void rescheduleTimerEvent(SysTime now)
{
// logTrace("Rescheduling timer event %s", Task.getThis());

logTrace("Rescheduling timer event %s", Task.getThis());

// don't bother scheduling, the timers will be processed before leaving for the event loop
if (m_nextSched <= Clock.currTime())
return;

bool first;
auto next = m_timers.getFirstTimeout();
Duration dur;
if (next == SysTime.max) return;
dur = next - now;
if (dur == Duration.zero || dur.isNegative) {
processTimers();
next = m_timers.getFirstTimeout();
dur = next - now;
}
if (m_nextSched == next) {
logDebug("No upcoming timeouts beyond in: %s ms", (next-now).total!"msecs".to!string);
return;
}
else
m_nextSched = next;

assert(dur.total!"seconds"() <= int.max);
m_nextSched = next;
if (dur.total!"seconds"() >= int.max)
return; // will never trigger, don't bother
if (!m_timerEvent) {
//logTrace("creating new async timer");
m_timerEvent = new AsyncTimer(getEventLoop());
Expand All @@ -428,7 +424,7 @@ final class LibasyncDriver : EventDriver {
bool success = m_timerEvent.rearm(dur);
assert(success, "Failed to rearm timer");
}
logTrace("Rescheduled timer event for %s seconds in thread '%s' :: task '%s'", dur.total!"usecs" * 1e-6, Thread.getThis().name, Task.getThis());
//logTrace("Rescheduled timer event for %s seconds in thread '%s' :: task '%s'", dur.total!"usecs" * 1e-6, Thread.getThis().name, Task.getThis());
}

private void onTimerTimeout()
Expand Down Expand Up @@ -513,6 +509,8 @@ final class LibasyncFileStream : FileStream {
m_impl = null;
}
m_started = false;
if (m_task != Task() && Task.getThis() == Task())
getDriverCore().yieldAndResumeTask(m_task, new Exception("The file was closed during an operation"));
}

@property bool empty() const { assert(this.readable); return m_offset >= m_size; }
Expand All @@ -534,6 +532,7 @@ final class LibasyncFileStream : FileStream {
m_truncated = true;
m_size = 0;
}
m_finished = false;
enforce(dst.length <= leastSize);
enforce(m_impl.read(m_path.toNativeString(), bytes, m_offset, true, truncate_if_exists), "Failed to read data from disk: " ~ m_impl.error);

Expand All @@ -560,7 +559,6 @@ final class LibasyncFileStream : FileStream {
m_size = 0;
}
m_finished = false;

if (m_mode == FileMode.append)
enforce(m_impl.append(m_path.toNativeString(), cast(shared ubyte[]) bytes, true, truncate_if_exists), "Failed to write data to disk: " ~ m_impl.error);
else
Expand All @@ -574,16 +572,14 @@ final class LibasyncFileStream : FileStream {

if (m_mode == FileMode.append) {
m_size += bytes.length;
import std.file : getSize;
}
else {
m_offset += bytes.length;
if (m_offset >= m_size)
m_size += m_offset - m_size;
assert(m_impl.offset == m_offset, "Incoherent offset returned from file writer.");
}
import std.file : getSize;
assert(getSize(m_path.toNativeString()) == m_size, "Incoherency between local size and filesize: " ~ m_size.to!string ~ "B assumed for a file of size " ~ getSize(m_path.toNativeString()).to!string ~ "B");
// too slow: assert(getSize(m_path.toNativeString()) == m_size, "Incoherency between local size and filesize: " ~ m_size.to!string ~ "B assumed for a file of size " ~ getSize(m_path.toNativeString()).to!string ~ "B");
}

void write(InputStream stream, ulong nbytes = 0)
Expand Down Expand Up @@ -1144,18 +1140,22 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
onClose(null, false);
}

bool waitForData(Duration timeout = 0.seconds)
bool waitForData(Duration timeout = Duration.max)
{
// 0 seconds is max. CHanging this would be breaking, might as well use -1 for immediate
if (timeout == 0.seconds)
timeout = Duration.max;
logTrace("WaitForData enter, timeout %s :: Ptr %s", timeout.toString(), (cast(void*)this).to!string);
acquireReader();
auto _driver = getEventDriver();
auto tm = _driver.createTimer(null);
scope(exit) {
scope(exit) {
_driver.stopTimer(tm);
_driver.releaseTimer(tm);
releaseReader();
}
_driver.m_timers.getUserData(tm).owner = Task.getThis();
_driver.rearmTimer(tm, timeout, false);
if (timeout != Duration.max) _driver.rearmTimer(tm, timeout, false);
logTrace("waitForData TCP");
while (m_readBuffer.empty) {
if (!connected) return false;
Expand All @@ -1168,7 +1168,7 @@ final class LibasyncTCPConnection : TCPConnection/*, Buffered*/ {
getDriverCore().yieldForEvent();
m_settings.reader.noExcept = false;
}
if (!_driver.isTimerPending(tm)) {
if (timeout != Duration.max && !_driver.isTimerPending(tm)) {
logTrace("WaitForData TCP: timer signal");
return false;
}
Expand Down

0 comments on commit 43f1b7e

Please sign in to comment.