Skip to content

Commit

Permalink
Skeleton for ExecutionContext types as per RFC #2
Browse files Browse the repository at this point in the history
- Add the `ExecutionContext` module;
- Add the `ExecutionContext::Scheduler` module;
- Add the `execution_context` compile-time flag.

When the `execution_context` flag is set:

- Don't load `Crystal::Scheduler`;
- Plug `ExecutionContext` instead of `Crystal::Scheduler`.
  • Loading branch information
ysbaddaden committed Jan 7, 2025
1 parent 7b11dcc commit 98022ab
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 37 deletions.
41 changes: 31 additions & 10 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
require "fiber"
require "channel"
require "crystal/scheduler"
require "crystal/tracing"

{% if flag?(:execution_context) %}
require "execution_context"
{% else %}
require "crystal/scheduler"
{% end %}

# Blocks the current fiber for the specified number of seconds.
#
# While this fiber is waiting this time, other ready-to-execute
Expand All @@ -12,25 +17,36 @@ def sleep(seconds : Number) : Nil
if seconds < 0
raise ArgumentError.new "Sleep seconds must be positive"
end

Crystal::Scheduler.sleep(seconds.seconds)
sleep(seconds.seconds)
end

# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
def sleep(time : Time::Span) : Nil
Crystal::Scheduler.sleep(time)
Crystal.trace :sched, "sleep", for: time

{% if flag?(:execution_context) %}
Fiber.current.resume_event.add(time)
ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.sleep(time)
{% end %}
end

# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
def sleep : Nil
Crystal::Scheduler.reschedule
{% if flag?(:execution_context) %}
ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.reschedule
{% end %}
end

{% begin %}
# Spawns a new fiber.
#
# NOTE: The newly created fiber doesn't run as soon as spawned.
Expand Down Expand Up @@ -64,12 +80,17 @@ end
# wg.wait
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% if flag?(:execution_context) %}
ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block)
{% else %}
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% end %}
end
{% end %}

# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
Expand Down
14 changes: 11 additions & 3 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop

@[AlwaysInline]
def self.current : self
Crystal::Scheduler.event_loop
{% if flag?(:execution_context) %}
ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop
{% end %}
end

@[AlwaysInline]
def self.current? : self?
Crystal::Scheduler.event_loop?
def self.current? : self | Nil
{% if flag?(:execution_context) %}
ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop?
{% end %}
end

# Runs the loop.
Expand Down
18 changes: 15 additions & 3 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
# NOTE: thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end
true
end
Expand Down Expand Up @@ -303,13 +307,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
Polling.arena.free(index) do |pd|
pd.value.@readers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

pd.value.@writers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

Expand Down
3 changes: 2 additions & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{% skip_file if flag?(:execution_context) %}

require "crystal/event_loop"
require "crystal/system/print_error"
require "fiber"
Expand Down Expand Up @@ -66,7 +68,6 @@ class Crystal::Scheduler
end

def self.sleep(time : Time::Span) : Nil
Crystal.trace :sched, "sleep", for: time
Thread.current.scheduler.sleep(time)
end

Expand Down
41 changes: 33 additions & 8 deletions src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,39 @@ class Thread

getter name : String?

{% if flag?(:execution_context) %}
# :nodoc:
getter! execution_context : ExecutionContext

# :nodoc:
property! current_scheduler : ExecutionContext::Scheduler

# :nodoc:
def execution_context=(@execution_context : ExecutionContext) : ExecutionContext
main_fiber.execution_context = execution_context
end

# :nodoc:
def dead_fiber=(@dead_fiber : Fiber) : Fiber
end

# :nodoc:
def dead_fiber? : Fiber?
if fiber = @dead_fiber
@dead_fiber = nil
fiber
end
end
{% else %}
# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end
{% end %}

def self.unsafe_each(&)
# nothing to iterate when @@threads is nil + don't lazily allocate in a
# method called from a GC collection callback!
Expand Down Expand Up @@ -154,14 +187,6 @@ class Thread
thread.name = name
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end

protected def start
Thread.threads.push(self)
Thread.current = self
Expand Down
1 change: 1 addition & 0 deletions src/crystal/system/unix/signal.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "c/signal"
require "c/stdio"
require "c/sys/wait"
require "c/unistd"
require "../print_error"

module Crystal::System::Signal
# The number of libc functions that can be called safely from a signal(2)
Expand Down
10 changes: 10 additions & 0 deletions src/crystal/tracing.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ module Crystal
write value.name || '?'
end

{% if flag?(:execution_context) %}
def write(value : ExecutionContext) : Nil
write value.name
end

def write(value : ExecutionContext::Scheduler) : Nil
write value.name
end
{% end %}

def write(value : Pointer) : Nil
write "0x"
System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) }
Expand Down
100 changes: 100 additions & 0 deletions src/execution_context/execution_context.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
require "../crystal/event_loop"
require "../crystal/system/thread"
require "../crystal/system/thread_linked_list"
require "../fiber"
require "../fiber/stack_pool"
require "./scheduler"

{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}

module ExecutionContext
@@default : ExecutionContext?

@[AlwaysInline]
def self.default : ExecutionContext
@@default.not_nil!("expected default execution context to have been setup")
end

# :nodoc:
def self.init_default_context : Nil
raise NotImplementedError.new("No execution context implementations (yet)")
end

# Returns the default number of workers to start in the execution context.
def self.default_workers_count : Int32
ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32)
end

# :nodoc:
protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new }

# :nodoc:
property next : ExecutionContext?

# :nodoc:
property previous : ExecutionContext?

# :nodoc:
def self.unsafe_each(&) : Nil
@@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context })
end

def self.each(&) : Nil
execution_contexts.each { |execution_context| yield execution_context }
end

@[AlwaysInline]
def self.current : ExecutionContext
Thread.current.execution_context
end

# Tells the current scheduler to suspend the current fiber and resume the
# next runnable fiber. The current fiber will never be resumed; you're
# responsible to reenqueue it.
#
# This method is safe as it only operates on the current `ExecutionContext`
# and `Scheduler`.
@[AlwaysInline]
def self.reschedule : Nil
Scheduler.current.reschedule
end

# Tells the current scheduler to suspend the current fiber and to resume
# *fiber* instead. The current fiber will never be resumed; you're responsible
# to reenqueue it.
#
# Raises `RuntimeError` if the fiber doesn't belong to the current execution
# context.
#
# This method is safe as it only operates on the current `ExecutionContext`
# and `Scheduler`.
def self.resume(fiber : Fiber) : Nil
if fiber.execution_context == current
Scheduler.current.resume(fiber)
else
raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}")
end
end

# Creates a new fiber then calls `#enqueue` to add it to the execution
# context.
#
# May be called from any `ExecutionContext` (i.e. must be thread-safe).
def spawn(*, name : String? = nil, &block : ->) : Fiber
Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) }
end

# Legacy support for the `same_thread` argument. Each execution context may
# decide to support it or not (e.g. a single threaded context can accept it).
abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber

abstract def stack_pool : Fiber::StackPool
abstract def stack_pool? : Fiber::StackPool?

abstract def event_loop : Crystal::EventLoop

# Enqueues a fiber to be resumed inside the execution context.
#
# May be called from any ExecutionContext (i.e. must be thread-safe).
abstract def enqueue(fiber : Fiber) : Nil
end
Loading

0 comments on commit 98022ab

Please sign in to comment.