Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventLoop#run(blocking) and EventLoop#interrupt #14568

2 changes: 1 addition & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class Crystal::Scheduler
resume(runnable) unless runnable == @thread.current_fiber
break
else
@event_loop.run_once
@event_loop.run(blocking: true)
end
end
end
Expand Down
24 changes: 22 additions & 2 deletions src/crystal/system/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,28 @@ abstract class Crystal::EventLoop
Crystal::Scheduler.event_loop
end

# Runs the event loop.
abstract def run_once : Nil
# Runs the loop.
#
# Returns immediately if events are activable. Set `blocking` to false to
# return immediately if there are no activable events; set it to true to wait
# for activable events, which will block the current thread until then.
#
# Returns `true` on normal returns (e.g. has activated events, has pending
# events but blocking was false) and `false` when there are no registered
# events.
abstract def run(blocking : Bool) : Bool

# Tells a blocking run loop to no longer wait for events to activate. It may
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
# activated an event, the loop shall return, allowing the blocked thread to
# continue.
#
# Should be a NOOP when the loop isn't running or is running in a nonblocking
# mode.
#
# NOTE: we assume that multiple threads won't run the event loop at the same
# time in parallel, but this assumption may change in the future!
abstract def interrupt : Nil

# Create a new resume event for a fiber.
abstract def create_resume_event(fiber : Fiber) : Event
Expand Down
17 changes: 11 additions & 6 deletions src/crystal/system/unix/event_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,23 @@ module Crystal::LibEvent
Crystal::LibEvent::Event.new(event)
end

def run_loop : Nil
LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::None)
end

def run_once : Nil
LibEvent2.event_base_loop(@base, LibEvent2::EventLoopFlags::Once)
# NOTE: may return `true` even if no event has been triggered (e.g.
# nonblocking), but `false` means that nothing was processed.
def loop(once : Bool, nonblock : Bool) : Bool
flags = LibEvent2::EventLoopFlags::None
flags |= LibEvent2::EventLoopFlags::Once if once
flags |= LibEvent2::EventLoopFlags::NonBlock if nonblock
LibEvent2.event_base_loop(@base, flags) == 0
end

def loop_break : Nil
LibEvent2.event_base_loopbreak(@base)
end

def loop_exit : Nil
LibEvent2.event_base_loopexit(@base, nil)
end

def new_dns_base(init = true)
DnsBase.new LibEvent2.evdns_base_new(@base, init ? 1 : 0)
end
Expand Down
9 changes: 6 additions & 3 deletions src/crystal/system/unix/event_loop_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop
end
{% end %}

# Runs the event loop.
def run_once : Nil
event_base.run_once
def run(blocking : Bool) : Bool
event_base.loop(once: true, nonblock: !blocking)
end

def interrupt : Nil
event_base.loop_exit
end

# Create a new resume event for a fiber.
Expand Down
1 change: 1 addition & 0 deletions src/crystal/system/unix/lib_event2.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ lib LibEvent2
fun event_base_dispatch(eb : EventBase) : Int
fun event_base_loop(eb : EventBase, flags : EventLoopFlags) : Int
fun event_base_loopbreak(eb : EventBase) : Int
fun event_base_loopexit(EventBase, LibC::Timeval*) : LibC::Int
fun event_set_log_callback(callback : (Int, UInt8*) -> Nil)
fun event_enable_debug_mode
fun event_reinit(eb : EventBase) : Int
Expand Down
8 changes: 6 additions & 2 deletions src/crystal/system/wasi/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ end
# :nodoc:
class Crystal::Wasi::EventLoop < Crystal::EventLoop
# Runs the event loop.
def run_once : Nil
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run_once")
def run(blocking : Bool) : Bool
raise NotImplementedError.new("Crystal::Wasi::EventLoop.run")
end

def interrupt : Nil
raise NotImplementedError.new("Crystal::Wasi::EventLoop.interrupt")
end

# Create a new resume event for a fiber.
Expand Down
62 changes: 49 additions & 13 deletions src/crystal/system/win32/event_loop_iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
# This is a list of resume and timeout events managed outside of IOCP.
@queue = Deque(Crystal::Iocp::Event).new

@lock = Crystal::SpinLock.new
@interrupted = Atomic(Bool).new(false)
@blocked_thread = Atomic(Thread?).new(nil)

# Returns the base IO Completion Port
getter iocp : LibC::HANDLE do
create_completion_port(LibC::INVALID_HANDLE_VALUE, nil)
Expand All @@ -37,33 +41,49 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop

# Runs the event loop and enqueues the fiber for the next upcoming event or
# completion.
def run_once : Nil
def run(blocking : Bool) : Bool
# Pull the next upcoming event from the event queue. This determines the
# timeout for waiting on the completion port.
# OPTIMIZE: Implement @queue as a priority queue in order to avoid this
# explicit search for the lowest value and dequeue more efficient.
next_event = @queue.min_by?(&.wake_at)

unless next_event
Crystal::System.print_error "Warning: No runnables in scheduler. Exiting program.\n"
::exit
end
# no registered events: nothing to wait for
return false unless next_event

now = Time.monotonic

if next_event.wake_at > now
wait_time = next_event.wake_at - now
# There is no event ready to wake. So we wait for completions with a
# timeout for the next event wake time.
# There is no event ready to wake. We wait for completions until the next
# event wake time, unless nonblocking or already interrupted (timeout
# immediately).
if blocking
@lock.sync do
if @interrupted.get(:acquire)
blocking = false
else
# memorize the blocked thread (so we can alert it)
@blocked_thread.set(Thread.current, :release)
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
end
end
end

timed_out = IO::Overlapped.wait_queued_completions(wait_time.total_milliseconds) do |fiber|
wait_time = blocking ? (next_event.wake_at - now).total_milliseconds : 0
timed_out = IO::Overlapped.wait_queued_completions(wait_time, alertable: blocking) do |fiber|
# This block may run multiple times. Every single fiber gets enqueued.
fiber.enqueue
end

# If the wait for completion timed out we've reached the wake time and
# continue with waking `next_event`.
return unless timed_out
@blocked_thread.set(nil, :release)
@interrupted.set(false, :release)

# The wait for completion enqueued events.
return true unless timed_out

# Wait for completion timed out but it may have been interrupted or we ask
# for immediate timeout (nonblocking), so we check for the next event
# readyness again:
return false if next_event.wake_at > Time.monotonic
end

# next_event gets activated because its wake time is passed, either from the
Expand All @@ -81,7 +101,7 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
# This would avoid the scheduler needing to looking at runnable again just
# to notice it's still empty. The lock involved there should typically be
# uncontested though, so it's probably not a big deal.
return if fiber.dead?
return false if fiber.dead?

# A timeout event needs special handling because it does not necessarily
# means to resume the fiber directly, in case a different select branch
Expand All @@ -92,6 +112,22 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop
else
fiber.enqueue
end

# We enqueued a fiber.
true
end

def interrupt : Nil
thread = nil

@lock.sync do
@interrupted.set(true)
thread = @blocked_thread.swap(nil, :acquire)
end
return unless thread

# alert the thread to interrupt GetQueuedCompletionStatusEx
LibC.QueueUserAPC(->(ptr : LibC::ULONG_PTR) {}, thread, LibC::ULONG_PTR.new(0))
end

def enqueue(event : Crystal::Iocp::Event)
Expand Down
6 changes: 4 additions & 2 deletions src/io/overlapped.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ module IO::Overlapped
property fiber : Fiber?
end

def self.wait_queued_completions(timeout, &)
def self.wait_queued_completions(timeout, alertable = false, &)
overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[1]

if timeout > UInt64::MAX
timeout = LibC::INFINITE
else
timeout = timeout.to_u64
end
result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, false)
result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, alertable)
if result == 0
error = WinError.value
if timeout && error.wait_timeout?
return true
elsif alertable && error.value == LibC::WAIT_IO_COMPLETION
return true
else
raise IO::Error.from_os_error("GetQueuedCompletionStatusEx", error)
end
Expand Down
1 change: 1 addition & 0 deletions src/lib_c/x86_64-windows-msvc/c/processthreadsapi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lib LibC
fun GetProcessTimes(hProcess : HANDLE, lpCreationTime : FILETIME*, lpExitTime : FILETIME*,
lpKernelTime : FILETIME*, lpUserTime : FILETIME*) : BOOL
fun SwitchToThread : BOOL
fun QueueUserAPC(pfnAPC : PAPCFUNC, hThread : HANDLE, dwData : ULONG_PTR) : DWORD

PROCESS_QUERY_INFORMATION = 0x0400
end
2 changes: 2 additions & 0 deletions src/lib_c/x86_64-windows-msvc/c/winnt.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ lib LibC
alias HANDLE = Void*
alias HMODULE = Void*

alias PAPCFUNC = ULONG_PTR ->

INVALID_FILE_ATTRIBUTES = DWORD.new!(-1)
FILE_ATTRIBUTE_DIRECTORY = 0x10
FILE_ATTRIBUTE_HIDDEN = 0x2
Expand Down
Loading