From 1b3d71c7cc9d2c68de542bba5dcf9d976039aaa5 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 2 May 2024 19:20:45 +0200 Subject: [PATCH 1/7] Add EventLoop#run(blocking) and EventLoop#interrupt The IOCP implementation hasn't been tested, yet. --- src/crystal/system/event_loop.cr | 23 ++++++- src/crystal/system/unix/event_libevent.cr | 17 +++-- .../system/unix/event_loop_libevent.cr | 10 ++- src/crystal/system/unix/lib_event2.cr | 1 + src/crystal/system/wasi/event_loop.cr | 4 ++ src/crystal/system/win32/event_loop_iocp.cr | 66 +++++++++++++++---- src/io/overlapped.cr | 6 +- .../c/processthreadsapi.cr | 1 + src/lib_c/x86_64-windows-msvc/c/winnt.cr | 2 + 9 files changed, 107 insertions(+), 23 deletions(-) diff --git a/src/crystal/system/event_loop.cr b/src/crystal/system/event_loop.cr index 0f6351fbac24..69937d989b15 100644 --- a/src/crystal/system/event_loop.cr +++ b/src/crystal/system/event_loop.cr @@ -7,9 +7,30 @@ abstract class Crystal::EventLoop Crystal::Scheduler.event_loop end - # Runs the event loop. + # Runs the loop. + # + # Set `blocking` to false to return immediately if there are no activable + # events. Set it to true to wait for activable events, which will block the + # current thread until then. + # + # Returns `true` on normal returns (e.g. has activated events, has pending + # events but blocking was false) and `false` when there are no registered + # events. + abstract def run(blocking : Bool) : Bool + + # Runs the event loop. Blocks until at least one event is activated then + # returns. May return immediately if there are activable events. abstract def run_once : Nil + # Tells a blocking run loop to no longer wait for events to activate. It may + # for example enqueue a NOOP event with an immediate (or past) timeout. Having + # activated an event, the loop shall return, allowing the blocked thread to + # continue. + # + # Should be a NOOP when the loop isn't running or is running in a nonblocking + # mode. + abstract def interrupt : Nil + # Create a new resume event for a fiber. abstract def create_resume_event(fiber : Fiber) : Event diff --git a/src/crystal/system/unix/event_libevent.cr b/src/crystal/system/unix/event_libevent.cr index 852c9483809d..21d6765646d1 100644 --- a/src/crystal/system/unix/event_libevent.cr +++ b/src/crystal/system/unix/event_libevent.cr @@ -59,18 +59,23 @@ module Crystal::LibEvent Crystal::LibEvent::Event.new(event) end - def run_loop : Nil - LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::None) - end - - def run_once : Nil - LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::Once) + # NOTE: may return `true` even if no event has been triggered (e.g. + # nonblocking), but `false` means that nothing was processed. + def loop(once : Bool, nonblock : Bool) : Bool + flags = LibEvent2::EventLoopFlags::None + flags |= LibEvent2::EventLoopFlags::Once if once + flags |= LibEvent2::EventLoopFlags::NonBlock if nonblock + LibEvent2.event_base_loop(@base, flags) == 0 end def loop_break : Nil LibEvent2.event_base_loopbreak(@base) end + def loop_exit : Nil + LibEvent2.event_base_loopexit(@base, nil) + end + def new_dns_base(init = true) DnsBase.new LibEvent2.evdns_base_new(@base, init ? 1 : 0) end diff --git a/src/crystal/system/unix/event_loop_libevent.cr b/src/crystal/system/unix/event_loop_libevent.cr index 06c0ea8b03f0..17228218200d 100644 --- a/src/crystal/system/unix/event_loop_libevent.cr +++ b/src/crystal/system/unix/event_loop_libevent.cr @@ -18,9 +18,17 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop end {% end %} + def run(blocking : Bool) : Bool + event_base.loop(once: true, nonblock: !blocking) + end + # Runs the event loop. def run_once : Nil - event_base.run_once + event_base.loop(once: true) + end + + def interrupt : Nil + event_base.loop_exit end # Create a new resume event for a fiber. diff --git a/src/crystal/system/unix/lib_event2.cr b/src/crystal/system/unix/lib_event2.cr index 5bc8ff514818..2cd3e4635194 100644 --- a/src/crystal/system/unix/lib_event2.cr +++ b/src/crystal/system/unix/lib_event2.cr @@ -47,6 +47,7 @@ lib LibEvent2 fun event_base_dispatch(eb : EventBase) : Int fun event_base_loop(eb : EventBase, flags : EventLoopFlags) : Int fun event_base_loopbreak(eb : EventBase) : Int + fun event_base_loopexit(EventBase, LibC::Timeval*) : LibC::Int fun event_set_log_callback(callback : (Int, UInt8*) -> Nil) fun event_enable_debug_mode fun event_reinit(eb : EventBase) : Int diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/system/wasi/event_loop.cr index 0f411d80709f..f108e2720b28 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/system/wasi/event_loop.cr @@ -7,6 +7,10 @@ end # :nodoc: class Crystal::Wasi::EventLoop < Crystal::EventLoop + def run(blocking : Bool) : Bool + raise NotImplementedError.new("Crystal::Wasi::EventLoop.run") + end + # Runs the event loop. def run_once : Nil raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once") diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index 36ff4e58330b..bfd0a74c172a 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -13,6 +13,10 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop # This is a list of resume and timeout events managed outside of IOCP. @queue = Deque(Crystal::Iocp::Event).new + @lock = Crystal::SpinLock.new + @interrupted = Atomic(Bool).new(false) + @blocked_thread = Atomic(Thread?).new(nil) + # Returns the base IO Completion Port getter iocp : LibC::HANDLE do create_completion_port(LibC::INVALID_HANDLE_VALUE, nil) @@ -35,35 +39,55 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop iocp end + def run_once : Nil + run(blocking: true) + end + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. - def run_once : Nil + def run(blocking : Bool) : Bool # Pull the next upcoming event from the event queue. This determines the # timeout for waiting on the completion port. # OPTIMIZE: Implement @queue as a priority queue in order to avoid this # explicit search for the lowest value and dequeue more efficient. next_event = @queue.min_by?(&.wake_at) - unless next_event - Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n" - ::exit - end + # no registered events: nothing to wait for + return false unless next_event now = Time.monotonic if next_event.wake_at > now - wait_time = next_event.wake_at - now - # There is no event ready to wake. So we wait for completions with a - # timeout for the next event wake time. + # There is no event ready to wake. We wait for completions until the next + # event wake time, unless nonblocking or already interrupted (timeout + # immediately). + if blocking + @lock.sync do + if @interrupted.get(:acquire) + blocking = false + else + # memorize the blocked thread (so we can alert it) + @blocked_thread.set(Thread.current, :release) + end + end + end - timed_out = IO::Overlapped.wait_queued_completions(wait_time.total_milliseconds) do |fiber| + wait_time = blocking ? (next_event.wake_at - now).total_milliseconds : 0 + timed_out = IO::Overlapped.wait_queued_completions(wait_time, alertable: blocking) do |fiber| # This block may run multiple times. Every single fiber gets enqueued. fiber.enqueue end - # If the wait for completion timed out we've reached the wake time and - # continue with waking `next_event`. - return unless timed_out + @blocked_thread.set(nil, :release) + @interrupted.set(false, :release) + + # The wait for completion enqueued events. + return true unless timed_out + + # Wait for completion timed out but it may have been interrupted or we ask + # for immediate timeout (nonblocking), so we check for the next event + # readyness again: + return false if next_event.wake_at > Time.monotonic end # next_event gets activated because its wake time is passed, either from the @@ -81,7 +105,7 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop # This would avoid the scheduler needing to looking at runnable again just # to notice it's still empty. The lock involved there should typically be # uncontested though, so it's probably not a big deal. - return if fiber.dead? + return false if fiber.dead? # A timeout event needs special handling because it does not necessarily # means to resume the fiber directly, in case a different select branch @@ -92,6 +116,22 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop else fiber.enqueue end + + # We enqueued a fiber. + true + end + + def interrupt : Nil + thread = nil + + @lock.sync do + @interrupted.set(true) + thread = @blocked_thread.swap(nil, :acquire) + end + return unless thread + + # alert the thread to interrupt GetQueuedCompletionStatusEx + LibC.QueueUserAPC(->(ptr : LibC::ULONG_PTR) {}, thread, LibC::ULONG_PTR.new(0)) end def enqueue(event : Crystal::Iocp::Event) diff --git a/src/io/overlapped.cr b/src/io/overlapped.cr index e50dd50901f8..20c7a30d3a17 100644 --- a/src/io/overlapped.cr +++ b/src/io/overlapped.cr @@ -8,7 +8,7 @@ module IO::Overlapped property fiber : Fiber? end - def self.wait_queued_completions(timeout, &) + def self.wait_queued_completions(timeout, alertable = false, &) overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[1] if timeout > UInt64::MAX @@ -16,11 +16,13 @@ module IO::Overlapped else timeout = timeout.to_u64 end - result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, false) + result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, alertable) if result == 0 error = WinError.value if timeout && error.wait_timeout? return true + elsif alertable && error.error_exe_marked_invalid? + return true else raise IO::Error.from_os_error("GetQueuedCompletionStatusEx", error) end diff --git a/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr b/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr index efa684075162..d1e13eced324 100644 --- a/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr +++ b/src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr @@ -57,6 +57,7 @@ lib LibC fun GetProcessTimes(hProcess : HANDLE, lpCreationTime : FILETIME*, lpExitTime : FILETIME*, lpKernelTime : FILETIME*, lpUserTime : FILETIME*) : BOOL fun SwitchToThread : BOOL + fun QueueUserAPC(pfnAPC : PAPCFUNC, hThread : HANDLE, dwData : ULONG_PTR) : DWORD PROCESS_QUERY_INFORMATION = 0x0400 end diff --git a/src/lib_c/x86_64-windows-msvc/c/winnt.cr b/src/lib_c/x86_64-windows-msvc/c/winnt.cr index 0c6a0db3c986..e1f133dcae48 100644 --- a/src/lib_c/x86_64-windows-msvc/c/winnt.cr +++ b/src/lib_c/x86_64-windows-msvc/c/winnt.cr @@ -14,6 +14,8 @@ lib LibC alias HANDLE = Void* alias HMODULE = Void* + alias PAPCFUNC = ULONG_PTR -> + INVALID_FILE_ATTRIBUTES = DWORD.new!(-1) FILE_ATTRIBUTE_DIRECTORY = 0x10 FILE_ATTRIBUTE_HIDDEN = 0x2 From 14c1d656f034ff22c66f0fc0d37f74f7eb5787f9 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 6 May 2024 21:32:52 +0200 Subject: [PATCH 2/7] fixup! Add EventLoop#run(blocking) and EventLoop#interrupt --- src/crystal/system/unix/event_loop_libevent.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crystal/system/unix/event_loop_libevent.cr b/src/crystal/system/unix/event_loop_libevent.cr index 17228218200d..3af8aafffcaa 100644 --- a/src/crystal/system/unix/event_loop_libevent.cr +++ b/src/crystal/system/unix/event_loop_libevent.cr @@ -24,7 +24,7 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop # Runs the event loop. def run_once : Nil - event_base.loop(once: true) + event_base.loop(once: true, nonblock: false) end def interrupt : Nil From dd259135e8a7d345601f75388db06677a5e43d14 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 7 May 2024 09:45:33 +0200 Subject: [PATCH 3/7] Fix: WAIT_IO_COMPLETION instead of ERROR_EXE_MARKED_INVALID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Johannes Müller --- src/io/overlapped.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/overlapped.cr b/src/io/overlapped.cr index 20c7a30d3a17..09bd34466321 100644 --- a/src/io/overlapped.cr +++ b/src/io/overlapped.cr @@ -21,7 +21,7 @@ module IO::Overlapped error = WinError.value if timeout && error.wait_timeout? return true - elsif alertable && error.error_exe_marked_invalid? + elsif alertable && error.value == LibC::WAIT_IO_COMPLETION return true else raise IO::Error.from_os_error("GetQueuedCompletionStatusEx", error) From efde2343e61a9d2132e4cb0cf1e0d4fc29fd90a9 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 6 May 2024 21:35:30 +0200 Subject: [PATCH 4/7] Fix: mising EventLoop#interrup for WASI --- src/crystal/system/wasi/event_loop.cr | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/system/wasi/event_loop.cr index f108e2720b28..955ff820aeb4 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/system/wasi/event_loop.cr @@ -16,6 +16,10 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once") end + def interrupt : Bool + raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt") + end + # Create a new resume event for a fiber. def create_resume_event(fiber : Fiber) : Crystal::EventLoop::Event raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_resume_event") From 86153077190e5e8edc777e2eec34aaa8e798a0f4 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 7 May 2024 09:51:32 +0200 Subject: [PATCH 5/7] fixup! Fix: mising EventLoop#interrup for WASI --- src/crystal/system/wasi/event_loop.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/system/wasi/event_loop.cr index 955ff820aeb4..77daad669733 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/system/wasi/event_loop.cr @@ -16,7 +16,7 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once") end - def interrupt : Bool + def interrupt : Nil raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt") end From 66526de33e88e1cdf33794567a705366dbe0c545 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 13 May 2024 18:03:08 +0200 Subject: [PATCH 6/7] Remove #run_once and use #run(blocking: true) instead --- src/crystal/scheduler.cr | 2 +- src/crystal/system/event_loop.cr | 10 +++------- src/crystal/system/unix/event_loop_libevent.cr | 5 ----- src/crystal/system/wasi/event_loop.cr | 6 +----- src/crystal/system/win32/event_loop_iocp.cr | 4 ---- 5 files changed, 5 insertions(+), 22 deletions(-) diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index c86d04309b14..4796226ce8e9 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -149,7 +149,7 @@ class Crystal::Scheduler resume(runnable) unless runnable == @thread.current_fiber break else - @event_loop.run_once + @event_loop.run(blocking: true) end end end diff --git a/src/crystal/system/event_loop.cr b/src/crystal/system/event_loop.cr index 69937d989b15..0c27a18ff147 100644 --- a/src/crystal/system/event_loop.cr +++ b/src/crystal/system/event_loop.cr @@ -9,19 +9,15 @@ abstract class Crystal::EventLoop # Runs the loop. # - # Set `blocking` to false to return immediately if there are no activable - # events. Set it to true to wait for activable events, which will block the - # current thread until then. + # Returns immediately if events are activable. Set `blocking` to false to + # return immediately if there are no activable events. Set it to true to wait + # for activable events, which will block the current thread until then. # # Returns `true` on normal returns (e.g. has activated events, has pending # events but blocking was false) and `false` when there are no registered # events. abstract def run(blocking : Bool) : Bool - # Runs the event loop. Blocks until at least one event is activated then - # returns. May return immediately if there are activable events. - abstract def run_once : Nil - # Tells a blocking run loop to no longer wait for events to activate. It may # for example enqueue a NOOP event with an immediate (or past) timeout. Having # activated an event, the loop shall return, allowing the blocked thread to diff --git a/src/crystal/system/unix/event_loop_libevent.cr b/src/crystal/system/unix/event_loop_libevent.cr index 3af8aafffcaa..fe95ec0c8a3e 100644 --- a/src/crystal/system/unix/event_loop_libevent.cr +++ b/src/crystal/system/unix/event_loop_libevent.cr @@ -22,11 +22,6 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop event_base.loop(once: true, nonblock: !blocking) end - # Runs the event loop. - def run_once : Nil - event_base.loop(once: true, nonblock: false) - end - def interrupt : Nil event_base.loop_exit end diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/system/wasi/event_loop.cr index 77daad669733..e1c2fc2166f1 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/system/wasi/event_loop.cr @@ -7,15 +7,11 @@ end # :nodoc: class Crystal::Wasi::EventLoop < Crystal::EventLoop + # Runs the event loop. def run(blocking : Bool) : Bool raise NotImplementedError.new("Crystal::Wasi::EventLoop.run") end - # Runs the event loop. - def run_once : Nil - raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once") - end - def interrupt : Nil raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt") end diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index bfd0a74c172a..ae89e85bd3ed 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -39,10 +39,6 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop iocp end - def run_once : Nil - run(blocking: true) - end - # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. def run(blocking : Bool) : Bool From 9b1e3aecf3c760ad019fae56f8ef6e609db66e97 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 13 May 2024 18:08:59 +0200 Subject: [PATCH 7/7] Add note about #interrupt assuming only one waiting thread --- src/crystal/system/event_loop.cr | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/crystal/system/event_loop.cr b/src/crystal/system/event_loop.cr index 0c27a18ff147..b8697025d2fb 100644 --- a/src/crystal/system/event_loop.cr +++ b/src/crystal/system/event_loop.cr @@ -10,7 +10,7 @@ abstract class Crystal::EventLoop # Runs the loop. # # Returns immediately if events are activable. Set `blocking` to false to - # return immediately if there are no activable events. Set it to true to wait + # return immediately if there are no activable events; set it to true to wait # for activable events, which will block the current thread until then. # # Returns `true` on normal returns (e.g. has activated events, has pending @@ -25,6 +25,9 @@ abstract class Crystal::EventLoop # # Should be a NOOP when the loop isn't running or is running in a nonblocking # mode. + # + # NOTE: we assume that multiple threads won't run the event loop at the same + # time in parallel, but this assumption may change in the future! abstract def interrupt : Nil # Create a new resume event for a fiber.