diff --git a/src/concurrent.cr b/src/concurrent.cr index 07ae945a84f6..1f1ad04bfd06 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -1,8 +1,13 @@ require "fiber" require "channel" -require "crystal/scheduler" require "crystal/tracing" +{% if flag?(:execution_context) %} + require "execution_context" +{% else %} + require "crystal/scheduler" +{% end %} + # Blocks the current fiber for the specified number of seconds. # # While this fiber is waiting this time, other ready-to-execute @@ -12,8 +17,7 @@ def sleep(seconds : Number) : Nil if seconds < 0 raise ArgumentError.new "Sleep seconds must be positive" end - - Crystal::Scheduler.sleep(seconds.seconds) + sleep(seconds.seconds) end # Blocks the current Fiber for the specified time span. @@ -21,16 +25,28 @@ end # While this fiber is waiting this time, other ready-to-execute # fibers might start their execution. def sleep(time : Time::Span) : Nil - Crystal::Scheduler.sleep(time) + Crystal.trace :sched, "sleep", for: time + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(time) + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.sleep(time) + {% end %} end # Blocks the current fiber forever. # # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end +{% begin %} # Spawns a new fiber. # # NOTE: The newly created fiber doesn't run as soon as spawned. @@ -64,12 +80,17 @@ end # wg.wait # ``` def spawn(*, name : String? = nil, same_thread = false, &block) - fiber = Fiber.new(name, &block) - Crystal.trace :sched, "spawn", fiber: fiber - {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} - fiber.enqueue - fiber + {% if flag?(:execution_context) %} + ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + {% else %} + fiber = Fiber.new(name, &block) + Crystal.trace :sched, "spawn", fiber: fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue + fiber + {% end %} end +{% end %} # Spawns a fiber by first creating a `Proc`, passing the *call*'s # expressions to it, and letting the `Proc` finally invoke the *call*. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 45fc9e4f8558..7366ac319667 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -23,12 +23,20 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self - Crystal::Scheduler.event_loop + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop + {% end %} end @[AlwaysInline] - def self.current? : self? - Crystal::Scheduler.event_loop? + def self.current? : self | Nil + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop? + {% end %} end # Runs the loop. diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 774cc7060715..dbca6f659e08 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -115,7 +115,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop # NOTE: thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end true end @@ -303,13 +307,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index 637adcffa6a1..c8c8978be280 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,3 +1,5 @@ +{% skip_file if flag?(:execution_context) %} + require "crystal/event_loop" require "crystal/system/print_error" require "./fiber_channel" @@ -67,7 +69,6 @@ class Crystal::Scheduler end def self.sleep(time : Time::Span) : Nil - Crystal.trace :sched, "sleep", for: time Thread.current.scheduler.sleep(time) end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 92136d1f3989..2b5e06498798 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -68,6 +68,39 @@ class Thread getter name : String? + {% if flag?(:execution_context) %} + # :nodoc: + getter! execution_context : ExecutionContext + + # :nodoc: + property! current_scheduler : ExecutionContext::Scheduler + + # :nodoc: + def execution_context=(@execution_context : ExecutionContext) : ExecutionContext + main_fiber.execution_context = execution_context + end + + # :nodoc: + def dead_fiber=(@dead_fiber : Fiber) : Fiber + end + + # :nodoc: + def dead_fiber? : Fiber? + if fiber = @dead_fiber + @dead_fiber = nil + fiber + end + end + {% else %} + # :nodoc: + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } + + # :nodoc: + def scheduler? : ::Crystal::Scheduler? + @scheduler + end + {% end %} + def self.unsafe_each(&) # nothing to iterate when @@threads is nil + don't lazily allocate in a # method called from a GC collection callback! @@ -154,14 +187,6 @@ class Thread thread.name = name end - # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } - - # :nodoc: - def scheduler? : ::Crystal::Scheduler? - @scheduler - end - protected def start Thread.threads.push(self) Thread.current = self diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr index a68108ad327a..7505032d85bc 100644 --- a/src/crystal/system/unix/signal.cr +++ b/src/crystal/system/unix/signal.cr @@ -2,6 +2,7 @@ require "c/signal" require "c/stdio" require "c/sys/wait" require "c/unistd" +require "../print_error" module Crystal::System::Signal # The number of libc functions that can be called safely from a signal(2) diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index 56c99624f8da..4148477a33c4 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -81,6 +81,16 @@ module Crystal write value.name || '?' end + {% if flag?(:execution_context) %} + def write(value : ExecutionContext) : Nil + write value.name + end + + def write(value : ExecutionContext::Scheduler) : Nil + write value.name + end + {% end %} + def write(value : Pointer) : Nil write "0x" System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) } diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr new file mode 100644 index 000000000000..08ef187734d8 --- /dev/null +++ b/src/execution_context/execution_context.cr @@ -0,0 +1,100 @@ +require "../crystal/event_loop" +require "../crystal/system/thread" +require "../crystal/system/thread_linked_list" +require "../fiber" +require "../fiber/stack_pool" +require "./scheduler" + +{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} + +module ExecutionContext + @@default : ExecutionContext? + + @[AlwaysInline] + def self.default : ExecutionContext + @@default.not_nil!("expected default execution context to have been setup") + end + + # :nodoc: + def self.init_default_context : Nil + raise NotImplementedError.new("No execution context implementations (yet)") + end + + # Returns the default number of workers to start in the execution context. + def self.default_workers_count : Int32 + ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32) + end + + # :nodoc: + protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new } + + # :nodoc: + property next : ExecutionContext? + + # :nodoc: + property previous : ExecutionContext? + + # :nodoc: + def self.unsafe_each(&) : Nil + @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context }) + end + + def self.each(&) : Nil + execution_contexts.each { |execution_context| yield execution_context } + end + + @[AlwaysInline] + def self.current : ExecutionContext + Thread.current.execution_context + end + + # Tells the current scheduler to suspend the current fiber and resume the + # next runnable fiber. The current fiber will never be resumed; you're + # responsible to reenqueue it. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + @[AlwaysInline] + def self.reschedule : Nil + Scheduler.current.reschedule + end + + # Tells the current scheduler to suspend the current fiber and to resume + # *fiber* instead. The current fiber will never be resumed; you're responsible + # to reenqueue it. + # + # Raises `RuntimeError` if the fiber doesn't belong to the current execution + # context. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + def self.resume(fiber : Fiber) : Nil + if fiber.execution_context == current + Scheduler.current.resume(fiber) + else + raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current.execution_context}") + end + end + + # Creates a new fiber then calls `#enqueue` to add it to the execution + # context. + # + # May be called from any `ExecutionContext` (i.e. must be thread-safe). + def spawn(*, name : String? = nil, &block : ->) : Fiber + Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) } + end + + # Legacy support for the `same_thread` argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + abstract def stack_pool : Fiber::StackPool + abstract def stack_pool? : Fiber::StackPool? + + abstract def event_loop : Crystal::EventLoop + + # Enqueues a fiber to be resumed inside the execution context. + # + # May be called from any ExecutionContext (i.e. must be thread-safe). + abstract def enqueue(fiber : Fiber) : Nil +end diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr new file mode 100644 index 000000000000..06068d07ea87 --- /dev/null +++ b/src/execution_context/scheduler.cr @@ -0,0 +1,83 @@ +module ExecutionContext + module Scheduler + @[AlwaysInline] + def self.current : Scheduler + Thread.current.current_scheduler + end + + abstract def thread : Thread + abstract def execution_context : ExecutionContext + + # Instantiates a fiber and enqueues it into the scheduler's local queue. + def spawn(*, name : String? = nil, &block : ->) : Fiber + fiber = Fiber.new(name, execution_context, &block) + enqueue(fiber) + fiber + end + + # Legacy support for the *same_thread* argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def enqueue(fiber : Fiber) : Nil + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def reschedule : Nil + + # Suspends the execution of the current fiber and resumes *fiber*. + # + # The current fiber will never be resumed; you're responsible to reenqueue + # it. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.resume` instead. + protected abstract def resume(fiber : Fiber) : Nil + + # Switches the thread from running the current fiber to run *fiber* instead. + # + # Handles thread safety around fiber stacks: locks the GC to not start a + # collection while we're switching context, releases the stack of a dead + # fiber. + # + # Unsafe. Must only be called by the current scheduler. Caller must ensure + # that the fiber indeed belongs to the current execution context, and that + # the fiber can indeed be resumed. + protected def swapcontext(fiber : Fiber) : Nil + current_fiber = thread.current_fiber + + {% unless flag?(:interpreted) %} + thread.dead_fiber = current_fiber if current_fiber.dead? + {% end %} + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + + # we switched context so we can't trust *self* anymore (it is the + # scheduler that rescheduled *fiber* which may be another scheduler) as + # well as any other local or instance variables (e.g. we must resolve + # `Thread.current` again) + # + # that being said, we can still trust the *current_fiber* local variable + # (it's the only exception) + + {% unless flag?(:interpreted) %} + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + end + + abstract def status : String + end +end diff --git a/src/fiber.cr b/src/fiber.cr index b34a8762037d..39a9f2bf2b85 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -59,7 +59,10 @@ class Fiber property name : String? @alive = true - {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread = Atomic(Thread?).new(nil) + {% end %} # :nodoc: property next : Fiber? @@ -67,6 +70,10 @@ class Fiber # :nodoc: property previous : Fiber? + {% if flag?(:execution_context) %} + property! execution_context : ExecutionContext + {% end %} + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) @@ -84,16 +91,19 @@ class Fiber fibers.each { |fiber| yield fiber } end + {% begin %} # Creates a new `Fiber` instance. # # When the fiber is executed, it runs *proc* in its context. # # *name* is an optional and used only as an internal reference. - def initialize(@name : String? = nil, &@proc : ->) + def initialize(@name : String? = nil, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->) @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} {Pointer(Void).null, Pointer(Void).null} + {% elsif flag?(:execution_context) %} + execution_context.stack_pool.checkout {% else %} Crystal::Scheduler.stack_pool.checkout {% end %} @@ -123,6 +133,7 @@ class Fiber Fiber.fibers.push(self) end + {% end %} # :nodoc: def initialize(@stack : Void*, thread) @@ -139,13 +150,30 @@ class Fiber {% end %} thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom @name = "main" - {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread.set(thread) + {% end %} + Fiber.fibers.push(self) + + # we don't initialize @execution_context here (we may not have an execution + # context yet), and we can't detect ExecutionContext.current (we may reach + # an infinite recursion). end # :nodoc: def run GC.unlock_read + + {% if flag?(:execution_context) && !flag?(:interpreted) %} + # if the fiber previously running on this thread has terminated, we can + # now safely release its stack + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + @proc.call rescue ex if name = @name @@ -163,9 +191,17 @@ class Fiber @timeout_select_action = nil @alive = false - {% unless flag?(:interpreted) %} + + {% unless flag?(:interpreted) || flag?(:execution_context) %} + # interpreted: the interpreter is managing the stacks + # + # execution context: do not prematurely release the stack before we switch + # to another fiber so we don't end up with a thread reusing a stack for a + # new fiber while the current fiber isn't fully terminated (oops); even + # without the pool, we can't unmap before we swap context. Crystal::Scheduler.stack_pool.release(@stack) {% end %} + Fiber.suspend end @@ -207,7 +243,11 @@ class Fiber # puts "never reached" # ``` def resume : Nil - Crystal::Scheduler.resume(self) + {% if flag?(:execution_context) %} + ExecutionContext.resume(self) + {% else %} + Crystal::Scheduler.resume(self) + {% end %} end # Adds this fiber to the scheduler's runnables queue for the current thread. @@ -216,7 +256,11 @@ class Fiber # the next time it has the opportunity to reschedule to another fiber. There # are no guarantees when that will happen. def enqueue : Nil - Crystal::Scheduler.enqueue(self) + {% if flag?(:execution_context) %} + execution_context.enqueue(self) + {% else %} + Crystal::Scheduler.enqueue(self) + {% end %} end # :nodoc: @@ -284,7 +328,14 @@ class Fiber # end # ``` def self.yield : Nil - Crystal::Scheduler.yield + Crystal.trace :sched, "yield" + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(0.seconds) + Fiber.suspend + {% else %} + Crystal::Scheduler.yield + {% end %} end # Suspends execution of the current fiber indefinitely. @@ -298,7 +349,11 @@ class Fiber # useful if the fiber needs to wait for something to happen (for example an IO # event, a message is ready in a channel, etc.) which triggers a re-enqueue. def self.suspend : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end def to_s(io : IO) : Nil @@ -320,7 +375,7 @@ class Fiber GC.push_stack @context.stack_top, @stack_bottom end - {% if flag?(:preview_mt) %} + {% if flag?(:preview_mt) && !flag?(:execution_context) %} # :nodoc: def set_current_thread(thread = Thread.current) : Thread @current_thread.set(thread) diff --git a/src/io/evented.cr b/src/io/evented.cr index 1f95d1870b0b..b238830f284a 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -89,11 +89,19 @@ module IO::Evented @write_event.consume_each &.free @readers.consume_each do |readers| - Crystal::Scheduler.enqueue readers + {% if flag?(:execution_context) %} + readers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue readers + {% end %} end @writers.consume_each do |writers| - Crystal::Scheduler.enqueue writers + {% if flag?(:execution_context) %} + writers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue writers + {% end %} end end diff --git a/src/kernel.cr b/src/kernel.cr index 34763b994839..c2af8771824e 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -608,7 +608,11 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1" Exception::CallStack.setup_crash_handler - Crystal::Scheduler.init + {% if flag?(:execution_context) %} + ExecutionContext.init_default_context + {% else %} + Crystal::Scheduler.init + {% end %} {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop