From a2be73bb79ce96abedf938e0746a0827b9e30e04 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Thu, 5 Dec 2024 18:09:12 +0100
Subject: [PATCH 01/14] Add Crystal.print_error_buffered

Writes a message to a growable, in-memory buffer before writing it to
STDERR in a single write operation (possibly atomic, depending on
PIPE_BUF), instead of issuing many small writes to the IO that would be
intermingled with other writes and become completely unintelligible.
---
 src/crystal/print_buffered.cr | 43 +++++++++++++++++++++++++++++++++++
 src/fiber.cr                  | 15 +++---------
 2 files changed, 46 insertions(+), 12 deletions(-)
 create mode 100644 src/crystal/print_buffered.cr

diff --git a/src/crystal/print_buffered.cr b/src/crystal/print_buffered.cr
new file mode 100644
index 000000000000..854fa0951021
--- /dev/null
+++ b/src/crystal/print_buffered.cr
@@ -0,0 +1,43 @@
+module Crystal
+  # Formats an error message, with an optional exception or backtrace, into an
+  # in-memory buffer, before writing it to an IO, usually STDERR, in a single
+  # write operation.
+  #
+  # Avoids intermingled messages caused by multiple threads writing to a STDIO
+  # in parallel. Interleaving may still happen, since writes may not be atomic
+  # when the overall size is larger than PIPE_BUF, but writes of up to 512
+  # bytes should at least be atomic.
+  def self.print_buffered(message : String, *args, to io : IO, exception = nil, backtrace = nil) : Nil
+    buf = buffered_message(message, *args, exception: exception, backtrace: backtrace)
+    io.write(buf.to_slice)
+    io.flush unless io.sync?
+  end
+
+  # Identical to `.print_buffered` but calls `System.print_error(bytes)` to
+  # write to stderr without going through the event loop.
+  def self.print_error_buffered(message : String, *args, exception = nil, backtrace = nil) : Nil
+    buf = buffered_message(message, *args, exception: exception, backtrace: backtrace)
+    System.print_error(buf.to_slice)
+  end
+
+  def self.buffered_message(message : String, *args, exception = nil, backtrace = nil)
+    buf = IO::Memory.new(4096)
+
+    if args.empty?
+      buf << message
+    else
+      # format the printf-style message straight into the buffer
+      System.printf(message, *args) { |bytes| buf.write(bytes) }
+    end
+
+    if exception
+      buf << ": "
+      exception.inspect_with_backtrace(buf)
+    else
+      buf.puts
+      backtrace.try(&.each { |line| buf << " from " << line << '\n' })
+    end
+
+    buf
+  end
+end
diff --git a/src/fiber.cr b/src/fiber.cr
index 55745666c66d..b34a8762037d 100644
--- a/src/fiber.cr
+++ b/src/fiber.cr
@@ -1,4 +1,5 @@
 require "crystal/system/thread_linked_list"
+require "crystal/print_buffered"
 require "./fiber/context"
 
 # :nodoc:
@@ -147,21 +148,11 @@ class Fiber
     GC.unlock_read
     @proc.call
   rescue ex
-    io = {% if flag?(:preview_mt) %}
-           IO::Memory.new(4096) # PIPE_BUF
-         {% else %}
-           STDERR
-         {% end %}
     if name = @name
-      io << "Unhandled exception in spawn(name: " << name << "): "
+      Crystal.print_buffered("Unhandled exception in spawn(name: %s)", name, exception: ex, to: STDERR)
     else
-      io << "Unhandled exception in spawn: "
+      Crystal.print_buffered("Unhandled exception in spawn", exception: ex, to: STDERR)
     end
-    ex.inspect_with_backtrace(io)
-    {% if flag?(:preview_mt) %}
-      STDERR.write(io.to_slice)
-    {% end %}
-    STDERR.flush
   ensure
     # Remove the current fiber from the linked list
     Fiber.inactive(self)

From b2276f9ee742454261499a327057fbca4485be59 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Tue, 3 Dec 2024 19:36:51 +0100
Subject: [PATCH 02/14] Add Thread::WaitGroup

Simple abstraction on top of a mutex and condition variable to
synchronize the execution of a set of threads.
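
For illustration, a minimal usage sketch (the worker body is
hypothetical; `#done` and `#wait` are the API added below):

    wg = Thread::WaitGroup.new(5)

    5.times do
      Thread.new do
        do_some_work # hypothetical workload
      ensure
        wg.done # signal that this thread is finished
      end
    end

    wg.wait # blocks until all 5 threads have called #done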
---
 src/crystal/system/thread_wait_group.cr | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
 create mode 100644 src/crystal/system/thread_wait_group.cr

diff --git a/src/crystal/system/thread_wait_group.cr b/src/crystal/system/thread_wait_group.cr
new file mode 100644
index 000000000000..3494e1e7f569
--- /dev/null
+++ b/src/crystal/system/thread_wait_group.cr
@@ -0,0 +1,20 @@
+# :nodoc:
+class Thread::WaitGroup
+  def initialize(@count : Int32)
+    @mutex = Thread::Mutex.new
+    @condition = Thread::ConditionVariable.new
+  end
+
+  def done : Nil
+    @mutex.synchronize do
+      @count -= 1
+      @condition.broadcast if @count == 0
+    end
+  end
+
+  def wait : Nil
+    @mutex.synchronize do
+      @condition.wait(@mutex) until @count == 0
+    end
+  end
+end

From 7b11dcc05c1eba53109d1c80f19866860f98049e Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Thu, 5 Dec 2024 16:46:39 +0100
Subject: [PATCH 03/14] Add thread safety to Fiber::StackPool

---
 src/fiber/stack_pool.cr | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr
index 8f809335f46c..2dc1703e67aa 100644
--- a/src/fiber/stack_pool.cr
+++ b/src/fiber/stack_pool.cr
@@ -13,6 +13,7 @@ class Fiber
     # pointer value) rather than downwards, so *protect* must be false.
     def initialize(@protect : Bool = true)
       @deque = Deque(Void*).new
+      @lock = Crystal::SpinLock.new
     end
 
     def finalize
@@ -25,7 +26,7 @@ class Fiber
     # returning memory to the operating system.
     def collect(count = lazy_size // 2) : Nil
       count.times do
-        if stack = @deque.shift?
+        if stack = @lock.sync { @deque.shift? }
          Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
         else
           return
@@ -42,7 +43,7 @@ class Fiber
 
     # Removes a stack from the bottom of the pool, or allocates a new one.
     def checkout : {Void*, Void*}
-      if stack = @deque.pop?
+      if !@deque.empty? && (stack = @lock.sync { @deque.pop? })
         Crystal::System::Fiber.reset_stack(stack, STACK_SIZE, @protect)
       else
         stack = Crystal::System::Fiber.allocate_stack(STACK_SIZE, @protect)
@@ -52,7 +53,7 @@ class Fiber
 
     # Appends a stack to the bottom of the pool.
     def release(stack) : Nil
-      @deque.push(stack)
+      @lock.sync { @deque.push(stack) }
     end
 
     # Returns the approximated size of the pool. It may be equal or slightly

From 98022ab63de82708e6d194523d03ed4140b0e0a9 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Wed, 4 Dec 2024 12:35:56 +0100
Subject: [PATCH 04/14] Skeleton for ExecutionContext types as per RFC #0002

- Add the `ExecutionContext` module;
- Add the `ExecutionContext::Scheduler` module;
- Add the `execution_context` compile-time flag.

When the `execution_context` flag is set:

- Don't load `Crystal::Scheduler`;
- Plug `ExecutionContext` instead of `Crystal::Scheduler`.
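
For example, assuming a program `app.cr`, the new types are opted into
with the compile-time flag (which currently also requires `preview_mt`,
see the raise in execution_context.cr):

    crystal build -Dpreview_mt -Dexecution_context app.cr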
--- src/concurrent.cr | 41 ++++++--- src/crystal/event_loop.cr | 14 ++- src/crystal/event_loop/polling.cr | 18 +++- src/crystal/scheduler.cr | 3 +- src/crystal/system/thread.cr | 41 +++++++-- src/crystal/system/unix/signal.cr | 1 + src/crystal/tracing.cr | 10 +++ src/execution_context/execution_context.cr | 100 +++++++++++++++++++++ src/execution_context/scheduler.cr | 83 +++++++++++++++++ src/fiber.cr | 73 +++++++++++++-- src/io/evented.cr | 12 ++- src/kernel.cr | 6 +- 12 files changed, 365 insertions(+), 37 deletions(-) create mode 100644 src/execution_context/execution_context.cr create mode 100644 src/execution_context/scheduler.cr diff --git a/src/concurrent.cr b/src/concurrent.cr index 07ae945a84f6..1f1ad04bfd06 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -1,8 +1,13 @@ require "fiber" require "channel" -require "crystal/scheduler" require "crystal/tracing" +{% if flag?(:execution_context) %} + require "execution_context" +{% else %} + require "crystal/scheduler" +{% end %} + # Blocks the current fiber for the specified number of seconds. # # While this fiber is waiting this time, other ready-to-execute @@ -12,8 +17,7 @@ def sleep(seconds : Number) : Nil if seconds < 0 raise ArgumentError.new "Sleep seconds must be positive" end - - Crystal::Scheduler.sleep(seconds.seconds) + sleep(seconds.seconds) end # Blocks the current Fiber for the specified time span. @@ -21,16 +25,28 @@ end # While this fiber is waiting this time, other ready-to-execute # fibers might start their execution. def sleep(time : Time::Span) : Nil - Crystal::Scheduler.sleep(time) + Crystal.trace :sched, "sleep", for: time + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(time) + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.sleep(time) + {% end %} end # Blocks the current fiber forever. # # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end +{% begin %} # Spawns a new fiber. # # NOTE: The newly created fiber doesn't run as soon as spawned. @@ -64,12 +80,17 @@ end # wg.wait # ``` def spawn(*, name : String? = nil, same_thread = false, &block) - fiber = Fiber.new(name, &block) - Crystal.trace :sched, "spawn", fiber: fiber - {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} - fiber.enqueue - fiber + {% if flag?(:execution_context) %} + ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + {% else %} + fiber = Fiber.new(name, &block) + Crystal.trace :sched, "spawn", fiber: fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue + fiber + {% end %} end +{% end %} # Spawns a fiber by first creating a `Proc`, passing the *call*'s # expressions to it, and letting the `Proc` finally invoke the *call*. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 00bcb86040b6..0294abc2ef3d 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self - Crystal::Scheduler.event_loop + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop + {% end %} end @[AlwaysInline] - def self.current? : self? - Crystal::Scheduler.event_loop? + def self.current? 
: self | Nil + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop? + {% end %} end # Runs the loop. diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 2fe86ad5b649..f7fc36082a0e 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -115,7 +115,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # NOTE: thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end true end @@ -303,13 +307,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index efee6b3c06f1..6cc13406ea4a 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,3 +1,5 @@ +{% skip_file if flag?(:execution_context) %} + require "crystal/event_loop" require "crystal/system/print_error" require "fiber" @@ -66,7 +68,6 @@ class Crystal::Scheduler end def self.sleep(time : Time::Span) : Nil - Crystal.trace :sched, "sleep", for: time Thread.current.scheduler.sleep(time) end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 92136d1f3989..2b5e06498798 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -68,6 +68,39 @@ class Thread getter name : String? + {% if flag?(:execution_context) %} + # :nodoc: + getter! execution_context : ExecutionContext + + # :nodoc: + property! current_scheduler : ExecutionContext::Scheduler + + # :nodoc: + def execution_context=(@execution_context : ExecutionContext) : ExecutionContext + main_fiber.execution_context = execution_context + end + + # :nodoc: + def dead_fiber=(@dead_fiber : Fiber) : Fiber + end + + # :nodoc: + def dead_fiber? : Fiber? + if fiber = @dead_fiber + @dead_fiber = nil + fiber + end + end + {% else %} + # :nodoc: + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } + + # :nodoc: + def scheduler? : ::Crystal::Scheduler? + @scheduler + end + {% end %} + def self.unsafe_each(&) # nothing to iterate when @@threads is nil + don't lazily allocate in a # method called from a GC collection callback! @@ -154,14 +187,6 @@ class Thread thread.name = name end - # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } - - # :nodoc: - def scheduler? : ::Crystal::Scheduler? 
-    @scheduler
-  end
-
   protected def start
     Thread.threads.push(self)
     Thread.current = self
diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr
index 26f4bf6cf7e9..f65b529bf0fb 100644
--- a/src/crystal/system/unix/signal.cr
+++ b/src/crystal/system/unix/signal.cr
@@ -2,6 +2,7 @@ require "c/signal"
 require "c/stdio"
 require "c/sys/wait"
 require "c/unistd"
+require "../print_error"
 
 module Crystal::System::Signal
   # The number of libc functions that can be called safely from a signal(2)
diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr
index d9508eda85a8..a6c1f747625f 100644
--- a/src/crystal/tracing.cr
+++ b/src/crystal/tracing.cr
@@ -81,6 +81,16 @@ module Crystal
         write value.name || '?'
       end
 
+      {% if flag?(:execution_context) %}
+        def write(value : ExecutionContext) : Nil
+          write value.name
+        end
+
+        def write(value : ExecutionContext::Scheduler) : Nil
+          write value.name
+        end
+      {% end %}
+
       def write(value : Pointer) : Nil
         write "0x"
         System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) }
diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr
new file mode 100644
index 000000000000..4342945e3812
--- /dev/null
+++ b/src/execution_context/execution_context.cr
@@ -0,0 +1,100 @@
+require "../crystal/event_loop"
+require "../crystal/system/thread"
+require "../crystal/system/thread_linked_list"
+require "../fiber"
+require "../fiber/stack_pool"
+require "./scheduler"
+
+{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}
+
+module ExecutionContext
+  @@default : ExecutionContext?
+
+  @[AlwaysInline]
+  def self.default : ExecutionContext
+    @@default.not_nil!("expected default execution context to have been set up")
+  end
+
+  # :nodoc:
+  def self.init_default_context : Nil
+    raise NotImplementedError.new("No execution context implementations (yet)")
+  end
+
+  # Returns the default number of workers to start in the execution context.
+  def self.default_workers_count : Int32
+    ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32)
+  end
+
+  # :nodoc:
+  protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new }
+
+  # :nodoc:
+  property next : ExecutionContext?
+
+  # :nodoc:
+  property previous : ExecutionContext?
+
+  # :nodoc:
+  def self.unsafe_each(&) : Nil
+    @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context })
+  end
+
+  def self.each(&) : Nil
+    execution_contexts.each { |execution_context| yield execution_context }
+  end
+
+  @[AlwaysInline]
+  def self.current : ExecutionContext
+    Thread.current.execution_context
+  end
+
+  # Tells the current scheduler to suspend the current fiber and resume the
+  # next runnable fiber. The current fiber will never be resumed; you're
+  # responsible for reenqueuing it.
+  #
+  # This method is safe as it only operates on the current `ExecutionContext`
+  # and `Scheduler`.
+  @[AlwaysInline]
+  def self.reschedule : Nil
+    Scheduler.current.reschedule
+  end
+
+  # Tells the current scheduler to suspend the current fiber and to resume
+  # *fiber* instead. The current fiber will never be resumed; you're
+  # responsible for reenqueuing it.
+  #
+  # Raises `RuntimeError` if the fiber doesn't belong to the current execution
+  # context.
+  #
+  # This method is safe as it only operates on the current `ExecutionContext`
+  # and `Scheduler`.
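+  #
+  # For example (illustrative only; `dequeue_runnable_fiber` is a hypothetical
+  # helper, and the current fiber must have been reenqueued beforehand or it
+  # will never run again):
+  #
+  # ```
+  # if fiber = dequeue_runnable_fiber
+  #   ExecutionContext.resume(fiber)
+  # end
+  # ```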
+  def self.resume(fiber : Fiber) : Nil
+    if fiber.execution_context == current
+      Scheduler.current.resume(fiber)
+    else
+      raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}")
+    end
+  end
+
+  # Creates a new fiber then calls `#enqueue` to add it to the execution
+  # context.
+  #
+  # May be called from any `ExecutionContext` (i.e. must be thread-safe).
+  def spawn(*, name : String? = nil, &block : ->) : Fiber
+    Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) }
+  end
+
+  # Legacy support for the `same_thread` argument. Each execution context may
+  # decide to support it or not (e.g. a single threaded context can accept it).
+  abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
+
+  abstract def stack_pool : Fiber::StackPool
+  abstract def stack_pool? : Fiber::StackPool?
+
+  abstract def event_loop : Crystal::EventLoop
+
+  # Enqueues a fiber to be resumed inside the execution context.
+  #
+  # May be called from any ExecutionContext (i.e. must be thread-safe).
+  abstract def enqueue(fiber : Fiber) : Nil
+end
diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr
new file mode 100644
index 000000000000..fe5acab96500
--- /dev/null
+++ b/src/execution_context/scheduler.cr
@@ -0,0 +1,83 @@
+module ExecutionContext
+  module Scheduler
+    @[AlwaysInline]
+    def self.current : Scheduler
+      Thread.current.current_scheduler
+    end
+
+    protected abstract def thread : Thread
+    protected abstract def execution_context : ExecutionContext
+
+    # Instantiates a fiber and enqueues it into the scheduler's local queue.
+    def spawn(*, name : String? = nil, &block : ->) : Fiber
+      fiber = Fiber.new(name, execution_context, &block)
+      enqueue(fiber)
+      fiber
+    end
+
+    # Legacy support for the *same_thread* argument. Each execution context may
+    # decide to support it or not (e.g. a single threaded context can accept it).
+    abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
+
+    # Enqueues a fiber to be resumed inside the scheduler's execution context.
+    protected abstract def enqueue(fiber : Fiber) : Nil
+
+    # Suspends the execution of the current fiber and resumes the next runnable
+    # fiber.
+    #
+    # Unsafe. Must only be called on `ExecutionContext.current`. Prefer
+    # `ExecutionContext.reschedule` instead.
+    protected abstract def reschedule : Nil
+
+    # Suspends the execution of the current fiber and resumes *fiber*.
+    #
+    # The current fiber will never be resumed; you're responsible for
+    # reenqueuing it.
+    #
+    # Unsafe. Must only be called on `ExecutionContext.current`. Prefer
+    # `ExecutionContext.resume` instead.
+    protected abstract def resume(fiber : Fiber) : Nil
+
+    # Switches the thread from running the current fiber to run *fiber*
+    # instead.
+    #
+    # Handles thread safety around fiber stacks: locks the GC to not start a
+    # collection while we're switching context, releases the stack of a dead
+    # fiber.
+    #
+    # Unsafe. Must only be called by the current scheduler. The caller must
+    # ensure that the fiber indeed belongs to the current execution context,
+    # and that the fiber can indeed be resumed.
+    protected def swapcontext(fiber : Fiber) : Nil
+      current_fiber = thread.current_fiber
+
+      {% unless flag?(:interpreted) %}
+        thread.dead_fiber = current_fiber if current_fiber.dead?
+ {% end %} + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + + # we switched context so we can't trust *self* anymore (it is the + # scheduler that rescheduled *fiber* which may be another scheduler) as + # well as any other local or instance variables (e.g. we must resolve + # `Thread.current` again) + # + # that being said, we can still trust the *current_fiber* local variable + # (it's the only exception) + + {% unless flag?(:interpreted) %} + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + end + + abstract def status : String + end +end diff --git a/src/fiber.cr b/src/fiber.cr index b34a8762037d..39a9f2bf2b85 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -59,7 +59,10 @@ class Fiber property name : String? @alive = true - {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread = Atomic(Thread?).new(nil) + {% end %} # :nodoc: property next : Fiber? @@ -67,6 +70,10 @@ class Fiber # :nodoc: property previous : Fiber? + {% if flag?(:execution_context) %} + property! execution_context : ExecutionContext + {% end %} + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) @@ -84,16 +91,19 @@ class Fiber fibers.each { |fiber| yield fiber } end + {% begin %} # Creates a new `Fiber` instance. # # When the fiber is executed, it runs *proc* in its context. # # *name* is an optional and used only as an internal reference. - def initialize(@name : String? = nil, &@proc : ->) + def initialize(@name : String? = nil, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->) @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} {Pointer(Void).null, Pointer(Void).null} + {% elsif flag?(:execution_context) %} + execution_context.stack_pool.checkout {% else %} Crystal::Scheduler.stack_pool.checkout {% end %} @@ -123,6 +133,7 @@ class Fiber Fiber.fibers.push(self) end + {% end %} # :nodoc: def initialize(@stack : Void*, thread) @@ -139,13 +150,30 @@ class Fiber {% end %} thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom @name = "main" - {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread.set(thread) + {% end %} + Fiber.fibers.push(self) + + # we don't initialize @execution_context here (we may not have an execution + # context yet), and we can't detect ExecutionContext.current (we may reach + # an infinite recursion). end # :nodoc: def run GC.unlock_read + + {% if flag?(:execution_context) && !flag?(:interpreted) %} + # if the fiber previously running on this thread has terminated, we can + # now safely release its stack + if fiber = Thread.current.dead_fiber? 
+ fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + @proc.call rescue ex if name = @name @@ -163,9 +191,17 @@ class Fiber @timeout_select_action = nil @alive = false - {% unless flag?(:interpreted) %} + + {% unless flag?(:interpreted) || flag?(:execution_context) %} + # interpreted: the interpreter is managing the stacks + # + # execution context: do not prematurely release the stack before we switch + # to another fiber so we don't end up with a thread reusing a stack for a + # new fiber while the current fiber isn't fully terminated (oops); even + # without the pool, we can't unmap before we swap context. Crystal::Scheduler.stack_pool.release(@stack) {% end %} + Fiber.suspend end @@ -207,7 +243,11 @@ class Fiber # puts "never reached" # ``` def resume : Nil - Crystal::Scheduler.resume(self) + {% if flag?(:execution_context) %} + ExecutionContext.resume(self) + {% else %} + Crystal::Scheduler.resume(self) + {% end %} end # Adds this fiber to the scheduler's runnables queue for the current thread. @@ -216,7 +256,11 @@ class Fiber # the next time it has the opportunity to reschedule to another fiber. There # are no guarantees when that will happen. def enqueue : Nil - Crystal::Scheduler.enqueue(self) + {% if flag?(:execution_context) %} + execution_context.enqueue(self) + {% else %} + Crystal::Scheduler.enqueue(self) + {% end %} end # :nodoc: @@ -284,7 +328,14 @@ class Fiber # end # ``` def self.yield : Nil - Crystal::Scheduler.yield + Crystal.trace :sched, "yield" + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(0.seconds) + Fiber.suspend + {% else %} + Crystal::Scheduler.yield + {% end %} end # Suspends execution of the current fiber indefinitely. @@ -298,7 +349,11 @@ class Fiber # useful if the fiber needs to wait for something to happen (for example an IO # event, a message is ready in a channel, etc.) which triggers a re-enqueue. def self.suspend : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end def to_s(io : IO) : Nil @@ -320,7 +375,7 @@ class Fiber GC.push_stack @context.stack_top, @stack_bottom end - {% if flag?(:preview_mt) %} + {% if flag?(:preview_mt) && !flag?(:execution_context) %} # :nodoc: def set_current_thread(thread = Thread.current) : Thread @current_thread.set(thread) diff --git a/src/io/evented.cr b/src/io/evented.cr index 1f95d1870b0b..b238830f284a 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -89,11 +89,19 @@ module IO::Evented @write_event.consume_each &.free @readers.consume_each do |readers| - Crystal::Scheduler.enqueue readers + {% if flag?(:execution_context) %} + readers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue readers + {% end %} end @writers.consume_each do |writers| - Crystal::Scheduler.enqueue writers + {% if flag?(:execution_context) %} + writers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue writers + {% end %} end end diff --git a/src/kernel.cr b/src/kernel.cr index 34763b994839..c2af8771824e 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -608,7 +608,11 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? 
== "1" Exception::CallStack.setup_crash_handler - Crystal::Scheduler.init + {% if flag?(:execution_context) %} + ExecutionContext.init_default_context + {% else %} + Crystal::Scheduler.init + {% end %} {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop From 632702db6444e9f8ca9063f470a96084d8f575fe Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 18:28:53 +0100 Subject: [PATCH 05/14] Add Fiber::Queue singly-linked LIFO queue --- spec/std/fiber/queue_spec.cr | 183 +++++++++++++++++++++++++++++++++++ src/fiber.cr | 3 + src/fiber/queue.cr | 83 ++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 spec/std/fiber/queue_spec.cr create mode 100644 src/fiber/queue.cr diff --git a/spec/std/fiber/queue_spec.cr b/spec/std/fiber/queue_spec.cr new file mode 100644 index 000000000000..39c67bc52bee --- /dev/null +++ b/spec/std/fiber/queue_spec.cr @@ -0,0 +1,183 @@ +require "../spec_helper" +require "fiber/queue" + +describe Fiber::Queue do + describe "#initialize" do + it "creates an empty queue" do + q = Fiber::Queue.new + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + it "creates a filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f1.schedlink = f2 + f2.schedlink = nil + + q = Fiber::Queue.new(f2, f1, size: 2) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + q.empty?.should be_false + end + end + + describe "#push" do + it "to head" do + q = Fiber::Queue.new + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + # simulate fibers previously added to other queues + f1.schedlink = f3 + f2.schedlink = f1 + + # push first fiber + q.push(f1) + q.@head.should be(f1) + q.@tail.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(1) + + # push second fiber + q.push(f2) + q.@head.should be(f2) + q.@tail.should be(f1) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(2) + + # push third fiber + q.push(f3) + q.@head.should be(f3) + q.@tail.should be(f1) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(3) + end + end + + describe "#bulk_unshift" do + it "to empty queue" do + # manually create a queue + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # push in bulk + q2 = Fiber::Queue.new(nil, nil, size: 0) + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f3) + q2.@tail.should be(f1) + q2.size.should eq(3) + end + + it "to filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f4 = Fiber.new(name: "f4") { } + f5 = Fiber.new(name: "f5") { } + + # source queue + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # destination queue + f5.schedlink = f4 + f4.schedlink = nil + q2 = Fiber::Queue.new(f5, f4, size: 2) + + # push in bulk + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f5) + q2.@tail.should be(f1) + q2.size.should eq(5) + + f5.schedlink.should be(f4) + f4.schedlink.should be(f3) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be(nil) + end + end + + describe "#pop" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + 
f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + expect_raises(IndexError) { q.pop } + q.size.should eq(0) + end + end + + describe "#pop?" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop?.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop?.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop?.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + q.pop?.should be_nil + q.size.should eq(0) + end + end +end diff --git a/src/fiber.cr b/src/fiber.cr index 39a9f2bf2b85..6c95b9298d33 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -74,6 +74,9 @@ class Fiber property! execution_context : ExecutionContext {% end %} + # :nodoc: + property schedlink : Fiber? + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) diff --git a/src/fiber/queue.cr b/src/fiber/queue.cr new file mode 100644 index 000000000000..9695ac8b6a83 --- /dev/null +++ b/src/fiber/queue.cr @@ -0,0 +1,83 @@ +# The queue is modeled after Go's `gQueue`, distributed under a BSD-like +# license: + +class Fiber + # :nodoc: + # + # Singly-linked list of `Fiber`. + # Last-in, first-out (LIFO) semantic. + # A fiber can only exist within a single `Queue` at any time. + # + # Unlike `Crystal::PointerLinkedList` doubly-linked list, this `Queue` is + # meant to maintain a queue of runnable fibers, or to quickly collect an + # arbitrary number of fibers. + # + # Thread unsafe! An external lock is required for concurrent accesses. + struct Queue + getter size : Int32 + + def initialize(@head : Fiber? = nil, @tail : Fiber? = nil, @size = 0) + end + + def push(fiber : Fiber) : Nil + fiber.schedlink = @head + @head = fiber + @tail = fiber if @tail.nil? + @size += 1 + end + + def bulk_unshift(queue : Queue*) : Nil + return unless last = queue.value.@tail + last.schedlink = nil + + if tail = @tail + tail.schedlink = queue.value.@head + else + @head = queue.value.@head + end + @tail = queue.value.@tail + + @size += queue.value.size + end + + @[AlwaysInline] + def pop : Fiber + pop { raise IndexError.new } + end + + @[AlwaysInline] + def pop? : Fiber? + pop { nil } + end + + private def pop(&) + if fiber = @head + @head = fiber.schedlink + @tail = nil if @head.nil? + @size -= 1 + fiber.schedlink = nil + fiber + else + yield + end + end + + @[AlwaysInline] + def empty? 
: Bool + @head == nil + end + + def clear + @size = 0 + @head = @tail = nil + end + + def each(&) : Nil + cursor = @head + while cursor + yield cursor + cursor = cursor.schedlink + end + end + end +end From 852656d1c0f66dea0c1b1e5594eed7b00a32f529 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 3 Dec 2024 19:39:22 +0100 Subject: [PATCH 06/14] Add Runnables and GlobalQueue for schedulers to keep fibers --- .../execution_context/global_queue_spec.cr | 225 +++++++++++++++ spec/std/execution_context/runnables_spec.cr | 264 ++++++++++++++++++ spec/std/execution_context/spec_helper.cr | 21 ++ src/execution_context/global_queue.cr | 104 +++++++ src/execution_context/runnables.cr | 210 ++++++++++++++ 5 files changed, 824 insertions(+) create mode 100644 spec/std/execution_context/global_queue_spec.cr create mode 100644 spec/std/execution_context/runnables_spec.cr create mode 100644 spec/std/execution_context/spec_helper.cr create mode 100644 src/execution_context/global_queue.cr create mode 100644 src/execution_context/runnables.cr diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/execution_context/global_queue_spec.cr new file mode 100644 index 000000000000..838a31406c01 --- /dev/null +++ b/spec/std/execution_context/global_queue_spec.cr @@ -0,0 +1,225 @@ +require "./spec_helper" + +describe ExecutionContext::GlobalQueue do + it "#initialize" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.empty?.should be_true + end + + it "#unsafe_push and #unsafe_pop" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f1) + q.size.should eq(1) + + q.unsafe_push(f2) + q.unsafe_push(f3) + q.size.should eq(3) + + q.unsafe_pop?.should be(f3) + q.size.should eq(2) + + q.unsafe_pop?.should be(f2) + q.unsafe_pop?.should be(f1) + q.unsafe_pop?.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + describe "#unsafe_grab?" 
do + it "can't grab from empty queue" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = ExecutionContext::Runnables(6).new(q) + q.unsafe_grab?(runnables, 4).should be_nil + end + + it "grabs fibers" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.unsafe_push(f) } + + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + + # returned the last enqueued fiber + fiber.should be(fibers[9]) + + # enqueued the next 2 fibers + runnables.size.should eq(2) + runnables.get?.should be(fibers[8]) + runnables.get?.should be(fibers[7]) + + # the remaining fibers are still there: + 6.downto(0).each do |i| + q.unsafe_pop?.should be(fibers[i]) + end + end + + it "can't grab more than available" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + + it "clamps divisor to 1" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 0) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "one by one" do + fibers = StaticArray(ExecutionContext::FiberCounter, 763).new do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + n = 7 + increments = 15 + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "ONE-#{i}") do |thread| + slept = 0 + ready.done + + loop do + if fiber = queue.pop? + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + queue.push(fiber) if fc.increment < increments + slept = 0 + elsif slept < 100 + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + else + break + end + end + rescue exception + Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + fibers.each_with_index do |fc, i| + queue.push(fc.@fiber) + Thread.sleep(10.nanoseconds) if i % 10 == 9 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times + fibers.each { |fc| fc.counter.should eq(increments) } + end + + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + shutdown = Thread::WaitGroup.new(n) + + n.times do |i| + Thread.new(name: "BULK-#{i}") do |thread| + slept = 0 + + r = ExecutionContext::Runnables(3).new(queue) + + batch = Fiber::Queue.new + size = 0 + + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! 
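+            # reenqueue the fiber into the local batch until it has been
+            # dequeued `increments` times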
+ + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } + + ready.done + + loop do + if fiber = r.get? + execute.call(fiber) + slept = 0 + next + end + + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + rescue exception + Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}" + ensure + shutdown.done + end + end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + q = Fiber::Queue.new + 5.times { |j| q.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end + + shutdown.wait + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/execution_context/runnables_spec.cr new file mode 100644 index 000000000000..6fa342675402 --- /dev/null +++ b/spec/std/execution_context/runnables_spec.cr @@ -0,0 +1,264 @@ +require "./spec_helper" + +describe ExecutionContext::Runnables do + it "#initialize" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(16).new(g) + r.capacity.should eq(16) + end + + describe "#push" do + it "enqueues the fiber in local queue" do + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # local dequeue + fibers.each { |f| r.get?.should be(f) } + r.get?.should be_nil + + # didn't push to global queue + g.pop?.should be_nil + end + + it "moves half the local queue to the global queue on overflow" do + fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # kept half of local queue + r.get?.should be(fibers[2]) + r.get?.should be(fibers[3]) + + # moved half of local queue + last push to global queue + g.pop?.should eq(fibers[0]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[4]) + end + + it "can always push up to capacity" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + + 4.times do + # local + 4.times { r.push(Fiber.new { }) } + 2.times { r.get? } + 2.times { r.push(Fiber.new { }) } + + # overflow (2+1 fibers are sent to global queue + 1 local) + 2.times { r.push(Fiber.new { }) } + + # clear + 3.times { r.get? 
} + end + + # on each iteration we pushed 2+1 fibers to the global queue + g.size.should eq(12) + + # grab fibers back from the global queue + fiber = g.unsafe_grab?(r, divisor: 1) + fiber.should_not be_nil + r.get?.should_not be_nil + r.get?.should be_nil + end + end + + describe "#bulk_push" do + it "fills the local queue" do + q = Fiber::Queue.new + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + fibers.reverse_each { |f| r.get?.should be(f) } + g.empty?.should be_true + end + + it "pushes the overflow to the global queue" do + q = Fiber::Queue.new + fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + # filled the local queue + r.get?.should eq(fibers[6]) + r.get?.should eq(fibers[5]) + r.get?.should be(fibers[4]) + r.get?.should be(fibers[3]) + + # moved the rest to the global queue + g.pop?.should eq(fibers[2]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[0]) + end + end + + describe "#get?" do + # TODO: need specific tests (though we already use it in the above tests?) + end + + describe "#steal_from" do + it "steals from another runnables" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + fibers.each { |f| r1.push(f) } + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole half of the runnable fibers + fiber.should be(fibers[2]) + r2.get?.should be(fibers[0]) + r2.get?.should be(fibers[1]) + r2.get?.should be_nil + + # left the other half + r1.get?.should be(fibers[3]) + r1.get?.should be(fibers[4]) + r1.get?.should be(fibers[5]) + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals the last fiber" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + lone = Fiber.new(name: "lone") { } + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + r1.push(lone) + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole the fiber & local queue is still empty + fiber.should be(lone) + r2.get?.should be_nil + + # left nothing in original queue + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals nothing" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = ExecutionContext::Runnables(16).new(g) + r2 = ExecutionContext::Runnables(16).new(g) + + fiber = r2.steal_from(r1) + fiber.should be_nil + r2.get?.should be_nil + r1.get?.should be_nil + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "stress test" do + n = 7 + increments = 7919 + + # less fibers than space in runnables (so threads can starve) + # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) + fibers = Array(ExecutionContext::FiberCounter).new(54) do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + 
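+      # wait groups to synchronize worker thread startup and shutdown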
+      ready = Thread::WaitGroup.new(n)
+      shutdown = Thread::WaitGroup.new(n)
+
+      all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do
+        ExecutionContext::Runnables(16).new(global_queue)
+      end
+
+      n.times do |i|
+        Thread.new(name: "RUN-#{i}") do |thread|
+          runnables = all_runnables[i]
+          slept = 0
+
+          execute = ->(fiber : Fiber) {
+            fc = fibers.find { |x| x.@fiber == fiber }.not_nil!
+            runnables.push(fiber) if fc.increment < increments
+          }
+
+          ready.done
+
+          loop do
+            # dequeue from local queue
+            if fiber = runnables.get?
+              execute.call(fiber)
+              slept = 0
+              next
+            end
+
+            # steal from another queue
+            while (r = all_runnables.sample) == runnables
+            end
+            if fiber = runnables.steal_from(r)
+              execute.call(fiber)
+              slept = 0
+              next
+            end
+
+            # dequeue from global queue
+            if fiber = global_queue.grab?(runnables, n)
+              execute.call(fiber)
+              slept = 0
+              next
+            end
+
+            if slept >= 100
+              break
+            end
+
+            slept += 1
+            Thread.sleep(1.nanosecond) # don't burn CPU
+          end
+        rescue exception
+          Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}"
+        ensure
+          shutdown.done
+        end
+      end
+      ready.wait
+
+      # enqueue in batches
+      0.step(to: fibers.size - 1, by: 9) do |i|
+        q = Fiber::Queue.new
+        9.times { |j| q.push(fibers[i + j].@fiber) }
+        global_queue.bulk_push(pointerof(q))
+        Thread.sleep(10.nanoseconds) if i % 2 == 1
+      end
+
+      shutdown.wait
+
+      # must have dequeued each fiber exactly X times (no less, no more)
+      fibers.each { |fc| fc.counter.should eq(increments) }
+    end
+  end
+end
diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/execution_context/spec_helper.cr
new file mode 100644
index 000000000000..9a1dbb881cee
--- /dev/null
+++ b/spec/std/execution_context/spec_helper.cr
@@ -0,0 +1,21 @@
+require "../spec_helper"
+require "crystal/system/thread_wait_group"
+require "execution_context/runnables"
+require "execution_context/global_queue"
+
+module ExecutionContext
+  class FiberCounter
+    def initialize(@fiber : Fiber)
+      @counter = Atomic(Int32).new(0)
+    end
+
+    # fetch and add
+    def increment
+      @counter.add(1, :relaxed) + 1
+    end
+
+    def counter
+      @counter.get(:relaxed)
+    end
+  end
+end
diff --git a/src/execution_context/global_queue.cr b/src/execution_context/global_queue.cr
new file mode 100644
index 000000000000..22535ab01ed6
--- /dev/null
+++ b/src/execution_context/global_queue.cr
@@ -0,0 +1,104 @@
+# The queue is a port of Go's `globrunq*` functions, distributed under a
+# BSD-like license:
+#
+
+require "../fiber/queue"
+require "./runnables"
+
+module ExecutionContext
+  # Global queue of runnable fibers.
+  # Unbounded.
+  # Shared by all schedulers in an execution context.
+  #
+  # Basically a `Fiber::Queue` wrapped in a `Thread::Mutex`, with the exception
+  # of the `#grab?` method, which tries to grab 1/Nth of the queue at once.
+  class GlobalQueue
+    def initialize(@mutex : Thread::Mutex)
+      @queue = Fiber::Queue.new
+    end
+
+    # Grabs the lock and enqueues a runnable fiber on the global runnable queue.
+    def push(fiber : Fiber) : Nil
+      @mutex.synchronize { unsafe_push(fiber) }
+    end
+
+    # Enqueues a runnable fiber on the global runnable queue. Assumes the lock
+    # is currently held.
+    def unsafe_push(fiber : Fiber) : Nil
+      @queue.push(fiber)
+    end
+
+    # Grabs the lock and puts a batch of runnable fibers on the global runnable
+    # queue.
+    def bulk_push(queue : Fiber::Queue*) : Nil
+      @mutex.synchronize { unsafe_bulk_push(queue) }
+    end
+
+    # Puts a batch of runnable fibers on the global runnable queue. Assumes the
+    # lock is currently held.
+    def unsafe_bulk_push(queue : Fiber::Queue*) : Nil
+      @queue.bulk_unshift(queue)
+    end
+
+    # Grabs the lock and dequeues one runnable fiber from the global runnable
+    # queue.
+    def pop? : Fiber?
+      @mutex.synchronize { unsafe_pop? }
+    end
+
+    # Dequeues one runnable fiber from the global runnable queue. Assumes the
+    # lock is currently held.
+    def unsafe_pop? : Fiber?
+      @queue.pop?
+    end
+
+    # Grabs the lock then tries to grab a batch of fibers from the global
+    # runnable queue. Returns the next runnable fiber or `nil` if the queue was
+    # empty.
+    #
+    # `divisor` is meant for fair distribution of fibers across threads in the
+    # execution context; it should be the number of threads.
+    def grab?(runnables : Runnables, divisor : Int32) : Fiber?
+      @mutex.synchronize { unsafe_grab?(runnables, divisor) }
+    end
+
+    # Tries to grab a batch of fibers from the global runnable queue. Returns
+    # the next runnable fiber or `nil` if the queue was empty. Assumes the lock
+    # is currently held.
+    #
+    # `divisor` is meant for fair distribution of fibers across threads in the
+    # execution context; it should be the number of threads.
+    def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber?
+      # ported from Go: globrunqget
+      return if @queue.empty?
+
+      divisor = 1 if divisor < 1
+      size = @queue.size
+
+      n = {
+        size,                    # can't grab more than available
+        size // divisor + 1,     # divide + try to take at least 1 fiber
+        runnables.capacity // 2, # refill half the destination queue
+      }.min
+
+      fiber = @queue.pop?
+
+      # OPTIMIZE: q = @queue.split(n - 1) then `runnables.push(pointerof(q))` (?)
+      (n - 1).times do
+        break unless f = @queue.pop?
+        runnables.push(f)
+      end
+
+      fiber
+    end
+
+    @[AlwaysInline]
+    def empty? : Bool
+      @queue.empty?
+    end
+
+    @[AlwaysInline]
+    def size : Int32
+      @queue.size
+    end
+  end
+end
diff --git a/src/execution_context/runnables.cr b/src/execution_context/runnables.cr
new file mode 100644
index 000000000000..6be2fda446c0
--- /dev/null
+++ b/src/execution_context/runnables.cr
@@ -0,0 +1,210 @@
+# The queue is a port of Go's `runq*` functions, distributed under a BSD-like
+# license:
+#
+# The queue derives from the Chase-Lev lock-free queue with adaptations:
+#
+# - single ring buffer (per scheduler);
+# - on overflow: bulk push half the ring to `GlobalQueue`;
+# - on empty: bulk grab up to half the ring from `GlobalQueue`;
+# - bulk push operation;
+
+require "../fiber/queue"
+require "./global_queue"
+
+module ExecutionContext
+  # :nodoc:
+  #
+  # Local queue of runnable fibers for schedulers.
+  # Bounded.
+  # First-in, first-out semantics (FIFO).
+  # Single producer, multiple consumers thread safety.
+  #
+  # Private to an execution context scheduler, except for stealing methods
+  # that can be called from any thread in the execution context.
+  class Runnables(N)
+    def initialize(@global_queue : GlobalQueue)
+      # head is an index to the buffer where the next fiber to dequeue is.
+      #
+      # tail is an index to the buffer where the next fiber to enqueue will be
+      # (on the next push).
+      #
+      # head is always behind tail (not empty) or equal (empty) but never
+      # after tail (the queue would have a negative size => bug).
+      @head = Atomic(UInt32).new(0)
+      @tail = Atomic(UInt32).new(0)
+      @buffer = uninitialized Fiber[N]
+    end
+
+    @[AlwaysInline]
+    def capacity : Int32
+      N
+    end
+
+    # Tries to push *fiber* on the local runnable queue. If the run queue is
+    # full, moves half of it, plus *fiber*, to the global queue, which will
+    # grab the global lock.
+    #
+    # Executed only by the owner.
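+    #
+    # Illustrative only (the owning scheduler would do this; `global_queue`
+    # and `fiber` are assumed to exist):
+    #
+    # ```
+    # runnables = Runnables(16).new(global_queue)
+    # runnables.push(fiber) # on overflow, half the ring moves to global_queue
+    # ```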
+ def push(fiber : Fiber) : Nil + # ported from Go: runqput + loop do + head = @head.get(:acquire) # sync with consumers + tail = @tail.get(:relaxed) + + if (tail &- head) < N + # put fiber to local queue + @buffer.to_unsafe[tail % N] = fiber + + # make the fiber available for consumption + @tail.set(tail &+ 1, :release) + return + end + + if push_slow(fiber, head, tail) + return + end + + # failed to advance head (another scheduler stole fibers), + # the queue isn't full, now the push above must succeed + end + end + + private def push_slow(fiber : Fiber, head : UInt32, tail : UInt32) : Bool + # ported from Go: runqputslow + n = (tail &- head) // 2 + raise "BUG: queue is not full" if n != N // 2 + + # first, try to grab half of the fibers from local queue + batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile + n.times do |i| + batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N] + end + _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return false unless success + + # append fiber to the batch + batch.to_unsafe[n] = fiber + + # link the fibers + n.times do |i| + batch.to_unsafe[i].schedlink = batch.to_unsafe[i &+ 1] + end + queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) + + # now put the batch on global queue (grabs the global lock) + @global_queue.bulk_push(pointerof(queue)) + + true + end + + # Tries to enqueue all the fibers in `queue` into the local queue. If the + # queue is full, the overflow will be pushed to the global queue; in that + # case this will temporarily acquire the global queue lock. + # + # Executed only by the owner. + def bulk_push(queue : Fiber::Queue*) : Nil + # ported from Go: runqputbatch + head = @head.get(:acquire) # sync with other consumers + tail = @tail.get(:relaxed) + + while !queue.value.empty? && (tail &- head) < N + fiber = queue.value.pop + @buffer.to_unsafe[tail % N] = fiber + tail &+= 1 + end + + # make the fibers available for consumption + @tail.set(tail, :release) + + # put any overflow on global queue + @global_queue.bulk_push(queue) unless queue.value.empty? + end + + # Dequeues the next runnable fiber from the local queue. + # + # Executed only by the owner. + # TODO: rename as `#shift?` + def get? : Fiber? + # ported from Go: runqget + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:relaxed) + return if tail == head + + fiber = @buffer.to_unsafe[head % N] + head, success = @head.compare_and_set(head, head &+ 1, :acquire_release, :acquire) + return fiber if success + end + end + + # Steals half the fibers from the local queue of `src` and puts them onto + # the local queue. Returns one of the stolen fibers, or `nil` on failure. + # + # Only executed from the owner (when the local queue is empty). + def steal_from(src : Runnables) : Fiber? + # ported from Go: runqsteal + + tail = @tail.get(:relaxed) + n = src.grab(@buffer.to_unsafe, tail) + return if n == 0 + + # 'dequeue' last fiber from @buffer + n &-= 1 + fiber = @buffer.to_unsafe[(tail &+ n) % N] + return fiber if n == 0 + + head = @head.get(:acquire) # sync with consumers + if tail &- head &+ n >= N + raise "BUG: local queue overflow" + end + + # make the fibers available for consumption + @tail.set(tail &+ n, :release) + + fiber + end + + # Grabs a batch of fibers from local queue into `buffer` of size N (normally + # the ring buffer of another `Runnables`) starting at `buffer_head`. Returns + # number of grabbed fibers. 
+    #
+    # Can be executed by any scheduler.
+    protected def grab(buffer : Fiber*, buffer_head : UInt32) : UInt32
+      # ported from Go: runqgrab
+
+      head = @head.get(:acquire) # sync with other consumers
+      loop do
+        tail = @tail.get(:acquire) # sync with the producer
+
+        # grab half of the fibers (rounding up)
+        n = tail &- head
+        n -= n // 2
+        return 0_u32 if n == 0 # queue is empty
+
+        if n > N // 2
+          # read inconsistent head and tail
+          head = @head.get(:acquire)
+          next
+        end
+
+        n.times do |i|
+          fiber = @buffer.to_unsafe[(head &+ i) % N]
+          buffer[(buffer_head &+ i) % N] = fiber
+        end
+
+        # try to mark the fibers as consumed
+        head, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire)
+        return n if success
+      end
+    end
+
+    @[AlwaysInline]
+    def empty? : Bool
+      @head.get(:relaxed) == @tail.get(:relaxed)
+    end
+
+    @[AlwaysInline]
+    def size : UInt32
+      @tail.get(:relaxed) &- @head.get(:relaxed)
+    end
+  end
+end

From b66d1865dd1d7ed6f2d391fb21213a0add3b0190 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Fri, 6 Dec 2024 15:01:08 +0100
Subject: [PATCH 07/14] Alt EventLoop#run(queue*, blocking) method

---
 src/crystal/event_loop.cr          |  7 +++++
 src/crystal/event_loop/iocp.cr     |  8 ++++++
 src/crystal/event_loop/libevent.cr | 44 +++++++++++++++++++++++++-----
 src/crystal/event_loop/polling.cr  |  9 +++++-
 src/io/evented.cr                  | 14 ++++++++--
 5 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr
index 0294abc2ef3d..1b2887a1c535 100644
--- a/src/crystal/event_loop.cr
+++ b/src/crystal/event_loop.cr
@@ -54,6 +54,13 @@ abstract class Crystal::EventLoop
   # events.
   abstract def run(blocking : Bool) : Bool
 
+  {% if flag?(:execution_context) %}
+    # Same as `#run` but collects runnable fibers into *queue* instead of
+    # enqueueing in parallel, so the caller is responsible for, and in control
+    # of, when and how the fibers will be enqueued.
+    abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil
+  {% end %}
+
   # Tells a blocking run loop to no longer wait for events to activate. It may
   # for example enqueue a NOOP event with an immediate (or past) timeout. Having
   # activated an event, the loop shall return, allowing the blocked thread to
diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr
index 6e4175e3daee..3c84b2ce927d 100644
--- a/src/crystal/event_loop/iocp.cr
+++ b/src/crystal/event_loop/iocp.cr
@@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
     iocp
   end
 
+  # thread unsafe
   def run(blocking : Bool) : Bool
     enqueued = false
 
@@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
     enqueued
   end
 
+  {% if flag?(:execution_context) %}
+    # thread unsafe
+    def run(queue : Fiber::Queue*, blocking : Bool) : Nil
+      run_impl(blocking) { |fiber| queue.value.push(fiber) }
+    end
+  {% end %}
+
   # Runs the event loop and enqueues the fiber for the next upcoming event or
   # completion.
   private def run_impl(blocking : Bool, &) : Nil
diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr
index 9c0b3d33b15c..459dccbc13ee 100644
--- a/src/crystal/event_loop/libevent.cr
+++ b/src/crystal/event_loop/libevent.cr
@@ -20,26 +20,56 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
     event_base.loop(flags)
   end
 
+  {% if flag?(:execution_context) %}
+    def run(queue : Fiber::Queue*, blocking : Bool) : Nil
+      Crystal.trace :evloop, "run", blocking: blocking
+      @runnables = queue
+      run(blocking)
+    ensure
+      @runnables = nil
+    end
+
+    def callback_enqueue(fiber : Fiber) : Nil
+      Crystal.trace :evloop, "callback", fiber: fiber
+      if queue = @runnables
+        queue.value.push(fiber)
+      else
+        raise "BUG: libevent callback executed outside of #run(queue*) call"
+      end
+    end
+  {% end %}
+
   def interrupt : Nil
     event_base.loop_exit
   end
 
-  # Create a new resume event for a fiber.
+  # Creates a new resume event for a fiber (sleep).
   def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event
     event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
-      data.as(Fiber).enqueue
+      f = data.as(Fiber)
+      {% if flag?(:execution_context) %}
+        event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
+        event_loop.callback_enqueue(f)
+      {% else %}
+        f.enqueue
+      {% end %}
     end
   end
 
-  # Creates a timeout_event.
+  # Creates a timeout event (timeout action of select expression).
   def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event
     event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
       f = data.as(Fiber)
-      if (select_action = f.timeout_select_action)
+      if select_action = f.timeout_select_action
         f.timeout_select_action = nil
-        select_action.time_expired(f)
-      else
-        f.enqueue
+        if select_action.time_expired?
+          {% if flag?(:execution_context) %}
+            event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
+            event_loop.callback_enqueue(f)
+          {% else %}
+            f.enqueue
+          {% end %}
+        end
       end
     end
   end
diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr
index f7fc36082a0e..24d94af02bcc 100644
--- a/src/crystal/event_loop/polling.cr
+++ b/src/crystal/event_loop/polling.cr
@@ -112,7 +112,7 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
   end
   {% end %}
 
-  # NOTE: thread unsafe
+  # thread unsafe
   def run(blocking : Bool) : Bool
     system_run(blocking) do |fiber|
       {% if flag?(:execution_context) %}
@@ -124,6 +124,13 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
     true
   end
 
+  {% if flag?(:execution_context) %}
+    # thread unsafe
+    def run(queue : Fiber::Queue*, blocking : Bool) : Nil
+      system_run(blocking) { |fiber| queue.value.push(fiber) }
+    end
+  {% end %}
+
   # fiber interface, see Crystal::EventLoop
 
   def create_resume_event(fiber : Fiber) : FiberEvent
diff --git a/src/io/evented.cr b/src/io/evented.cr
index b238830f284a..db601a83964f 100644
--- a/src/io/evented.cr
+++ b/src/io/evented.cr
@@ -20,7 +20,12 @@ module IO::Evented
     @read_timed_out = timed_out
 
     if reader = @readers.get?.try &.shift?
-      reader.enqueue
+      {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %}
+        event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
+        event_loop.callback_enqueue(reader)
+      {% else %}
+        reader.enqueue
+      {% end %}
    end
  end

@@ -29,7 +34,12 @@ module IO::Evented
     @write_timed_out = timed_out
 
     if writer = @writers.get?.try &.shift?
-      writer.enqueue
+      {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %}
+        event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
+        event_loop.callback_enqueue(writer)
+      {% else %}
+        writer.enqueue
+      {% end %}
     end
   end

From 961425cc08d60f6a52272cd28469ee82754ee7ea Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Fri, 6 Dec 2024 16:06:04 +0100
Subject: [PATCH 08/14] Add ExecutionContext::SingleThreaded scheduler

Introduces the first EC scheduler that runs in a single thread.

Uses the same queues (Runnables, GlobalQueue) as the multi-threaded
scheduler that will come next. The Runnables local queue could be
simplified (no parallel accesses, hence no need for atomics) at the
expense of duplicating the implementation.

The scheduler doesn't need to actively park the thread, since the event
loops always block (when told to), even when there are no events, which
acts as parking the thread.
---
 spec/std/thread_spec.cr                    |   8 +-
 src/execution_context/execution_context.cr |   3 +-
 src/execution_context/single_threaded.cr   | 284 +++++++++++++++++++++
 3 files changed, 292 insertions(+), 3 deletions(-)
 create mode 100644 src/execution_context/single_threaded.cr

diff --git a/spec/std/thread_spec.cr b/spec/std/thread_spec.cr
index 5a43c7e429d1..bdfc31a9a65c 100644
--- a/spec/std/thread_spec.cr
+++ b/spec/std/thread_spec.cr
@@ -44,9 +44,13 @@ pending_interpreted describe: Thread do
   end
 
   it "names the thread" do
-    Thread.current.name.should be_nil
-    name = nil
+    {% if flag?(:execution_context) %}
+      Thread.current.name.should eq("DEFAULT")
+    {% else %}
+      Thread.current.name.should be_nil
+    {% end %}
 
+    name = nil
     thread = Thread.new(name: "some-name") do
       name = Thread.current.name
     end
diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr
index 4342945e3812..48a80640dc99 100644
--- a/src/execution_context/execution_context.cr
+++ b/src/execution_context/execution_context.cr
@@ -4,6 +4,7 @@ require "../crystal/system/thread_linked_list"
 require "../fiber"
 require "../fiber/stack_pool"
 require "./scheduler"
+require "./single_threaded"
 
 {% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}
 
@@ -17,7 +18,7 @@ module ExecutionContext
 
   # :nodoc:
   def self.init_default_context : Nil
-    raise NotImplementedError.new("No execution context implementations (yet)")
+    @@default = SingleThreaded.default
   end
 
   # Returns the default number of workers to start in the execution context.
diff --git a/src/execution_context/single_threaded.cr b/src/execution_context/single_threaded.cr
new file mode 100644
index 000000000000..3ae11579e0ce
--- /dev/null
+++ b/src/execution_context/single_threaded.cr
@@ -0,0 +1,284 @@
+require "./global_queue"
+require "./runnables"
+
+module ExecutionContext
+  # ST scheduler. Owns a single thread.
+  #
+  # Fully concurrent, limited parallelism: concurrency is limited to that
+  # thread; fibers running in this context will thus never run in parallel to
+  # each other, but they may still run in parallel to fibers running in other
+  # contexts (thus on another thread).
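+  #
+  # A small usage sketch (illustrative only; `#spawn` comes from the included
+  # `ExecutionContext::Scheduler` module and `do_work` is a placeholder):
+  #
+  # ```
+  # ctx = ExecutionContext::SingleThreaded.new("worker")
+  # ctx.spawn { do_work }
+  # ```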
+  class SingleThreaded
+    include ExecutionContext
+    include ExecutionContext::Scheduler
+
+    getter name : String
+
+    protected getter thread : Thread
+    @main_fiber : Fiber
+
+    @mutex : Thread::Mutex
+    @global_queue : GlobalQueue
+    @runnables : Runnables(256)
+
+    getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
+    getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
+
+    @waiting = Atomic(Bool).new(false)
+    @parked = Atomic(Bool).new(false)
+    @spinning = Atomic(Bool).new(false)
+    @tick : Int32 = 0
+
+    # :nodoc:
+    protected def self.default : self
+      new("DEFAULT", hijack: true)
+    end
+
+    def self.new(name : String) : self
+      new(name, hijack: false)
+    end
+
+    protected def initialize(@name : String, hijack : Bool)
+      @mutex = Thread::Mutex.new
+      @global_queue = GlobalQueue.new(@mutex)
+      @runnables = Runnables(256).new(@global_queue)
+
+      @thread = uninitialized Thread
+      @main_fiber = uninitialized Fiber
+      @thread = hijack ? hijack_current_thread : start_thread
+
+      ExecutionContext.execution_contexts.push(self)
+    end
+
+    # :nodoc:
+    def execution_context : self
+      self
+    end
+
+    def stack_pool? : Fiber::StackPool?
+      @stack_pool
+    end
+
+    # Initializes the scheduler on the current thread (usually the executable's
+    # main thread).
+    private def hijack_current_thread : Thread
+      thread = Thread.current
+      thread.internal_name = @name
+      thread.execution_context = self
+      thread.current_scheduler = self
+      @main_fiber = Fiber.new("#{@name}:loop", self) { run_loop }
+      thread
+    end
+
+    # Creates a new thread to initialize the scheduler.
+    private def start_thread : Thread
+      Thread.new(name: @name) do |thread|
+        thread.execution_context = self
+        thread.current_scheduler = self
+        @main_fiber = thread.main_fiber
+        @main_fiber.name = "#{@name}:loop"
+        run_loop
+      end
+    end
+
+    # :nodoc:
+    def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
+      # regardless of the value of same_thread: the fibers will always run on
+      # the same thread
+      self.spawn(name: name, &block)
+    end
+
+    def enqueue(fiber : Fiber) : Nil
+      if ExecutionContext.current == self
+        # local enqueue
+        Crystal.trace :sched, "enqueue", fiber: fiber
+        @runnables.push(fiber)
+      else
+        # cross context enqueue
+        Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
+        @global_queue.push(fiber)
+        wake_scheduler
+      end
+    end
+
+    # Enqueues a list of fibers in a single operation and returns a fiber to
+    # resume immediately.
+    #
+    # This is called after running the event loop for example.
+    private def enqueue_many(queue : Fiber::Queue*) : Fiber?
+      if fiber = queue.value.pop?
+        @runnables.bulk_push(queue) unless queue.value.empty?
+        fiber
+      end
+    end
+
+    protected def reschedule : Nil
+      Crystal.trace :sched, "reschedule"
+      if fiber = quick_dequeue?
+        resume fiber unless fiber == @thread.current_fiber
+      else
+        # nothing to do: switch back to the main loop to spin/park
+        resume @main_fiber
+      end
+    end
+
+    protected def resume(fiber : Fiber) : Nil
+      unless fiber.resumable?
+        if fiber.dead?
+          raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})"
+        else
+          raise "BUG: can't resume running fiber #{fiber} (#{inspect})"
+        end
+      end
+      swapcontext(fiber)
+    end
+
+    @[AlwaysInline]
+    private def quick_dequeue? : Fiber?
+      # every once in a while: dequeue from global queue to avoid two fibers
+      # constantly respawning each other to completely occupy the local queue
+      if (@tick &+= 1) % 61 == 0
+        if fiber = @global_queue.pop?
+          return fiber
+        end
+      end
+
+      # try local queue
+      if fiber = @runnables.get?
+        return fiber
+      end
+
+      # try to refill local queue
+      if fiber = @global_queue.grab?(@runnables, divisor: 1)
+        return fiber
+      end
+
+      # run the event loop to see if any event is ready to activate
+      queue = Fiber::Queue.new
+      @event_loop.run(pointerof(queue), blocking: false)
+      return enqueue_many(pointerof(queue))
+    end
+
+    private def run_loop : Nil
+      Crystal.trace :sched, "started"
+
+      loop do
+        if fiber = find_next_runnable
+          spin_stop if @spinning.get(:relaxed)
+          resume fiber
+        else
+          # the event loop enqueued a fiber (or was interrupted): go for the
+          # next iteration
+        end
+      rescue exception
+        Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed",
+          self.class.name, @name, exception: exception)
+      end
+    end
+
+    private def find_next_runnable : Fiber?
+      find_next_runnable do |fiber|
+        return fiber if fiber
+      end
+    end
+
+    private def find_next_runnable(&) : Nil
+      queue = Fiber::Queue.new
+
+      # nothing to do: start spinning
+      spinning do
+        yield @global_queue.grab?(@runnables, divisor: 1)
+
+        @event_loop.run(pointerof(queue), blocking: false)
+        yield enqueue_many(pointerof(queue))
+      end
+
+      # block on the event loop, waiting for pending event(s) to activate
+      waiting do
+        # there is a time window between stop spinning and start waiting during
+        # which another context may have enqueued a fiber, check again before
+        # waiting on the event loop (which may block for a long time) to avoid
+        # missing a runnable fiber:
+        yield @global_queue.grab?(@runnables, divisor: 1)
+
+        @event_loop.run(pointerof(queue), blocking: true)
+        yield enqueue_many(pointerof(queue))
+
+        # the event loop was interrupted: restart the loop
+        return
+      end
+    end
+
+    private def spinning(&)
+      spin_start
+
+      4.times do |iter|
+        spin_backoff(iter) unless iter == 0
+        yield
+      end
+
+      spin_stop
+    end
+
+    private def spin_start : Nil
+      @spinning.set(true, :release)
+    end
+
+    private def spin_stop : Nil
+      @spinning.set(false, :release)
+    end
+
+    @[AlwaysInline]
+    private def spin_backoff(iter)
+      # OPTIMIZE: consider exponential backoff, but beware of latency to notice
+      # cross context enqueues
+      Thread.yield
+    end
+
+    @[AlwaysInline]
+    private def waiting(&)
+      @waiting.set(true, :release)
+      begin
+        yield
+      ensure
+        @waiting.set(false, :release)
+      end
+    end
+
+    # This method runs in parallel to the rest of the ST scheduler!
+    #
+    # This is called from another context _after_ enqueueing into the global
+    # queue, to try to wake up the ST thread, which may be running, spinning,
+    # or waiting on the event loop.
+    private def wake_scheduler : Nil
+      if @spinning.get(:acquire)
+        return
+      end
+
+      if @waiting.get(:acquire)
+        @event_loop.interrupt
+      end
+    end
+
+    @[AlwaysInline]
+    def inspect(io : IO) : Nil
+      to_s(io)
+    end
+
+    def to_s(io : IO) : Nil
+      io << "#<" << self.class.name << ":0x"
+      object_id.to_s(io, 16)
+      io << ' ' << name << '>'
+    end
+
+    def status : String
+      if @spinning.get(:relaxed)
+        "spinning"
+      elsif @waiting.get(:relaxed)
+        "event-loop"
+      else
+        "running"
+      end
+    end
+  end
+end

From 8e290ad40b494d2a6c5d6af61f304cad2e066b1e Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 9 Dec 2024 16:01:40 +0100
Subject: [PATCH 09/14] Add ExecutionContext::MultiThreaded scheduler

Introduces the second EC scheduler that runs in multiple threads.

Uses the thread-safe queues (Runnables, GlobalQueue).
Contrary to the ST scheduler, the MT scheduler needs to actively park the thread in addition to waiting on the event loop, because only one thread is allowed to run the event loop. --- spec/std/spec_helper.cr | 2 +- spec/std/thread_spec.cr | 6 +- spec/support/mt_abort_timeout.cr | 2 +- src/execution_context/execution_context.cr | 7 +- src/execution_context/multi_threaded.cr | 277 ++++++++++++++++++ .../multi_threaded/scheduler.cr | 273 +++++++++++++++++ 6 files changed, 563 insertions(+), 4 deletions(-) create mode 100644 src/execution_context/multi_threaded.cr create mode 100644 src/execution_context/multi_threaded/scheduler.cr diff --git a/spec/std/spec_helper.cr b/spec/std/spec_helper.cr index 8bb0a9e1a2f2..723186f22a65 100644 --- a/spec/std/spec_helper.cr +++ b/spec/std/spec_helper.cr @@ -43,7 +43,7 @@ def spawn_and_check(before : Proc(_), file = __FILE__, line = __LINE__, &block : # This is a workaround to ensure the "before" fiber # is unscheduled. Otherwise it might stay alive running the event loop - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context) && flag?(:mt)}}) do while x.get != 2 Fiber.yield end diff --git a/spec/std/thread_spec.cr b/spec/std/thread_spec.cr index bdfc31a9a65c..ccc78c1e9bc8 100644 --- a/spec/std/thread_spec.cr +++ b/spec/std/thread_spec.cr @@ -45,7 +45,11 @@ pending_interpreted describe: Thread do it "names the thread" do {% if flag?(:execution_context) %} - Thread.current.name.should eq("DEFAULT") + {% if flag?(:mt) %} + Thread.current.name.should match(/^DEFAULT-\d+$/) + {% else %} + Thread.current.name.should eq("DEFAULT") + {% end %} {% else %} Thread.current.name.should be_nil {% end %} diff --git a/spec/support/mt_abort_timeout.cr b/spec/support/mt_abort_timeout.cr index 7339da6c07a1..dc78f7f8552c 100644 --- a/spec/support/mt_abort_timeout.cr +++ b/spec/support/mt_abort_timeout.cr @@ -5,7 +5,7 @@ private SPEC_TIMEOUT = 15.seconds Spec.around_each do |example| done = Channel(Exception?).new - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context) && flag?(:mt)}}) do begin example.run rescue e diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr index 48a80640dc99..0d5ccc298928 100644 --- a/src/execution_context/execution_context.cr +++ b/src/execution_context/execution_context.cr @@ -5,6 +5,7 @@ require "../fiber" require "../fiber/stack_pool" require "./scheduler" require "./single_threaded" +require "./multi_threaded" {% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} @@ -18,7 +19,11 @@ module ExecutionContext # :nodoc: def self.init_default_context : Nil - @@default = SingleThreaded.default + {% if flag?(:mt) %} + @@default = MultiThreaded.default(default_workers_count) + {% else %} + @@default = SingleThreaded.default + {% end %} end # Returns the default number of workers to start in the execution context. diff --git a/src/execution_context/multi_threaded.cr b/src/execution_context/multi_threaded.cr new file mode 100644 index 000000000000..8a862e93b191 --- /dev/null +++ b/src/execution_context/multi_threaded.cr @@ -0,0 +1,277 @@ +require "./global_queue" +require "./multi_threaded/scheduler" + +module ExecutionContext + # MT scheduler. + # + # Owns multiple threads and starts a scheduler in each one. The number of + # threads is dynamic. Setting the minimum and maximum to the same value will + # start a fixed number of threads. 
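+  # For example (a hypothetical context named "pool"), `MultiThreaded.new("pool", 2..4)`
+  # starts two threads right away and may grow the context up to four.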
+  #
+  # Fully concurrent, fully parallel: fibers running in this context can be
+  # resumed by any thread in the context; fibers can run concurrently and in
+  # parallel to each other, in addition to running in parallel to any fiber
+  # running in other contexts.
+  class MultiThreaded
+    include ExecutionContext
+
+    getter name : String
+
+    @mutex : Thread::Mutex
+    @condition : Thread::ConditionVariable
+    protected getter global_queue : GlobalQueue
+
+    getter stack_pool : Fiber::StackPool = Fiber::StackPool.new
+    getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create
+    @event_loop_lock = Atomic(Bool).new(false)
+
+    @parked = Atomic(Int32).new(0)
+    @spinning = Atomic(Int32).new(0)
+
+    # :nodoc:
+    protected def self.default(size : Int32) : self
+      new("DEFAULT", 1..size, hijack: true)
+    end
+
+    # Starts a context with a maximum number of threads. Threads aren't started
+    # right away, but will be started as needed to increase parallelism up to
+    # the configured maximum.
+    def self.new(name : String, size : Int32) : self
+      new(name, 0..size, hijack: false)
+    end
+
+    # Starts a context with a maximum number of threads. Threads aren't started
+    # right away, but will be started as needed to increase parallelism up to
+    # the configured maximum.
+    def self.new(name : String, size : Range(Nil, Int32)) : self
+      new(name, 0..size.end, hijack: false)
+    end
+
+    # Starts a context with a minimum and a maximum number of threads. The
+    # minimum number of threads will be started right away, then threads will be
+    # started as needed to increase parallelism up to the configured maximum.
+    def self.new(name : String, size : Range(Int32, Int32)) : self
+      new(name, size, hijack: false)
+    end
+
+    protected def initialize(@name : String, @size : Range(Int32, Int32), hijack : Bool)
+      raise ArgumentError.new("#{self.class.name} needs at least one thread") if @size.end < 1
+
+      @mutex = Thread::Mutex.new
+      @condition = Thread::ConditionVariable.new
+
+      @global_queue = GlobalQueue.new(@mutex)
+      @schedulers = Array(Scheduler).new(@size.end)
+      @threads = Array(Thread).new(@size.end)
+
+      @rng = Random::PCG32.new
+
+      start_schedulers(hijack)
+      start_initial_threads(hijack)
+
+      ExecutionContext.execution_contexts.push(self)
+    end
+
+    # The number of threads that have been started.
+    def size : Int32
+      @threads.size
+    end
+
+    # The maximum number of threads that can be started.
+    def capacity : Int32
+      @size.end
+    end
+
+    def stack_pool? : Fiber::StackPool?
+      @stack_pool
+    end
+
+    private def start_schedulers(hijack)
+      @size.end.times do |index|
+        @schedulers << Scheduler.new(self, "#{@name}-#{index}")
+      end
+    end
+
+    private def start_initial_threads(hijack)
+      @size.begin.times do |index|
+        scheduler = @schedulers[index]
+
+        if hijack && index == 0
+          @threads << hijack_current_thread(scheduler, index)
+        else
+          @threads << start_thread(scheduler, index)
+        end
+      end
+    end
+
+    # Attaches *scheduler* to the current `Thread`, usually the executable's
+    # main thread. Starts a `Fiber` to run the scheduler loop.
+    private def hijack_current_thread(scheduler, index) : Thread
+      thread = Thread.current
+      thread.internal_name = scheduler.name
+      thread.execution_context = self
+      thread.current_scheduler = scheduler
+
+      scheduler.thread = thread
+      scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do
+        scheduler.run_loop
+      end
+
+      thread
+    end
+
+    # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
+    # directly in the thread's main `Fiber`.
+ private def start_thread(scheduler, index) : Thread + Thread.new(name: scheduler.name) do |thread| + thread.execution_context = self + thread.current_scheduler = scheduler + + scheduler.thread = thread + scheduler.main_fiber = thread.main_fiber + scheduler.main_fiber.name = "#{scheduler.name}:loop" + scheduler.run_loop + end + end + + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + self.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + if ExecutionContext.current == self + # local enqueue: push to local queue of current scheduler + ExecutionContext::Scheduler.current.enqueue(fiber) + else + # cross context: push to global queue + Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self + @global_queue.push(fiber) + wake_scheduler + end + end + + # Pick a scheduler at random then iterate all schedulers to try to steal + # fibers from. + protected def steal(& : Scheduler ->) : Nil + return if size == 1 + + i = @rng.next_int + n = @schedulers.size + + n.times do |j| + if scheduler = @schedulers[(i &+ j) % n]? + yield scheduler + end + end + end + + protected def park_thread(&) : Fiber? + @mutex.synchronize do + # avoid races by checking queues again + if fiber = yield + return fiber + end + + Crystal.trace :sched, "park" + @parked.add(1, :acquire_release) + + @condition.wait(@mutex) + + # we don't decrement @parked because #wake_scheduler did + Crystal.trace :sched, "wakeup" + end + + nil + end + + # This method always runs in parallel! + # + # This can be called from any thread in the context but can also be called + # from external execution contexts, in which case the context may have its + # last thread about to park itself, and we must prevent the last thread from + # parking when there is a parallel cross context enqueue! + # + # OPTIMIZE: instead of blindly spending time (blocking progress on the + # current thread) to unpark a thread / start a new thread we could move the + # responsibility to an external OBSERVER to increase parallelism in a MT + # context when it detects pending work. 
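+    #
+    # Wake-up ladder implemented below: if a thread is already spinning it will
+    # notice the enqueue on its own; otherwise interrupt a thread blocked on
+    # the event loop; otherwise signal a parked thread; otherwise start a new
+    # thread when the configured maximum allows it.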
+    protected def wake_scheduler : Nil
+      # another thread is spinning: nothing to do (it shall notice the enqueue)
+      if @spinning.get(:relaxed) > 0
+        return
+      end
+
+      # interrupt a thread waiting on the event loop
+      if @event_loop_lock.get(:relaxed)
+        @event_loop.interrupt
+        return
+      end
+
+      # we can check @parked without locking the mutex because we can't push to
+      # the global queue _and_ park the thread at the same time, so either the
+      # thread is already parked (and we must awake it) or it noticed (or will
+      # notice) the fiber in the global queue;
+      #
+      # we still rely on an atomic to make sure the actual value is visible to
+      # the current thread
+      if @parked.get(:acquire) > 0
+        @mutex.synchronize do
+          # avoid race conditions
+          return if @parked.get(:relaxed) == 0
+          return if @spinning.get(:relaxed) > 0
+
+          # increase the number of spinning threads _now_ to prevent multiple
+          # threads from trying to wake up multiple threads at the same time
+          #
+          # we must also decrement the number of parked threads because another
+          # thread could lock the mutex and increment @spinning again before the
+          # signaled thread is resumed
+          spinning = @spinning.add(1, :acquire_release)
+          parked = @parked.sub(1, :acquire_release)
+
+          @condition.signal
+        end
+        return
+      end
+
+      # check if we can start another thread; no need for atomics, the values
+      # shall be rather stable over time and we check them again inside the
+      # mutex
+      return if @threads.size == @size.end
+
+      @mutex.synchronize do
+        index = @threads.size
+        return if index == @size.end # check again
+
+        @threads << start_thread(@schedulers[index], index)
+      end
+    end
+
+    # Only one thread is allowed to run the event loop. Yields then returns true
+    # if the lock was acquired, otherwise returns false immediately.
+    protected def lock_evloop?(&) : Bool
+      if @event_loop_lock.swap(true, :acquire) == false
+        begin
+          yield
+        ensure
+          @event_loop_lock.set(false, :release)
+        end
+        true
+      else
+        false
+      end
+    end
+
+    @[AlwaysInline]
+    def inspect(io : IO) : Nil
+      to_s(io)
+    end
+
+    def to_s(io : IO) : Nil
+      io << "#<" << self.class.name << ":0x"
+      object_id.to_s(io, 16)
+      io << ' ' << name << '>'
+    end
+  end
+end
diff --git a/src/execution_context/multi_threaded/scheduler.cr b/src/execution_context/multi_threaded/scheduler.cr
new file mode 100644
index 000000000000..270d25e7439b
--- /dev/null
+++ b/src/execution_context/multi_threaded/scheduler.cr
@@ -0,0 +1,273 @@
+require "crystal/pointer_linked_list"
+require "../runnables"
+
+module ExecutionContext
+  class MultiThreaded
+    # MT fiber scheduler.
+    #
+    # Owns a single thread inside a MT execution context.
+    class Scheduler
+      include ExecutionContext::Scheduler
+
+      getter name : String
+
+      # :nodoc:
+      property execution_context : MultiThreaded
+      protected property! thread : Thread
+      protected property! main_fiber : Fiber
+
+      @global_queue : GlobalQueue
+      @runnables : Runnables(256)
+      @event_loop : Crystal::EventLoop
+
+      @tick : Int32 = 0
+      @spinning = false
+      @waiting = false
+      @parked = false
+
+      protected def initialize(@execution_context, @name)
+        @global_queue = @execution_context.global_queue
+        @runnables = Runnables(256).new(@global_queue)
+        @event_loop = @execution_context.event_loop
+      end
+
+      # :nodoc:
+      def spawn(*, name : String?
= nil, same_thread : Bool, &block : ->) : Fiber
+        raise RuntimeError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
+        self.spawn(name: name, &block)
+      end
+
+      # Unlike `ExecutionContext::MultiThreaded#enqueue` this method is only
+      # safe to call on `ExecutionContext.current` which should always be the
+      # case, since cross context enqueues must call
+      # `ExecutionContext::MultiThreaded#enqueue` through `Fiber#enqueue`.
+      protected def enqueue(fiber : Fiber) : Nil
+        Crystal.trace :sched, "enqueue", fiber: fiber
+        @runnables.push(fiber)
+        @execution_context.wake_scheduler unless @execution_context.capacity == 1
+      end
+
+      # Enqueues a list of fibers in a single operation and returns a fiber to
+      # resume immediately.
+      #
+      # This is called after running the event loop for example.
+      private def enqueue_many(queue : Fiber::Queue*) : Fiber?
+        if fiber = queue.value.pop?
+          Crystal.trace :sched, "enqueue", size: queue.value.size, fiber: fiber
+          unless queue.value.empty?
+            @runnables.bulk_push(queue)
+            @execution_context.wake_scheduler unless @execution_context.capacity == 1
+          end
+          fiber
+        end
+      end
+
+      protected def reschedule : Nil
+        Crystal.trace :sched, "reschedule"
+        if fiber = quick_dequeue?
+          resume fiber unless fiber == thread.current_fiber
+        else
+          # nothing to do: switch back to the main loop to spin/wait/park
+          resume main_fiber
+        end
+      end
+
+      protected def resume(fiber : Fiber) : Nil
+        Crystal.trace :sched, "resume", fiber: fiber
+
+        # in a multithreaded environment the fiber may be dequeued before its
+        # running context has been saved on the stack (thread A tries to resume
+        # the fiber before thread B has saved its context); we must wait until
+        # the context switch assembly has saved all registers on the stack and
+        # set the fiber as resumable.
+        until fiber.resumable?
+          if fiber.dead?
+            raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})"
+          end
+
+          # OPTIMIZE: if the thread saving the fiber context has been preempted,
+          # this will block the current thread from progressing... shall we
+          # abort and reenqueue the fiber after MAX iterations?
+          Intrinsics.pause
+        end
+
+        swapcontext(fiber)
+      end
+
+      @[AlwaysInline]
+      private def quick_dequeue? : Fiber?
+        # every once in a while: dequeue from global queue to avoid two fibers
+        # constantly respawning each other to completely occupy the local queue
+        if (@tick &+= 1) % 61 == 0
+          if fiber = @global_queue.pop?
+            return fiber
+          end
+        end
+
+        # dequeue from local queue
+        if fiber = @runnables.get?
+          return fiber
+        end
+      end
+
+      protected def run_loop : Nil
+        Crystal.trace :sched, "started"
+
+        loop do
+          if fiber = find_next_runnable
+            spin_stop if @spinning
+            resume fiber
+          else
+            # the event loop enqueued a fiber (or was interrupted) or the
+            # scheduler was unparked: go for the next iteration
+          end
+        rescue exception
+          Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed",
+            self.class.name, @name, exception: exception)
+        end
+      end
+
+      private def find_next_runnable : Fiber?
+        find_next_runnable do |fiber|
+          return fiber if fiber
+        end
+      end
+
+      private def find_next_runnable(&) : Nil
+        queue = Fiber::Queue.new
+
+        # nothing to do: start spinning
+        spinning do
+          yield @global_queue.grab?(@runnables, divisor: @execution_context.size)
+
+          if @execution_context.lock_evloop? { @event_loop.run(pointerof(queue), blocking: false) }
+            if fiber = enqueue_many(pointerof(queue))
+              spin_stop
+              yield fiber
+            end
+          end
+
+          yield try_steal?
+        end
+
+        # wait on the event loop for events and timers to activate
+        evloop_ran = @execution_context.lock_evloop? do
+          @waiting = true
+
+          # there is a time window between stop spinning and start waiting
+          # during which another context may have enqueued a fiber, check again
+          # before blocking on the event loop (the blocking call may not return
+          # for a long time) to avoid missing a runnable fiber:
+          yield @global_queue.grab?(@runnables, divisor: @execution_context.size)
+
+          # block on the event loop until an event is ready or the loop is
+          # interrupted
+          @event_loop.run(pointerof(queue), blocking: true)
+        ensure
+          @waiting = false
+        end
+
+        if evloop_ran
+          yield enqueue_many(pointerof(queue))
+
+          # the event loop was interrupted: restart the loop
+          return
+        end
+
+        # no runnable fiber and another thread is already running the event
+        # loop: park the thread until another scheduler or another context
+        # enqueues a fiber
+        @execution_context.park_thread do
+          # by the time we acquire the lock, another thread may have enqueued
+          # fiber(s) and already tried to wake up a thread (race), so we must
+          # check again; we don't check the scheduler's local queue (it's empty)
+
+          yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size)
+          yield try_steal?
+
+          @parked = true
+          nil
+        end
+        @parked = false
+
+        # immediately mark the scheduler as spinning (we just unparked); we
+        # don't increment the number of spinning threads since
+        # `ExecutionContext::MultiThreaded#wake_scheduler` already did
+        @spinning = true
+      end
+
+      # This method always runs in parallel!
+      private def try_steal? : Fiber?
+        @execution_context.steal do |other|
+          if other == self
+            # no need to steal from ourselves
+            next
+          end
+
+          if fiber = @runnables.steal_from(other.@runnables)
+            Crystal.trace :sched, "stole", from: other, size: @runnables.size, fiber: fiber
+            return fiber
+          end
+        end
+      end
+
+      # OPTIMIZE: skip spinning if there are enough threads spinning already
+      private def spinning(&)
+        spin_start
+
+        4.times do |iter|
+          spin_backoff(iter) unless iter == 0
+          yield
+        end
+
+        spin_stop
+      end
+
+      @[AlwaysInline]
+      private def spin_start : Nil
+        return if @spinning
+
+        @spinning = true
+        @execution_context.@spinning.add(1, :acquire_release)
+      end
+
+      @[AlwaysInline]
+      private def spin_stop : Nil
+        return unless @spinning
+
+        @execution_context.@spinning.sub(1, :acquire_release)
+        @spinning = false
+      end
+
+      @[AlwaysInline]
+      private def spin_backoff(iter)
+        # OPTIMIZE: consider exponential backoff, but beware of edge cases, like
+        # creating latency before we notice a cross context enqueue, for example
+        Thread.yield
+      end
+
+      @[AlwaysInline]
+      def inspect(io : IO) : Nil
+        to_s(io)
+      end
+
+      def to_s(io : IO) : Nil
+        io << "#<" << self.class.name << ":0x"
+        object_id.to_s(io, 16)
+        io << ' ' << @name << '>'
+      end
+
+      def status : String
+        if @spinning
+          "spinning"
+        elsif @waiting
+          "event-loop"
+        elsif @parked
+          "parked"
+        else
+          "running"
+        end
+      end
+    end
+  end
+end

From da55a9fa4bb6eee5ae46a10ea6092fded65f41ac Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Mon, 9 Dec 2024 18:31:51 +0100
Subject: [PATCH 10/14] Add ExecutionContext::Isolated scheduler

Introduces the last EC scheduler that runs a single fiber in a single
thread. Contrary to the other schedulers, concurrency is disabled.

Like the ST scheduler, the scheduler doesn't need to actively park the
thread and merely waits on the event loop.
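A minimal usage sketch (hypothetical; a `Channel` is the intended way for an
isolated fiber to communicate with fibers in other contexts):

```
results = Channel(Int32).new

ctx = ExecutionContext::Isolated.new("WORKER") do
  # runs alone on its own thread; blocking calls only block this thread
  results.send(42)
end

results.receive # => 42
ctx.join
```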
--- spec/std/execution_context/spec_helper.cr | 32 ++++ src/execution_context/execution_context.cr | 1 + src/execution_context/isolated.cr | 183 +++++++++++++++++++++ 3 files changed, 216 insertions(+) create mode 100644 src/execution_context/isolated.cr diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/execution_context/spec_helper.cr index 9a1dbb881cee..73ed3eefe751 100644 --- a/spec/std/execution_context/spec_helper.cr +++ b/spec/std/execution_context/spec_helper.cr @@ -18,4 +18,36 @@ module ExecutionContext @counter.get(:relaxed) end end + + class TestTimeout + def initialize(@timeout : Time::Span = 2.seconds) + @start = Time.monotonic + @cancelled = Atomic(Bool).new(false) + end + + def cancel : Nil + @cancelled.set(true) + end + + def elapsed? + (Time.monotonic - @start) >= @timeout + end + + def done? + return true if @cancelled.get + raise "timeout reached" if elapsed? + false + end + + def sleep(interval = 100.milliseconds) : Nil + until done? + ::sleep interval + end + end + + def reset : Nil + @start = Time.monotonic + @cancelled.set(false) + end + end end diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr index 0d5ccc298928..8c51c250cf90 100644 --- a/src/execution_context/execution_context.cr +++ b/src/execution_context/execution_context.cr @@ -6,6 +6,7 @@ require "../fiber/stack_pool" require "./scheduler" require "./single_threaded" require "./multi_threaded" +require "./isolated" {% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} diff --git a/src/execution_context/isolated.cr b/src/execution_context/isolated.cr new file mode 100644 index 000000000000..73ff72efc278 --- /dev/null +++ b/src/execution_context/isolated.cr @@ -0,0 +1,183 @@ +require "fiber/queue" + +module ExecutionContext + # ST scheduler. Owns a single thread running a single fiber. + # + # Concurrency is disabled: calls to `#spawn` will create fibers in another + # execution context (defaults to `ExecutionContext.default`). Any calls that + # result in waiting (e.g. sleep, or socket read/write) will block the thread + # since there are no other fibers to switch to. + # + # The fiber will still run in parallel to other fibers running in other + # execution contexts. + # + # Isolated fibers can still communicate with other fibers running in other + # execution contexts using standard means, such as `Channel(T)`, `WaitGroup` + # or `Mutex`. They can also execute IO operations or sleep just like any other + # fiber. + # + # Example: + # + # ``` + # ExecutionContext::Isolated.new("Gtk") { Gtk.main } + # ``` + class Isolated + include ExecutionContext + include ExecutionContext::Scheduler + + getter name : String + + @mutex : Thread::Mutex + protected getter thread : Thread + @main_fiber : Fiber + + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + + getter? 
running : Bool = true + @enqueued = false + @waiting = false + + def initialize(@name : String, @spawn_context = ExecutionContext.default, &@func : ->) + @mutex = Thread::Mutex.new + @thread = uninitialized Thread + @main_fiber = uninitialized Fiber + @thread = start_thread + ExecutionContext.execution_contexts.push(self) + end + + private def start_thread : Thread + Thread.new(name: @name) do |thread| + @thread = thread + thread.execution_context = self + thread.current_scheduler = self + thread.main_fiber.name = @name + @main_fiber = thread.main_fiber + run + end + end + + # :nodoc: + @[AlwaysInline] + def execution_context : Isolated + self + end + + # :nodoc: + def stack_pool : Fiber::StackPool + raise RuntimeError.new("No stack pool for isolated contexts") + end + + # :nodoc: + def stack_pool? : Fiber::StackPool? + end + + @[AlwaysInline] + def spawn(*, name : String? = nil, &block : ->) : Fiber + @spawn_context.spawn(name: name, &block) + end + + # :nodoc: + @[AlwaysInline] + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + @spawn_context.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + Crystal.trace :sched, "enqueue", fiber: fiber, context: self + + unless fiber == @main_fiber + raise RuntimeError.new("Concurrency is disabled in isolated contexts") + end + + @mutex.synchronize do + @enqueued = true + + if @waiting + # wake up the blocked thread + @waiting = false + @event_loop.interrupt + else + # race: enqueued before the other thread started waiting + end + end + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + + loop do + @mutex.synchronize do + # race: another thread already re-enqueued the fiber + if @enqueued + Crystal.trace :sched, "resume" + @enqueued = false + @waiting = false + return + end + @waiting = true + end + + # wait on the event loop + queue = Fiber::Queue.new + @event_loop.run(pointerof(queue), blocking: true) + + if fiber = queue.pop? + break if fiber == @main_fiber && queue.empty? 
+          raise RuntimeError.new("Concurrency is disabled in isolated contexts")
+        end
+
+        # the evloop got interrupted: restart
+      end
+
+      # cleanup
+      @mutex.synchronize do
+        @waiting = false
+        @enqueued = false
+      end
+
+      Crystal.trace :sched, "resume"
+    end
+
+    protected def resume(fiber : Fiber) : Nil
+      # in theory we could resume @main_fiber, but this method may only be
+      # called from the current execution context, which is @main_fiber; it
+      # doesn't make any sense for a fiber to resume itself
+      raise RuntimeError.new("Can't resume #{fiber} in #{self}")
+    end
+
+    private def run
+      Crystal.trace :sched, "started"
+      @func.call
+    ensure
+      @running = false
+      ExecutionContext.execution_contexts.delete(self)
+    end
+
+    @[AlwaysInline]
+    def join : Nil
+      @thread.join
+    end
+
+    @[AlwaysInline]
+    def inspect(io : IO) : Nil
+      to_s(io)
+    end
+
+    def to_s(io : IO) : Nil
+      io << "#<" << self.class.name << ":0x"
+      object_id.to_s(io, 16)
+      io << ' ' << name << '>'
+    end
+
+    def status : String
+      if @waiting
+        "event-loop"
+      elsif @running
+        "running"
+      else
+        "shutdown"
+      end
+    end
+  end
+end

From 3bb10e5644c845c8df3f4f6cf89bd8f92b4b4888 Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Fri, 13 Dec 2024 18:49:52 +0100
Subject: [PATCH 11/14] Win32: use isolated execution context instead of bare thread

---
 src/crystal/system/win32/file_descriptor.cr | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr
index 4a99d82e9134..c70f4c7edfc0 100644
--- a/src/crystal/system/win32/file_descriptor.cr
+++ b/src/crystal/system/win32/file_descriptor.cr
@@ -519,7 +519,11 @@ private module ConsoleUtils
   @@read_requests = Deque(ReadRequest).new
   @@bytes_read = Deque(Int32).new
   @@mtx = ::Thread::Mutex.new
-  @@reader_thread = ::Thread.new { reader_loop }
+  {% if flag?(:execution_context) %}
+    @@reader_thread = ExecutionContext::Isolated.new("READER-LOOP") { reader_loop }
+  {% else %}
+    @@reader_thread = ::Thread.new { reader_loop }
+  {% end %}
 
   private def self.reader_loop
     while true

From a230257aa6ee7c9af8663eb205ad543baa23912f Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Fri, 13 Dec 2024 18:50:18 +0100
Subject: [PATCH 12/14] std specs: use isolated execution context instead of bare thread

---
 .../execution_context/global_queue_spec.cr   | 21 +++++++-------------
 spec/std/execution_context/runnables_spec.cr | 11 ++++-------
 spec/std/file_spec.cr                        |  3 ++-
 spec/std/thread/condition_variable_spec.cr   |  7 ++++---
 spec/std/thread/mutex_spec.cr                |  7 ++++---
 spec/std/thread_spec.cr                      | 11 +++++-----
 spec/support/thread.cr                       |  9 ++++++++
 7 files changed, 36 insertions(+), 33 deletions(-)
 create mode 100644 spec/support/thread.cr

diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/execution_context/global_queue_spec.cr
index 838a31406c01..92ad12db00d2 100644
--- a/spec/std/execution_context/global_queue_spec.cr
+++ b/spec/std/execution_context/global_queue_spec.cr
@@ -1,4 +1,5 @@
 require "./spec_helper"
+require "../../support/thread"
 
 describe ExecutionContext::GlobalQueue do
   it "#initialize" do
@@ -98,10 +99,10 @@ describe ExecutionContext::GlobalQueue do
     increments = 15
     queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
     ready = Thread::WaitGroup.new(n)
-    shutdown = Thread::WaitGroup.new(n)
+    threads = Array(Thread).new(n)
 
     n.times do |i|
-      Thread.new(name: "ONE-#{i}") do |thread|
+      threads << new_thread(name: "ONE-#{i}") do
         slept = 0
         ready.done
            break
          end
        end
-      rescue exception
-        Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}"
-      ensure
-        shutdown.done
       end
     end
     ready.wait
@@ -130,7 +127,7 @@
       Thread.sleep(10.nanoseconds) if i % 10 == 9
     end
 
-    shutdown.wait
+    threads.each(&.join)
 
     # must have dequeued each fiber exactly X times
     fibers.each { |fc| fc.counter.should eq(increments) }
@@ -146,10 +143,10 @@
 
     queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
     ready = Thread::WaitGroup.new(n)
-    shutdown = Thread::WaitGroup.new(n)
+    threads = Array(Thread).new(n)
 
     n.times do |i|
-      Thread.new(name: "BULK-#{i}") do |thread|
+      threads << new_thread(name: "BULK-#{i}") do
         slept = 0
         r = ExecutionContext::Runnables(3).new(queue)
@@ -200,10 +197,6 @@
           slept += 1
           Thread.sleep(1.nanosecond) # don't burn CPU
         end
-      rescue exception
-        Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}"
-      ensure
-        shutdown.done
       end
     end
     ready.wait
@@ -216,7 +209,7 @@
       Thread.sleep(10.nanoseconds) if i % 4 == 3
     end
 
-    shutdown.wait
+    threads.each(&.join)
 
     # must have dequeued each fiber exactly X times (no less, no more)
     fibers.each { |fc| fc.counter.should eq(increments) }
diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/execution_context/runnables_spec.cr
index 6fa342675402..a31c59c86e0c 100644
--- a/spec/std/execution_context/runnables_spec.cr
+++ b/spec/std/execution_context/runnables_spec.cr
@@ -1,4 +1,5 @@
 require "./spec_helper"
+require "../../support/thread"
 
 describe ExecutionContext::Runnables do
   it "#initialize" do
@@ -190,14 +191,14 @@
 
     global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
     ready = Thread::WaitGroup.new(n)
-    shutdown = Thread::WaitGroup.new(n)
+    threads = Array(Thread).new(n)
 
     all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do
       ExecutionContext::Runnables(16).new(global_queue)
     end
 
     n.times do |i|
-      Thread.new(name: "RUN-#{i}") do |thread|
+      threads << new_thread(name: "RUN-#{i}") do
         runnables = all_runnables[i]
         slept = 0
@@ -239,10 +240,6 @@
           slept += 1
           Thread.sleep(1.nanosecond) # don't burn CPU
         end
-      rescue exception
-        Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}"
-      ensure
-        shutdown.done
       end
     end
     ready.wait
@@ -255,7 +252,7 @@
       Thread.sleep(10.nanoseconds) if i % 2 == 1
     end
 
-    shutdown.wait
+    threads.each(&.join)
 
     # must have dequeued each fiber exactly X times (no less, no more)
     fibers.each { |fc| fc.counter.should eq(increments) }
diff --git a/spec/std/file_spec.cr b/spec/std/file_spec.cr
index 0f88b2028c2f..29add93ad6f5 100644
--- a/spec/std/file_spec.cr
+++ b/spec/std/file_spec.cr
@@ -1,4 +1,5 @@
 require "./spec_helper"
+require "../support/thread"
 
 private def it_raises_on_null_byte(operation, file = __FILE__, line = __LINE__, end_line = __END_LINE__, &block)
   it "errors on #{operation}", file, line, end_line do
@@ -109,7 +110,7 @@ describe "File" do
       # thread or process also opened the file; we should pass
      # O_NONBLOCK to the open(2) call itself, not afterwards
      file = nil
-      Thread.new { file = File.new(path, "w", blocking: nil) }
+      new_thread { file = File.new(path, "w", blocking: nil) }
 
       begin
         File.open(path, "r", blocking: false) do |file|
diff --git a/spec/std/thread/condition_variable_spec.cr b/spec/std/thread/condition_variable_spec.cr
index 1bf78f797357..f682b418e6f3 100644 --- a/spec/std/thread/condition_variable_spec.cr +++ b/spec/std/thread/condition_variable_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::ConditionVariable do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::ConditionVariable do waiting = 0 5.times do - Thread.new do + new_thread do mutex.synchronize do waiting += 1 cv1.wait(mutex) @@ -78,7 +79,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end diff --git a/spec/std/thread/mutex_spec.cr b/spec/std/thread/mutex_spec.cr index 99f3c5d385c3..8fb8e484e935 100644 --- a/spec/std/thread/mutex_spec.cr +++ b/spec/std/thread/mutex_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::Mutex do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::Mutex do mutex = Thread::Mutex.new threads = Array.new(10) do - Thread.new do + new_thread do mutex.synchronize { a += 1 } end end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::Mutex do mutex.try_lock.should be_false expect_raises(RuntimeError) { mutex.lock } mutex.unlock - Thread.new { mutex.synchronize { } }.join + new_thread { mutex.synchronize { } }.join end it "won't unlock from another thread" do @@ -30,7 +31,7 @@ pending_interpreted describe: Thread::Mutex do mutex.lock expect_raises(RuntimeError) do - Thread.new { mutex.unlock }.join + new_thread { mutex.unlock }.join end mutex.unlock diff --git a/spec/std/thread_spec.cr b/spec/std/thread_spec.cr index ccc78c1e9bc8..3dd550cffe4d 100644 --- a/spec/std/thread_spec.cr +++ b/spec/std/thread_spec.cr @@ -1,16 +1,17 @@ require "./spec_helper" +require "../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread do it "allows passing an argumentless fun to execute" do a = 0 - thread = Thread.new { a = 1; 10 } + thread = new_thread { a = 1; 10 } thread.join a.should eq(1) end it "raises inside thread and gets it on join" do - thread = Thread.new { raise "OH NO" } + thread = new_thread { raise "OH NO" } expect_raises Exception, "OH NO" do thread.join end @@ -18,7 +19,7 @@ pending_interpreted describe: Thread do it "returns current thread object" do current = nil - thread = Thread.new { current = Thread.current } + thread = new_thread { current = Thread.current } thread.join current.should be(thread) current.should_not be(Thread.current) @@ -31,7 +32,7 @@ pending_interpreted describe: Thread do it "yields the processor" do done = false - thread = Thread.new do + thread = new_thread do 3.times { Thread.yield } done = true end @@ -55,7 +56,7 @@ pending_interpreted describe: Thread do {% end %} name = nil - thread = Thread.new(name: "some-name") do + thread = new_thread(name: "some-name") do name = Thread.current.name end thread.name.should eq("some-name") diff --git a/spec/support/thread.cr b/spec/support/thread.cr new file mode 100644 index 000000000000..df8c0aa37268 --- /dev/null +++ b/spec/support/thread.cr @@ -0,0 +1,9 @@ +{% begin %} + def new_thread(name = nil, &block) : Thread + {% if 
flag?(:execution_context) %}
+      ExecutionContext::Isolated.new(name: name || "SPEC") { block.call }.@thread
+    {% else %}
+      Thread.new(name) { block.call }
+    {% end %}
+  end
+{% end %}

From ba15364218fe2864b6fff462807568d9e9bdb1ca Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Fri, 20 Dec 2024 19:31:30 +0100
Subject: [PATCH 13/14] WIP: add monitor thread

---
 src/execution_context/execution_context.cr |  2 +
 src/execution_context/monitor.cr           | 87 ++++++++++++++++++++
 2 files changed, 89 insertions(+)
 create mode 100644 src/execution_context/monitor.cr

diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr
index 8c51c250cf90..c4cdbc2f336b 100644
--- a/src/execution_context/execution_context.cr
+++ b/src/execution_context/execution_context.cr
@@ -7,6 +7,7 @@ require "./scheduler"
 require "./single_threaded"
 require "./multi_threaded"
 require "./isolated"
+require "./monitor"
 
 {% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}
 
@@ -25,6 +26,7 @@ module ExecutionContext
     {% else %}
       @@default = SingleThreaded.default
     {% end %}
+    @@monitor = Monitor.new
   end
 
   # Returns the default number of workers to start in the execution context.
diff --git a/src/execution_context/monitor.cr b/src/execution_context/monitor.cr
new file mode 100644
index 000000000000..bf71892102c7
--- /dev/null
+++ b/src/execution_context/monitor.cr
@@ -0,0 +1,87 @@
+module ExecutionContext
+  class Monitor
+    struct Timer
+      def initialize(@every : Time::Span)
+        @last = Time.monotonic
+      end
+
+      def elapsed?(now)
+        ret = @last + @every <= now
+        @last = now if ret
+        ret
+      end
+    end
+
+    DEFAULT_EVERY                = 10.milliseconds
+    DEFAULT_COLLECT_STACKS_EVERY = 5.seconds
+
+    def initialize(
+      @every = DEFAULT_EVERY,
+      collect_stacks_every = DEFAULT_COLLECT_STACKS_EVERY,
+    )
+      @collect_stacks_timer = Timer.new(collect_stacks_every)
+
+      # FIXME: should be an ExecutionContext::Isolated instead of bare Thread?
+      # it might print to STDERR (requires evloop) for example; it may also
+      # allocate memory, for example to raise an exception (gc can run in the
+      # thread, running finalizers) which is probably not an issue.
+      @thread = uninitialized Thread
+      @thread = Thread.new(name: "SYSMON") { run_loop }
+    end
+
+    # TODO: slow parallelism: instead of actively trying to wake up threads,
+    # which can be expensive and a source of contention, wasting more time than
+    # running the enqueued fiber(s) directly, the monitor thread could check the
+    # queues of MT schedulers and decide to start/wake threads; it could also
+    # complain that a fiber has been asked to yield numerous times.
+    #
+    # TODO: detect schedulers that have been stuck running the same fiber since
+    # the previous iteration (check current fiber & scheduler tick to avoid ABA
+    # issues), then mark the fiber to trigger a cooperative yield, for example,
+    # `Fiber.maybe_yield` could be called at potential cancellation points that
+    # would otherwise not need to block now (IO, mutexes, schedulers, manually
+    # called in loops, ...) which could make fiber execution time fairer.
+    #
+    # TODO: if an execution context didn't have the opportunity to run its
+    # event-loop since the previous iteration, then the monitor thread may
+    # choose to run it; it would avoid a set of fibers always resuming
+    # themselves at the expense of pending events.
+    #
+    # TODO: run the GC on low application activity?
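+    #
+    # (Illustration of `Timer` above: called from the 10ms monitor loop with
+    # the defaults, `elapsed?(now)` returns true about once every 5 seconds,
+    # resetting its baseline to *now* whenever it fires.)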
+    private def run_loop : Nil
+      every do |now|
+        collect_stacks if @collect_stacks_timer.elapsed?(now)
+      end
+    end
+
+    # Executes the block at exact intervals (depending on the OS scheduler
+    # precision and overall OS load), without counting the time to execute the
+    # block.
+    #
+    # OPTIMIZE: exponential backoff (and/or park) when all schedulers are
+    # pending to reduce CPU usage; thread wake up would have to signal the
+    # monitor thread.
+    private def every(&)
+      remaining = @every
+
+      loop do
+        Thread.sleep(remaining)
+        now = Time.monotonic
+        yield(now)
+        remaining = (now + @every - Time.monotonic).clamp(Time::Span.zero..)
+      rescue exception
+        Crystal.print_error_buffered("BUG: %s#every crashed",
+          self.class.name, exception: exception)
+      end
+    end
+
+    # Iterates each ExecutionContext and collects unused Fiber stacks.
+    #
+    # OPTIMIZE: should maybe happen during GC collections (?)
+    private def collect_stacks
+      Crystal.trace :sched, "collect_stacks" do
+        ExecutionContext.each(&.stack_pool?.try(&.collect))
+      end
+    end
+  end
+end

From f13115627c601f66728b903d935fe31f234b910b Mon Sep 17 00:00:00 2001
From: Julien Portalier
Date: Tue, 7 Jan 2025 12:14:39 +0100
Subject: [PATCH 14/14] Fix: signal handler mustn't depend on the event loop

Only a limited set of POSIX functions are signal safe, and the system
functions that the event loop implementations rely on aren't in the
list (e.g. epoll, kevent, malloc, ...).
---
 src/crystal/system/unix/signal.cr | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr
index f65b529bf0fb..e3675843c450 100644
--- a/src/crystal/system/unix/signal.cr
+++ b/src/crystal/system/unix/signal.cr
@@ -33,7 +33,7 @@ module Crystal::System::Signal
     action.sa_flags = LibC::SA_RESTART
     action.sa_sigaction = LibC::SigactionHandlerT.new do |value, _, _|
-      writer.write_bytes(value) unless writer.closed?
+      FileDescriptor.write_fully(writer.fd, pointerof(value)) unless writer.closed?
     end
     LibC.sigemptyset(pointerof(action.@sa_mask))
     LibC.sigaction(signal, pointerof(action), nil)
@@ -85,7 +85,9 @@ module Crystal::System::Signal
   private def self.start_loop
     spawn(name: "signal-loop") do
       loop do
-        value = reader.read_bytes(Int32)
+        buf = uninitialized StaticArray(UInt8, 4)
+        reader.read_fully(buf.to_slice)
+        value = buf.unsafe_as(Int32)
       rescue IO::Error
         next
       else