Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Windows] Improve timer accuracy. #843

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL Windows)
ShLwApi
WS2_32
WinMM
mincore
synchronization)
endif()

Expand Down
293 changes: 196 additions & 97 deletions src/event/event_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
*/

#include "internal.h"

#if DISPATCH_EVENT_BACKEND_WINDOWS

#define DEBUG_TIMERS 0

static HANDLE hPort = NULL;
enum _dispatch_windows_port {
DISPATCH_PORT_POKE = 0,
DISPATCH_PORT_TIMER_CLOCK_WALL,
DISPATCH_PORT_TIMER_CLOCK_UPTIME,
DISPATCH_PORT_TIMER_CLOCK_MONOTONIC,
DISPATCH_PORT_FILE_HANDLE,
DISPATCH_PORT_PIPE_HANDLE_READ,
DISPATCH_PORT_PIPE_HANDLE_WRITE,
Expand Down Expand Up @@ -740,16 +740,16 @@ _dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
#pragma mark timers

typedef struct _dispatch_windows_timeout_s {
PTP_TIMER pTimer;
enum _dispatch_windows_port ullIdent;
uint64_t fireTime;
uint64_t leeway;
bool bArmed;
} *dispatch_windows_timeout_t;

#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.pTimer = NULL, \
.ullIdent = DISPATCH_PORT_TIMER_CLOCK_##clock, \
.bArmed = FALSE, \
#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.fireTime = 0, \
.leeway = 0, \
.bArmed = FALSE, \
}

static struct _dispatch_windows_timeout_s _dispatch_windows_timeout[] = {
Expand All @@ -770,58 +770,27 @@ _dispatch_event_merge_timer(dispatch_clock_t clock)
_dispatch_timers_heap[tidx].dth_armed = false;
}

static void CALLBACK
_dispatch_timer_callback(PTP_CALLBACK_INSTANCE Instance, PVOID Context,
PTP_TIMER Timer)
{
BOOL bSuccess;

bSuccess = PostQueuedCompletionStatus(hPort, 0, (ULONG_PTR)Context,
NULL);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}

void
_dispatch_event_loop_timer_arm(dispatch_timer_heap_t dth DISPATCH_UNUSED,
uint32_t tidx, dispatch_timer_delay_s range,
dispatch_clock_now_cache_t nows)
{
dispatch_windows_timeout_t timer;
FILETIME ftDueTime;
LARGE_INTEGER liTime;

switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
liTime.QuadPart = range.delay +
_dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
break;

case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
liTime.QuadPart = -((range.delay + 99) / 100);
break;
}

if (timer->pTimer == NULL) {
timer->pTimer = CreateThreadpoolTimer(_dispatch_timer_callback,
(LPVOID)timer->ullIdent, NULL);
if (timer->pTimer == NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"CreateThreadpoolTimer");
}
}

ftDueTime.dwHighDateTime = liTime.HighPart;
ftDueTime.dwLowDateTime = liTime.LowPart;
uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

SetThreadpoolTimer(timer->pTimer, &ftDueTime, /*msPeriod=*/0,
/*msWindowLength=*/0);
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
timer->fireTime = range.delay + now;
timer->leeway = range.leeway;
timer->bArmed = TRUE;

#if DEBUG_TIMERS
printf("[%lx] Arming clock %d: fire time %"PRIu64", leeway %"PRIu64"\n",
GetCurrentThreadId(),
DISPATCH_TIMER_CLOCK(tidx),
timer->fireTime,
timer->leeway);
#endif
}

void
Expand All @@ -830,20 +799,137 @@ _dispatch_event_loop_timer_delete(dispatch_timer_heap_t dth DISPATCH_UNUSED,
{
dispatch_windows_timeout_t timer;

switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
break;
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
#if DEBUG_TIMERS
BOOL wasArmed = timer->bArmed;
#endif
timer->bArmed = FALSE;

case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
break;
#if DEBUG_TIMERS
if (wasArmed) {
printf("[%lx] Disarming clock %d\n",
GetCurrentThreadId(),
DISPATCH_TIMER_CLOCK(tidx));
}
#endif
}

SetThreadpoolTimer(timer->pTimer, NULL, /*msPeriod=*/0,
/*msWindowLength=*/0);
timer->bArmed = FALSE;
static uint32_t
_dispatch_service_event_loop_timers(dispatch_clock_now_cache_t nows,
BOOL shouldWait) {
int nextClock = -1;
uint64_t nextDelay = ~(uint64_t)0;
uint64_t nextLeeway = 0;
uint64_t minDeadline = ~(uint64_t)0;
BOOL didFireTimer = FALSE;

#if DEBUG_TIMERS
printf("[%lx] Runnimg timers\n", GetCurrentThreadId());
#endif

// Fire any timer events that have passed, and work out the
// minimum delay until the next one we need to deal with, taking
// leeway into account (e.g. a timer that needs to fire in 10ms
// with 0ms leeway will take priority over a timer that needs to
// fire in 1ms with 100ms leeway).
for (int clock = 0; clock < DISPATCH_CLOCK_COUNT; ++clock) {
#if DEBUG_TIMERS
printf("Clock %d: ", clock);
#endif

if (!_dispatch_windows_timeout[clock].bArmed) {
#if DEBUG_TIMERS
printf("not armed\n");
#endif
continue;
}

uint64_t now = _dispatch_time_now_cached(clock, nows);
#if DEBUG_TIMERS
printf("current time %"PRIu64", ", now);
#endif
if (_dispatch_windows_timeout[clock].fireTime <= now) {
#if DEBUG_TIMERS
uint64_t lateness = now - _dispatch_windows_timeout[clock].fireTime;
printf("firing timer (late by %"PRIu64")\n", lateness);
#endif
didFireTimer = TRUE;
_dispatch_event_merge_timer(clock);
continue;
}

uint64_t delay = _dispatch_windows_timeout[clock].fireTime - now;
uint64_t leeway = _dispatch_windows_timeout[clock].leeway;
uint64_t deadline;

#if DEBUG_TIMERS
printf("delay %"PRIu64", leeway %"PRIu64"\n", delay, leeway);
#endif

// Use saturating addition here to avoid wrapping
if (~(uint64_t)0 - delay < leeway)
deadline = ~(uint64_t)0;
else
deadline = delay + leeway;

if (deadline < minDeadline) {
nextClock = clock;
nextDelay = delay;
nextLeeway = leeway;
minDeadline = deadline;
}
}

// If we fired a timer, we mustn't wait; the timer code might need to
// run in order to set up another timer.
if (didFireTimer) {
#if DEBUG_TIMERS
printf("Timer fired, so not waiting\n");
#endif
return 0;
}

// If we aren't waiting for a timer, we want to wait forever on the
// completion port.
if (nextClock == -1) {
#if DEBUG_TIMERS
printf("Not waiting\n");
#endif
return INFINITE;
}

#if DEBUG_TIMERS
printf("Waiting for clock %d\n", nextClock);
#endif

// Calculate the number of milliseconds we should wait in an ideal world.
// Windows can only actually wait multiples of its current tick length,
// which defaults to 1/64s, but may vary depending on other programs
// that are executing.
uint32_t msToWait = nextDelay / 1000000;

#if DEBUG_TIMERS
printf("msToWait = %"PRIu32"\n", msToWait);
#endif

// If the deadline is less than 15ms away, or we have less than 15ms
// of leeway, reduce `msToWait` so that we spin up to the fire time.
if (minDeadline < 15000000 || nextLeeway < 15000000) {
if (msToWait < 15)
msToWait = 0;
else
msToWait -= 15;

#if DEBUG_TIMERS
printf("Adjusted msToWait = %"PRIu32"\n", msToWait);
#endif
} else {
#if DEBUG_TIMERS
printf("msToWait = %"PRIu32"\n", msToWait);
#endif
}

return msToWait;
}

#pragma mark dispatch_loop
Expand Down Expand Up @@ -881,30 +967,51 @@ DISPATCH_NOINLINE
void
_dispatch_event_loop_drain(uint32_t flags)
{
DWORD dwNumberOfBytesTransferred;
ULONG_PTR ulCompletionKey;
LPOVERLAPPED pOV;
BOOL bSuccess;
BOOL shouldWait = !(flags & KEVENT_FLAG_IMMEDIATE);
OVERLAPPED_ENTRY entries[64];
ULONG ulEntryCount = 0;

pOV = (LPOVERLAPPED)&pOV;
bSuccess = GetQueuedCompletionStatus(hPort, &dwNumberOfBytesTransferred,
&ulCompletionKey, &pOV,
(flags & KEVENT_FLAG_IMMEDIATE) ? 0 : INFINITE);
while (bSuccess) {
switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
break;
do {
dispatch_clock_now_cache_s nows = { };

// Run the timers first, calculating the number of milliseconds until
// the next time we need to wake up
DWORD dwMsToWait = _dispatch_service_event_loop_timers(&nows,
shouldWait);

// Read entries from the IO completion port
BOOL bSuccess = GetQueuedCompletionStatusEx(
hPort,
entries,
sizeof(entries) / sizeof(entries[0]),
&ulEntryCount,
shouldWait ? dwMsToWait : 0,
TRUE
);

case DISPATCH_PORT_TIMER_CLOCK_WALL:
_dispatch_event_merge_timer(DISPATCH_CLOCK_WALL);
break;
if (!bSuccess) {
DWORD dwErr = GetLastError();

case DISPATCH_PORT_TIMER_CLOCK_UPTIME:
_dispatch_event_merge_timer(DISPATCH_CLOCK_UPTIME);
break;
// If the port has been closed, or we timed-out, we're done.
if (dwErr == ERROR_ABANDONED_WAIT_0 || dwErr == WAIT_TIMEOUT)
break;

// If an APC occurred, go around again (we still want to wait).
if (dwErr == WAIT_IO_COMPLETION)
continue;

DISPATCH_INTERNAL_CRASH(dwErr, "GetQueuedCompletionStatus");
}
} while (false);

case DISPATCH_PORT_TIMER_CLOCK_MONOTONIC:
_dispatch_event_merge_timer(DISPATCH_CLOCK_MONOTONIC);
for (ULONG ulEntry = 0; ulEntry < ulEntryCount; ++ulEntry) {
ULONG_PTR ulCompletionKey = entries[ulEntry].lpCompletionKey;
LPOVERLAPPED pOV = entries[ulEntry].lpOverlapped;
DWORD dwNumberOfBytesTransferred =
entries[ulEntry].dwNumberOfBytesTransferred;

switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
break;

case DISPATCH_PORT_FILE_HANDLE:
Expand All @@ -913,36 +1020,28 @@ _dispatch_event_loop_drain(uint32_t flags)

case DISPATCH_PORT_PIPE_HANDLE_READ:
_dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_PIPE_HANDLE_WRITE:
_dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_READ:
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_WRITE:
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

default:
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
"unsupported completion key");
"unsupported completion key");
}

bSuccess = GetQueuedCompletionStatus(hPort,
&dwNumberOfBytesTransferred, &ulCompletionKey, &pOV, 0);
}

if (bSuccess == FALSE && pOV != NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"GetQueuedCompletionStatus");
}
}

Expand Down
Loading