diff --git a/.github/workflows/mingw-w64.yml b/.github/workflows/mingw-w64.yml index eacf6a34c006..a9bbec81e1ce 100644 --- a/.github/workflows/mingw-w64.yml +++ b/.github/workflows/mingw-w64.yml @@ -80,7 +80,7 @@ jobs: cc crystal.obj -o .build/crystal.exe -municode \ $(pkg-config bdw-gc libpcre2-8 iconv zlib libffi --libs) \ $(llvm-config --libs --system-libs --ldflags) \ - -lole32 -lWS2_32 -Wl,--stack,0x800000 + -lole32 -lWS2_32 -lntdll -Wl,--stack,0x800000 - name: Package Crystal shell: msys2 {0} diff --git a/spec/std/crystal/event_loop/polling/timers_spec.cr b/spec/std/crystal/event_loop/timers_spec.cr similarity index 51% rename from spec/std/crystal/event_loop/polling/timers_spec.cr rename to spec/std/crystal/event_loop/timers_spec.cr index 6f6b8a670b08..a474d0a5167c 100644 --- a/spec/std/crystal/event_loop/polling/timers_spec.cr +++ b/spec/std/crystal/event_loop/timers_spec.cr @@ -1,13 +1,26 @@ -{% skip_file unless Crystal::EventLoop.has_constant?(:Polling) %} - require "spec" +require "crystal/event_loop/timers" + +private struct Timer + include Crystal::PointerPairingHeap::Node + + property! wake_at : Time::Span + + def initialize(timeout : Time::Span? = nil) + @wake_at = Time.monotonic + timeout if timeout + end + + def heap_compare(other : Pointer(self)) : Bool + wake_at < other.value.wake_at + end +end -describe Crystal::EventLoop::Polling::Timers do +describe Crystal::EventLoop::Timers do it "#empty?" do - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new timers.empty?.should be_true - event = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 7.seconds) + event = Timer.new(7.seconds) timers.add(pointerof(event)) timers.empty?.should be_false @@ -17,13 +30,13 @@ describe Crystal::EventLoop::Polling::Timers do it "#next_ready?" do # empty - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new timers.next_ready?.should be_nil # with events - event1s = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.second) - event3m = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 3.minutes) - event5m = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 5.minutes) + event1s = Timer.new(1.second) + event3m = Timer.new(3.minutes) + event5m = Timer.new(5.minutes) timers.add(pointerof(event5m)) timers.next_ready?.should eq(event5m.wake_at?) @@ -36,11 +49,11 @@ describe Crystal::EventLoop::Polling::Timers do end it "#dequeue_ready" do - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new - event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) + event1 = Timer.new(0.seconds) + event2 = Timer.new(0.seconds) + event3 = Timer.new(1.minute) # empty called = 0 @@ -48,12 +61,12 @@ describe Crystal::EventLoop::Polling::Timers do called.should eq(0) # add events in non chronological order - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new timers.add(pointerof(event1)) timers.add(pointerof(event3)) timers.add(pointerof(event2)) - events = [] of Crystal::EventLoop::Polling::Event* + events = [] of Timer* timers.dequeue_ready { |event| events << event } events.should eq([ @@ -64,12 +77,12 @@ describe Crystal::EventLoop::Polling::Timers do end it "#add" do - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new - event0 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current) - event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 2.minutes) - event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) + event0 = Timer.new + event1 = Timer.new(0.seconds) + event2 = Timer.new(2.minutes) + event3 = Timer.new(1.minute) # add events in non chronological order timers.add(pointerof(event1)).should be_true # added to the head (next ready) @@ -81,13 +94,13 @@ describe Crystal::EventLoop::Polling::Timers do end it "#delete" do - event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) - event4 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 4.minutes) + event1 = Timer.new(0.seconds) + event2 = Timer.new(0.seconds) + event3 = Timer.new(1.minute) + event4 = Timer.new(4.minutes) # add events in non chronological order - timers = Crystal::EventLoop::Polling::Timers.new + timers = Crystal::EventLoop::Timers(Timer).new timers.add(pointerof(event1)) timers.add(pointerof(event3)) timers.add(pointerof(event2)) diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index ce3112fa9d1d..5628e99121b1 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -1,146 +1,186 @@ -require "c/ioapiset" -require "crystal/system/print_error" +# forward declaration for the require below to not create a module +class Crystal::EventLoop::IOCP < Crystal::EventLoop +end + +require "c/ntdll" require "../system/win32/iocp" +require "../system/win32/waitable_timer" +require "./timers" +require "./iocp/*" # :nodoc: class Crystal::EventLoop::IOCP < Crystal::EventLoop - # This is a list of resume and timeout events managed outside of IOCP. - @queue = Deque(Event).new - - @lock = Crystal::SpinLock.new - @interrupted = Atomic(Bool).new(false) - @blocked_thread = Atomic(Thread?).new(nil) + @waitable_timer : System::WaitableTimer? + @timer_packet : LibC::HANDLE? + @timer_key : System::IOCP::CompletionKey? + + def initialize + @mutex = Thread::Mutex.new + @timers = Timers(Timer).new + + # the completion port + @iocp = System::IOCP.new + + # custom completion to interrupt a blocking run + @interrupted = Atomic(Bool).new(false) + @interrupt_key = System::IOCP::CompletionKey.new(:interrupt) + + # On Windows 10+ we leverage a high resolution timer with completion packet + # to notify a completion port; on legacy Windows we fallback to the low + # resolution timeout (~15.6ms) + if System::IOCP.wait_completion_packet_methods? + @waitable_timer = System::WaitableTimer.new + @timer_packet = @iocp.create_wait_completion_packet + @timer_key = System::IOCP::CompletionKey.new(:timer) + end + end - # Returns the base IO Completion Port - getter iocp : LibC::HANDLE do - create_completion_port(LibC::INVALID_HANDLE_VALUE, nil) + # Returns the base IO Completion Port. + def iocp_handle : LibC::HANDLE + @iocp.handle end - def create_completion_port(handle : LibC::HANDLE, parent : LibC::HANDLE? = iocp) - iocp = LibC.CreateIoCompletionPort(handle, parent, nil, 0) - if iocp.null? - raise IO::Error.from_winerror("CreateIoCompletionPort") - end - if parent - # all overlapped operations may finish synchronously, in which case we do - # not reschedule the running fiber; the following call tells Win32 not to - # queue an I/O completion packet to the associated IOCP as well, as this - # would be done by default - if LibC.SetFileCompletionNotificationModes(handle, LibC::FILE_SKIP_COMPLETION_PORT_ON_SUCCESS) == 0 - raise IO::Error.from_winerror("SetFileCompletionNotificationModes") - end + def create_completion_port(handle : LibC::HANDLE) : LibC::HANDLE + iocp = LibC.CreateIoCompletionPort(handle, @iocp.handle, nil, 0) + raise IO::Error.from_winerror("CreateIoCompletionPort") if iocp.null? + + # all overlapped operations may finish synchronously, in which case we do + # not reschedule the running fiber; the following call tells Win32 not to + # queue an I/O completion packet to the associated IOCP as well, as this + # would be done by default + if LibC.SetFileCompletionNotificationModes(handle, LibC::FILE_SKIP_COMPLETION_PORT_ON_SUCCESS) == 0 + raise IO::Error.from_winerror("SetFileCompletionNotificationModes") end + iocp end + def run(blocking : Bool) : Bool + enqueued = false + + run_impl(blocking) do |fiber| + fiber.enqueue + enqueued = true + end + + enqueued + end + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. - def run(blocking : Bool) : Bool - # Pull the next upcoming event from the event queue. This determines the - # timeout for waiting on the completion port. - # OPTIMIZE: Implement @queue as a priority queue in order to avoid this - # explicit search for the lowest value and dequeue more efficient. - next_event = @queue.min_by?(&.wake_at) - - # no registered events: nothing to wait for - return false unless next_event - - now = Time.monotonic - - if next_event.wake_at > now - # There is no event ready to wake. We wait for completions until the next - # event wake time, unless nonblocking or already interrupted (timeout - # immediately). - if blocking - @lock.sync do - if @interrupted.get(:acquire) - blocking = false - else - # memorize the blocked thread (so we can alert it) - @blocked_thread.set(Thread.current, :release) - end - end + private def run_impl(blocking : Bool, &) : Nil + Crystal.trace :evloop, "run", blocking: blocking ? 1 : 0 + + if @waitable_timer + timeout = blocking ? LibC::INFINITE : 0_i64 + elsif blocking + if time = @mutex.synchronize { @timers.next_ready? } + # convert absolute time of next timer to relative time, expressed in + # milliseconds, rounded up + seconds, nanoseconds = System::Time.monotonic + relative = time - Time::Span.new(seconds: seconds, nanoseconds: nanoseconds) + timeout = (relative.to_i * 1000 + (relative.nanoseconds + 999_999) // 1_000_000).clamp(0_i64..) + else + timeout = LibC::INFINITE end + else + timeout = 0_i64 + end - wait_time = blocking ? (next_event.wake_at - now).total_milliseconds : 0 - timed_out = System::IOCP.wait_queued_completions(wait_time, alertable: blocking) do |fiber| - # This block may run multiple times. Every single fiber gets enqueued. - fiber.enqueue + # the array must be at least as large as `overlapped_entries` in + # `System::IOCP#wait_queued_completions` + events = uninitialized FiberEvent[64] + size = 0 + + @iocp.wait_queued_completions(timeout) do |fiber| + if (event = fiber.@resume_event) && event.wake_at? + events[size] = event + size += 1 end + yield fiber + end - @blocked_thread.set(nil, :release) - @interrupted.set(false, :release) + @mutex.synchronize do + # cancel the timeout of completed operations + events.to_slice[0...size].each do |event| + @timers.delete(pointerof(event.@timer)) + event.clear + end - # The wait for completion enqueued events. - return true unless timed_out + # run expired timers + @timers.dequeue_ready do |timer| + process_timer(timer) { |fiber| yield fiber } + end - # Wait for completion timed out but it may have been interrupted or we ask - # for immediate timeout (nonblocking), so we check for the next event - # readiness again: - return false if next_event.wake_at > Time.monotonic + # update timer + rearm_waitable_timer(@timers.next_ready?, interruptible: false) end - # next_event gets activated because its wake time is passed, either from the - # start or because completion wait has timed out. - - dequeue next_event - - fiber = next_event.fiber + @interrupted.set(false, :release) + end - # If the waiting fiber was already shut down in the mean time, we can just - # abandon here. There's no need to go for the next event because the scheduler - # will just try again. - # OPTIMIZE: It might still be worth considering to start over from the top - # or call recursively, in order to ensure at least one fiber get enqueued. - # This would avoid the scheduler needing to looking at runnable again just - # to notice it's still empty. The lock involved there should typically be - # uncontested though, so it's probably not a big deal. - return false if fiber.dead? + private def process_timer(timer : Pointer(Timer), &) + fiber = timer.value.fiber - # A timeout event needs special handling because it does not necessarily - # means to resume the fiber directly, in case a different select branch - # was already activated. - if next_event.timeout? && (select_action = fiber.timeout_select_action) + case timer.value.type + in .sleep? + timer.value.timed_out! + fiber.@resume_event.as(FiberEvent).clear + in .select_timeout? + return unless select_action = fiber.timeout_select_action fiber.timeout_select_action = nil - select_action.time_expired(fiber) - else - fiber.enqueue + return unless select_action.time_expired? + fiber.@timeout_event.as(FiberEvent).clear end - # We enqueued a fiber. - true + yield fiber end def interrupt : Nil - thread = nil - - @lock.sync do - @interrupted.set(true) - thread = @blocked_thread.swap(nil, :acquire) + unless @interrupted.get(:acquire) + @iocp.post_queued_completion_status(@interrupt_key) end - return unless thread + end - # alert the thread to interrupt GetQueuedCompletionStatusEx - LibC.QueueUserAPC(->(ptr : LibC::ULONG_PTR) { }, thread, LibC::ULONG_PTR.new(0)) + protected def add_timer(timer : Pointer(Timer)) : Nil + @mutex.synchronize do + is_next_ready = @timers.add(timer) + rearm_waitable_timer(timer.value.wake_at, interruptible: true) if is_next_ready + end end - def enqueue(event : Event) - unless @queue.includes?(event) - @queue << event + protected def delete_timer(timer : Pointer(Timer)) : Nil + @mutex.synchronize do + _, was_next_ready = @timers.delete(timer) + rearm_waitable_timer(@timers.next_ready?, interruptible: false) if was_next_ready end end - def dequeue(event : Event) - @queue.delete(event) + protected def rearm_waitable_timer(time : Time::Span?, interruptible : Bool) : Nil + if waitable_timer = @waitable_timer + status = @iocp.cancel_wait_completion_packet(@timer_packet.not_nil!, true) + if time + waitable_timer.set(time) + if status == LibC::STATUS_PENDING + interrupt + else + # STATUS_CANCELLED, STATUS_SUCCESS + @iocp.associate_wait_completion_packet(@timer_packet.not_nil!, waitable_timer.handle, @timer_key.not_nil!) + end + else + waitable_timer.cancel + end + elsif interruptible + interrupt + end end - # Create a new resume event for a fiber. - def create_resume_event(fiber : Fiber) : Crystal::EventLoop::Event - Event.new(fiber) + def create_resume_event(fiber : Fiber) : EventLoop::Event + FiberEvent.new(:sleep, fiber) end - def create_timeout_event(fiber) : Crystal::EventLoop::Event - Event.new(fiber, timeout: true) + def create_timeout_event(fiber : Fiber) : EventLoop::Event + FiberEvent.new(:select_timeout, fiber) end def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 @@ -278,28 +318,3 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop def remove(socket : ::Socket) : Nil end end - -class Crystal::EventLoop::IOCP::Event - include Crystal::EventLoop::Event - - getter fiber - getter wake_at - getter? timeout - - def initialize(@fiber : Fiber, @wake_at = Time.monotonic, *, @timeout = false) - end - - # Frees the event - def free : Nil - Crystal::EventLoop.current.dequeue(self) - end - - def delete - free - end - - def add(timeout : Time::Span) : Nil - @wake_at = Time.monotonic + timeout - Crystal::EventLoop.current.enqueue(self) - end -end diff --git a/src/crystal/event_loop/iocp/fiber_event.cr b/src/crystal/event_loop/iocp/fiber_event.cr new file mode 100644 index 000000000000..481648016210 --- /dev/null +++ b/src/crystal/event_loop/iocp/fiber_event.cr @@ -0,0 +1,34 @@ +class Crystal::EventLoop::IOCP::FiberEvent + include Crystal::EventLoop::Event + + delegate type, wake_at, wake_at?, fiber, timed_out?, to: @timer + + def initialize(type : Timer::Type, fiber : Fiber) + @timer = Timer.new(type, fiber) + end + + # io timeout, sleep, or select timeout + def add(timeout : Time::Span) : Nil + seconds, nanoseconds = System::Time.monotonic + now = Time::Span.new(seconds: seconds, nanoseconds: nanoseconds) + @timer.wake_at = now + timeout + EventLoop.current.add_timer(pointerof(@timer)) + end + + # select timeout has been cancelled + def delete : Nil + return unless @timer.wake_at? + EventLoop.current.delete_timer(pointerof(@timer)) + clear + end + + # fiber died + def free : Nil + delete + end + + # the timer triggered (already dequeued from eventloop) + def clear : Nil + @timer.wake_at = nil + end +end diff --git a/src/crystal/event_loop/iocp/timer.cr b/src/crystal/event_loop/iocp/timer.cr new file mode 100644 index 000000000000..b7284d53e130 --- /dev/null +++ b/src/crystal/event_loop/iocp/timer.cr @@ -0,0 +1,40 @@ +# NOTE: this struct is only needed to be able to re-use `PointerPairingHeap` +# because EventLoop::Polling uses pointers. If `EventLoop::Polling::Event` was a +# reference, then `PairingHeap` wouldn't need pointers, and this struct could be +# merged into `Event`. +struct Crystal::EventLoop::IOCP::Timer + enum Type + Sleep + SelectTimeout + end + + getter type : Type + + # The `Fiber` that is waiting on the event and that the `EventLoop` shall + # resume. + getter fiber : Fiber + + # The absolute time, against the monotonic clock, at which a timed event shall + # trigger. Nil for IO events without a timeout. + getter! wake_at : Time::Span + + # True if an IO event has timed out (i.e. we're past `#wake_at`). + getter? timed_out : Bool = false + + # The event can be added to the `Timers` list. + include PointerPairingHeap::Node + + def initialize(@type : Type, @fiber) + end + + def wake_at=(@wake_at) + end + + def timed_out! : Bool + @timed_out = true + end + + def heap_compare(other : Pointer(self)) : Bool + wake_at < other.value.wake_at + end +end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 0df0b134c7f4..774cc7060715 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -2,6 +2,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop; end require "./polling/*" +require "./timers" module Crystal::System::FileDescriptor # user data (generation index for the arena) @@ -96,7 +97,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end @lock = SpinLock.new # protects parallel accesses to @timers - @timers = Timers.new + @timers = Timers(Event).new # reset the mutexes since another thread may have acquired the lock of one # event loop, which would prevent closing file descriptors for example. diff --git a/src/crystal/event_loop/polling/timers.cr b/src/crystal/event_loop/timers.cr similarity index 75% rename from src/crystal/event_loop/polling/timers.cr rename to src/crystal/event_loop/timers.cr index b9191f008f46..0ea686efad82 100644 --- a/src/crystal/event_loop/polling/timers.cr +++ b/src/crystal/event_loop/timers.cr @@ -1,15 +1,17 @@ require "crystal/pointer_pairing_heap" -# List of `Event` ordered by `Event#wake_at` ascending. Optimized for fast -# dequeue and determining when is the next timer event. +# List of `Pointer(T)` to `T` structs. # -# Thread unsafe: parallel accesses much be protected! +# Internally wraps a `PointerPairingHeap(T)` and thus requires that `T` +# implements `PointerPairingHeap::Node`. +# +# Thread unsafe: parallel accesses must be protected! # # NOTE: this is a struct because it only wraps a const pointer to an object # allocated in the heap. -struct Crystal::EventLoop::Polling::Timers +struct Crystal::EventLoop::Timers(T) def initialize - @heap = PointerPairingHeap(Event).new + @heap = PointerPairingHeap(T).new end def empty? : Bool @@ -24,7 +26,7 @@ struct Crystal::EventLoop::Polling::Timers # Dequeues and yields each ready timer (their `#wake_at` is lower than # `System::Time.monotonic`) from the oldest to the most recent (i.e. time # ascending). - def dequeue_ready(& : Event* -> Nil) : Nil + def dequeue_ready(& : Pointer(T) -> Nil) : Nil seconds, nanoseconds = System::Time.monotonic now = Time::Span.new(seconds: seconds, nanoseconds: nanoseconds) @@ -36,7 +38,7 @@ struct Crystal::EventLoop::Polling::Timers end # Add a new timer into the list. Returns true if it is the next ready timer. - def add(event : Event*) : Bool + def add(event : Pointer(T)) : Bool @heap.add(event) @heap.first? == event end @@ -44,7 +46,7 @@ struct Crystal::EventLoop::Polling::Timers # Remove a timer from the list. Returns a tuple(dequeued, was_next_ready) of # booleans. The first bool tells whether the event was dequeued, in which case # the second one tells if it was the next ready event. - def delete(event : Event*) : {Bool, Bool} + def delete(event : Pointer(T)) : {Bool, Bool} if @heap.first? == event @heap.shift? {true, true} diff --git a/src/crystal/system/win32/addrinfo.cr b/src/crystal/system/win32/addrinfo.cr index da5cb6ce20c3..24cff9c9aec3 100644 --- a/src/crystal/system/win32/addrinfo.cr +++ b/src/crystal/system/win32/addrinfo.cr @@ -43,7 +43,7 @@ module Crystal::System::Addrinfo end end - IOCP::GetAddrInfoOverlappedOperation.run(Crystal::EventLoop.current.iocp) do |operation| + IOCP::GetAddrInfoOverlappedOperation.run(Crystal::EventLoop.current.iocp_handle) do |operation| completion_routine = LibC::LPLOOKUPSERVICE_COMPLETION_ROUTINE.new do |dwError, dwBytes, lpOverlapped| orig_operation = IOCP::GetAddrInfoOverlappedOperation.unbox(lpOverlapped) LibC.PostQueuedCompletionStatus(orig_operation.iocp, 0, 0, lpOverlapped) diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr index 894fcfaf5cb1..4a99d82e9134 100644 --- a/src/crystal/system/win32/file_descriptor.cr +++ b/src/crystal/system/win32/file_descriptor.cr @@ -489,7 +489,7 @@ private module ConsoleUtils @@read_requests << ReadRequest.new( handle: handle, slice: slice, - iocp: Crystal::EventLoop.current.iocp, + iocp: Crystal::EventLoop.current.iocp_handle, completion_key: Crystal::System::IOCP::CompletionKey.new(:stdin_read, ::Fiber.current), ) @@read_cv.signal diff --git a/src/crystal/system/win32/iocp.cr b/src/crystal/system/win32/iocp.cr index fece9ada3a83..70048d24cf8c 100644 --- a/src/crystal/system/win32/iocp.cr +++ b/src/crystal/system/win32/iocp.cr @@ -1,14 +1,63 @@ {% skip_file unless flag?(:win32) %} require "c/handleapi" +require "c/ioapiset" +require "c/ntdll" require "crystal/system/thread_linked_list" # :nodoc: -module Crystal::System::IOCP +struct Crystal::System::IOCP + @@wait_completion_packet_methods : Bool? = nil + + {% if flag?(:interpreted) %} + # We can't load the symbols from interpreted code since it would create + # interpreted Proc. We thus merely check for the existence of the symbols, + # then let the interpreter load the symbols, which will create interpreter + # Proc (not interpreted) that can be called. + class_getter?(wait_completion_packet_methods : Bool) do + detect_wait_completion_packet_methods + end + + private def self.detect_wait_completion_packet_methods : Bool + if handle = LibC.LoadLibraryExW(Crystal::System.to_wstr("ntdll.dll"), nil, 0) + !LibC.GetProcAddress(handle, "NtCreateWaitCompletionPacket").null? + else + false + end + end + {% else %} + @@_NtCreateWaitCompletionPacket = uninitialized LibNTDLL::NtCreateWaitCompletionPacketProc + @@_NtAssociateWaitCompletionPacket = uninitialized LibNTDLL::NtAssociateWaitCompletionPacketProc + @@_NtCancelWaitCompletionPacket = uninitialized LibNTDLL::NtCancelWaitCompletionPacketProc + + class_getter?(wait_completion_packet_methods : Bool) do + load_wait_completion_packet_methods + end + + private def self.load_wait_completion_packet_methods : Bool + handle = LibC.LoadLibraryExW(Crystal::System.to_wstr("ntdll.dll"), nil, 0) + return false if handle.null? + + pointer = LibC.GetProcAddress(handle, "NtCreateWaitCompletionPacket") + return false if pointer.null? + @@_NtCreateWaitCompletionPacket = LibNTDLL::NtCreateWaitCompletionPacketProc.new(pointer, Pointer(Void).null) + + pointer = LibC.GetProcAddress(handle, "NtAssociateWaitCompletionPacket") + @@_NtAssociateWaitCompletionPacket = LibNTDLL::NtAssociateWaitCompletionPacketProc.new(pointer, Pointer(Void).null) + + pointer = LibC.GetProcAddress(handle, "NtCancelWaitCompletionPacket") + @@_NtCancelWaitCompletionPacket = LibNTDLL::NtCancelWaitCompletionPacketProc.new(pointer, Pointer(Void).null) + + true + end + {% end %} + # :nodoc: class CompletionKey enum Tag ProcessRun StdinRead + Interrupt + Timer end property fiber : ::Fiber? @@ -16,17 +65,35 @@ module Crystal::System::IOCP def initialize(@tag : Tag, @fiber : ::Fiber? = nil) end + + def valid?(number_of_bytes_transferred) + case tag + in .process_run? + number_of_bytes_transferred.in?(LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS) + in .stdin_read?, .interrupt?, .timer? + true + end + end + end + + getter handle : LibC::HANDLE + + def initialize + @handle = LibC.CreateIoCompletionPort(LibC::INVALID_HANDLE_VALUE, nil, nil, 0) + raise IO::Error.from_winerror("CreateIoCompletionPort") if @handle.null? end - def self.wait_queued_completions(timeout, alertable = false, &) - overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[1] + def wait_queued_completions(timeout, alertable = false, &) + overlapped_entries = uninitialized LibC::OVERLAPPED_ENTRY[64] if timeout > UInt64::MAX timeout = LibC::INFINITE else timeout = timeout.to_u64 end - result = LibC.GetQueuedCompletionStatusEx(Crystal::EventLoop.current.iocp, overlapped_entries, overlapped_entries.size, out removed, timeout, alertable) + + result = LibC.GetQueuedCompletionStatusEx(@handle, overlapped_entries, overlapped_entries.size, out removed, timeout, alertable) + if result == 0 error = WinError.value if timeout && error.wait_timeout? @@ -42,17 +109,21 @@ module Crystal::System::IOCP raise IO::Error.new("GetQueuedCompletionStatusEx returned 0") end + # TODO: wouldn't the processing fit better in `EventLoop::IOCP#run`? removed.times do |i| entry = overlapped_entries[i] - # at the moment only `::Process#wait` uses a non-nil completion key; all - # I/O operations, including socket ones, do not set this field + # See `CompletionKey` for the operations that use a non-nil completion + # key. All IO operations (include File, Socket) do not set this field. case completion_key = Pointer(Void).new(entry.lpCompletionKey).as(CompletionKey?) in Nil operation = OverlappedOperation.unbox(entry.lpOverlapped) + Crystal.trace :evloop, "operation", op: operation.class.name, fiber: operation.@fiber operation.schedule { |fiber| yield fiber } in CompletionKey - if completion_key_valid?(completion_key, entry.dwNumberOfBytesTransferred) + Crystal.trace :evloop, "completion", tag: completion_key.tag.to_s, bytes: entry.dwNumberOfBytesTransferred, fiber: completion_key.fiber + + if completion_key.valid?(entry.dwNumberOfBytesTransferred) # if `Process` exits before a call to `#wait`, this fiber will be # reset already if fiber = completion_key.fiber @@ -69,12 +140,52 @@ module Crystal::System::IOCP false end - private def self.completion_key_valid?(completion_key, number_of_bytes_transferred) - case completion_key.tag - in .process_run? - number_of_bytes_transferred.in?(LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS) - in .stdin_read? - true + def post_queued_completion_status(completion_key : CompletionKey, number_of_bytes_transferred = 0) + result = LibC.PostQueuedCompletionStatus(@handle, number_of_bytes_transferred, completion_key.as(Void*).address, nil) + raise RuntimeError.from_winerror("PostQueuedCompletionStatus") if result == 0 + end + + def create_wait_completion_packet : LibC::HANDLE + packet_handle = LibC::HANDLE.null + object_attributes = Pointer(LibC::OBJECT_ATTRIBUTES).null + status = + {% if flag?(:interpreted) %} + LibNTDLL.NtCreateWaitCompletionPacket(pointerof(packet_handle), LibNTDLL::GENERIC_ALL, object_attributes) + {% else %} + @@_NtCreateWaitCompletionPacket.call(pointerof(packet_handle), LibNTDLL::GENERIC_ALL, object_attributes) + {% end %} + raise RuntimeError.from_os_error("NtCreateWaitCompletionPacket", WinError.from_ntstatus(status)) unless status == 0 + packet_handle + end + + def associate_wait_completion_packet(wait_handle : LibC::HANDLE, target_handle : LibC::HANDLE, completion_key : CompletionKey) : Bool + signaled = 0_u8 + status = + {% if flag?(:interpreted) %} + LibNTDLL.NtAssociateWaitCompletionPacket(wait_handle, @handle, + target_handle, completion_key.as(Void*), nil, 0, nil, pointerof(signaled)) + {% else %} + @@_NtAssociateWaitCompletionPacket.call(wait_handle, @handle, + target_handle, completion_key.as(Void*), Pointer(Void).null, + LibNTDLL::NTSTATUS.new!(0), Pointer(LibC::ULONG).null, + pointerof(signaled)) + {% end %} + raise RuntimeError.from_os_error("NtAssociateWaitCompletionPacket", WinError.from_ntstatus(status)) unless status == 0 + signaled == 1 + end + + def cancel_wait_completion_packet(wait_handle : LibC::HANDLE, remove_signaled : Bool) : LibNTDLL::NTSTATUS + status = + {% if flag?(:interpreted) %} + LibNTDLL.NtCancelWaitCompletionPacket(wait_handle, remove_signaled ? 1 : 0) + {% else %} + @@_NtCancelWaitCompletionPacket.call(wait_handle, remove_signaled ? 1_u8 : 0_u8) + {% end %} + case status + when LibC::STATUS_CANCELLED, LibC::STATUS_SUCCESS, LibC::STATUS_PENDING + status + else + raise RuntimeError.from_os_error("NtCancelWaitCompletionPacket", WinError.from_ntstatus(status)) end end @@ -112,23 +223,29 @@ module Crystal::System::IOCP end private def done! - @fiber.cancel_timeout @state = :done end private def wait_for_completion(timeout) if timeout - sleep timeout - else + event = ::Fiber.current.resume_event + event.add(timeout) + ::Fiber.suspend - end - unless @state.done? - if try_cancel - # Wait for cancellation to complete. We must not free the operation - # until it's completed. + if event.timed_out? + # By the time the fiber was resumed, the operation may have completed + # concurrently. + return if @state.done? + return unless try_cancel + + # We cancelled the operation or failed to cancel it (e.g. race + # condition), we must suspend the fiber again until the completion + # port is notified of the actual result. ::Fiber.suspend end + else + ::Fiber.suspend end end end diff --git a/src/crystal/system/win32/process.cr b/src/crystal/system/win32/process.cr index 7031654d2299..5eb02d826c3b 100644 --- a/src/crystal/system/win32/process.cr +++ b/src/crystal/system/win32/process.cr @@ -37,7 +37,7 @@ struct Crystal::System::Process LibC::JOBOBJECTINFOCLASS::AssociateCompletionPortInformation, LibC::JOBOBJECT_ASSOCIATE_COMPLETION_PORT.new( completionKey: @completion_key.as(Void*), - completionPort: Crystal::EventLoop.current.iocp, + completionPort: Crystal::EventLoop.current.iocp_handle, ), ) diff --git a/src/crystal/system/win32/waitable_timer.cr b/src/crystal/system/win32/waitable_timer.cr new file mode 100644 index 000000000000..68ec821d6922 --- /dev/null +++ b/src/crystal/system/win32/waitable_timer.cr @@ -0,0 +1,38 @@ +require "c/ntdll" +require "c/synchapi" +require "c/winternl" + +class Crystal::System::WaitableTimer + getter handle : LibC::HANDLE + + def initialize + flags = LibC::CREATE_WAITABLE_TIMER_HIGH_RESOLUTION + desired_access = LibC::SYNCHRONIZE | LibC::TIMER_QUERY_STATE | LibC::TIMER_MODIFY_STATE + @handle = LibC.CreateWaitableTimerExW(nil, nil, flags, desired_access) + raise RuntimeError.from_winerror("CreateWaitableTimerExW") if @handle.null? + end + + def set(time : ::Time::Span) : Nil + # convert absolute time to relative time, expressed in 100ns interval, + # rounded up + seconds, nanoseconds = System::Time.monotonic + relative = time - ::Time::Span.new(seconds: seconds, nanoseconds: nanoseconds) + ticks = (relative.to_i * 10_000_000 + (relative.nanoseconds + 99) // 100).clamp(0_i64..) + + # negative duration means relative time (positive would mean absolute + # realtime clock) + duration = -ticks + + ret = LibC.SetWaitableTimer(@handle, pointerof(duration), 0, nil, nil, 0) + raise RuntimeError.from_winerror("SetWaitableTimer") if ret == 0 + end + + def cancel : Nil + ret = LibC.CancelWaitableTimer(@handle) + raise RuntimeError.from_winerror("CancelWaitableTimer") if ret == 0 + end + + def close : Nil + LibC.CloseHandle(@handle) + end +end diff --git a/src/kernel.cr b/src/kernel.cr index 2063acce95ae..34763b994839 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -617,19 +617,6 @@ end {% end %} {% end %} -# This is a temporary workaround to ensure there is always something in the IOCP -# event loop being awaited, since both the interrupt loop and the fiber stack -# pool collector are disabled in interpreted code. Without this, asynchronous -# code that bypasses `Crystal::System::IOCP::OverlappedOperation` does not currently -# work, see https://github.com/crystal-lang/crystal/pull/14949#issuecomment-2328314463 -{% if flag?(:interpreted) && flag?(:win32) %} - spawn(name: "Interpreter idle loop") do - while true - sleep 1.day - end - end -{% end %} - {% if flag?(:interpreted) && flag?(:unix) && Crystal::Interpreter.has_method?(:signal_descriptor) %} Crystal::System::Signal.setup_default_handlers {% end %} diff --git a/src/lib_c/x86_64-windows-msvc/c/ntdef.cr b/src/lib_c/x86_64-windows-msvc/c/ntdef.cr new file mode 100644 index 000000000000..a9a07a07b27e --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/ntdef.cr @@ -0,0 +1,16 @@ +lib LibC + struct UNICODE_STRING + length : USHORT + maximumLength : USHORT + buffer : LPWSTR + end + + struct OBJECT_ATTRIBUTES + length : ULONG + rootDirectory : HANDLE + objectName : UNICODE_STRING* + attributes : ULONG + securityDescriptor : Void* + securityQualityOfService : Void* + end +end diff --git a/src/lib_c/x86_64-windows-msvc/c/ntdll.cr b/src/lib_c/x86_64-windows-msvc/c/ntdll.cr new file mode 100644 index 000000000000..8d2653b8bb31 --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/ntdll.cr @@ -0,0 +1,36 @@ +require "c/ntdef" +require "c/winnt" + +@[Link("ntdll")] +lib LibNTDLL + alias NTSTATUS = LibC::ULONG + alias ACCESS_MASK = LibC::DWORD + + GENERIC_ALL = 0x10000000_u32 + + alias NtCreateWaitCompletionPacketProc = Proc(LibC::HANDLE*, ACCESS_MASK, LibC::OBJECT_ATTRIBUTES*, NTSTATUS) + alias NtAssociateWaitCompletionPacketProc = Proc(LibC::HANDLE, LibC::HANDLE, LibC::HANDLE, Void*, Void*, NTSTATUS, LibC::ULONG*, LibC::BOOLEAN*, NTSTATUS) + alias NtCancelWaitCompletionPacketProc = Proc(LibC::HANDLE, LibC::BOOLEAN, NTSTATUS) + + fun NtCreateWaitCompletionPacket( + waitCompletionPacketHandle : LibC::HANDLE*, + desiredAccess : ACCESS_MASK, + objectAttributes : LibC::OBJECT_ATTRIBUTES*, + ) : NTSTATUS + + fun NtAssociateWaitCompletionPacket( + waitCompletionPacketHandle : LibC::HANDLE, + ioCompletionHandle : LibC::HANDLE, + targetObjectHandle : LibC::HANDLE, + keyContext : Void*, + apcContext : Void*, + ioStatus : NTSTATUS, + ioStatusInformation : LibC::ULONG*, + alreadySignaled : LibC::BOOLEAN*, + ) : NTSTATUS + + fun NtCancelWaitCompletionPacket( + waitCompletionPacketHandle : LibC::HANDLE, + removeSignaledPacket : LibC::BOOLEAN, + ) : NTSTATUS +end diff --git a/src/lib_c/x86_64-windows-msvc/c/ntstatus.cr b/src/lib_c/x86_64-windows-msvc/c/ntstatus.cr index 2a013036adb4..0596c641bcc3 100644 --- a/src/lib_c/x86_64-windows-msvc/c/ntstatus.cr +++ b/src/lib_c/x86_64-windows-msvc/c/ntstatus.cr @@ -1,6 +1,7 @@ require "lib_c" lib LibC + STATUS_SUCCESS = 0x00000000_u32 STATUS_FATAL_APP_EXIT = 0x40000015_u32 STATUS_DATATYPE_MISALIGNMENT = 0x80000002_u32 STATUS_BREAKPOINT = 0x80000003_u32 @@ -13,5 +14,6 @@ lib LibC STATUS_FLOAT_UNDERFLOW = 0xC0000093_u32 STATUS_PRIVILEGED_INSTRUCTION = 0xC0000096_u32 STATUS_STACK_OVERFLOW = 0xC00000FD_u32 + STATUS_CANCELLED = 0xC0000120_u32 STATUS_CONTROL_C_EXIT = 0xC000013A_u32 end diff --git a/src/lib_c/x86_64-windows-msvc/c/synchapi.cr b/src/lib_c/x86_64-windows-msvc/c/synchapi.cr index e101b7f6284b..e85f0af1eb8f 100644 --- a/src/lib_c/x86_64-windows-msvc/c/synchapi.cr +++ b/src/lib_c/x86_64-windows-msvc/c/synchapi.cr @@ -32,4 +32,11 @@ lib LibC fun Sleep(dwMilliseconds : DWORD) fun WaitForSingleObject(hHandle : HANDLE, dwMilliseconds : DWORD) : DWORD + + alias PTIMERAPCROUTINE = (Void*, DWORD, DWORD) -> + CREATE_WAITABLE_TIMER_HIGH_RESOLUTION = 0x00000002_u32 + + fun CreateWaitableTimerExW(lpTimerAttributes : SECURITY_ATTRIBUTES*, lpTimerName : LPWSTR, dwFlags : DWORD, dwDesiredAccess : DWORD) : HANDLE + fun SetWaitableTimer(hTimer : HANDLE, lpDueTime : LARGE_INTEGER*, lPeriod : LONG, pfnCompletionRoutine : PTIMERAPCROUTINE*, lpArgToCompletionRoutine : Void*, fResume : BOOL) : BOOL + fun CancelWaitableTimer(hTimer : HANDLE) : BOOL end diff --git a/src/lib_c/x86_64-windows-msvc/c/winnt.cr b/src/lib_c/x86_64-windows-msvc/c/winnt.cr index 99c8f24ac9e1..1bee1cb173ab 100644 --- a/src/lib_c/x86_64-windows-msvc/c/winnt.cr +++ b/src/lib_c/x86_64-windows-msvc/c/winnt.cr @@ -3,6 +3,8 @@ require "c/int_safe" lib LibC alias BOOLEAN = BYTE alias LONG = Int32 + alias ULONG = UInt32 + alias USHORT = UInt16 alias LARGE_INTEGER = Int64 alias CHAR = UChar @@ -469,4 +471,7 @@ lib LibC alias IMAGE_NT_HEADERS = IMAGE_NT_HEADERS64 alias IMAGE_THUNK_DATA = IMAGE_THUNK_DATA64 IMAGE_ORDINAL_FLAG = IMAGE_ORDINAL_FLAG64 + + TIMER_QUERY_STATE = 0x0001 + TIMER_MODIFY_STATE = 0x0002 end diff --git a/src/lib_c/x86_64-windows-msvc/c/winternl.cr b/src/lib_c/x86_64-windows-msvc/c/winternl.cr new file mode 100644 index 000000000000..7046370a1035 --- /dev/null +++ b/src/lib_c/x86_64-windows-msvc/c/winternl.cr @@ -0,0 +1,4 @@ +@[Link("ntdll")] +lib LibNTDLL + fun RtlNtStatusToDosError(status : LibC::ULONG) : LibC::ULONG +end diff --git a/src/winerror.cr b/src/winerror.cr index 844df5b07315..ae4eceb1f18e 100644 --- a/src/winerror.cr +++ b/src/winerror.cr @@ -2,6 +2,7 @@ require "c/winbase" require "c/errhandlingapi" require "c/winsock2" + require "c/winternl" {% end %} # `WinError` represents Windows' [System Error Codes](https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes#system-error-codes-1). @@ -54,6 +55,14 @@ enum WinError : UInt32 {% end %} end + def self.from_ntstatus(status) : self + {% if flag?(:win32) %} + WinError.new(LibNTDLL.RtlNtStatusToDosError(status)) + {% else %} + raise NotImplementedError.new("WinError.from_ntstatus") + {% end %} + end + # Returns the system error message associated with this error code. # # The message is retrieved via [`FormatMessageW`](https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-formatmessagew)