diff --git a/spec/std/execution_context/global_queue_spec.cr b/spec/std/execution_context/global_queue_spec.cr new file mode 100644 index 000000000000..92ad12db00d2 --- /dev/null +++ b/spec/std/execution_context/global_queue_spec.cr @@ -0,0 +1,218 @@ +require "./spec_helper" +require "../../support/thread" + +describe ExecutionContext::GlobalQueue do + it "#initialize" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.empty?.should be_true + end + + it "#unsafe_push and #unsafe_pop" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f1) + q.size.should eq(1) + + q.unsafe_push(f2) + q.unsafe_push(f3) + q.size.should eq(3) + + q.unsafe_pop?.should be(f3) + q.size.should eq(2) + + q.unsafe_pop?.should be(f2) + q.unsafe_pop?.should be(f1) + q.unsafe_pop?.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + describe "#unsafe_grab?" do + it "can't grab from empty queue" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + runnables = ExecutionContext::Runnables(6).new(q) + q.unsafe_grab?(runnables, 4).should be_nil + end + + it "grabs fibers" do + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.unsafe_push(f) } + + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + + # returned the last enqueued fiber + fiber.should be(fibers[9]) + + # enqueued the next 2 fibers + runnables.size.should eq(2) + runnables.get?.should be(fibers[8]) + runnables.get?.should be(fibers[7]) + + # the remaining fibers are still there: + 6.downto(0).each do |i| + q.unsafe_pop?.should be(fibers[i]) + end + end + + it "can't grab more than available" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 4) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + + it "clamps divisor to 1" do + f = Fiber.new { } + q = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + q.unsafe_push(f) + + # dequeues the unique fiber + runnables = ExecutionContext::Runnables(6).new(q) + fiber = q.unsafe_grab?(runnables, 0) + fiber.should be(f) + + # had nothing left to dequeue + runnables.size.should eq(0) + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "one by one" do + fibers = StaticArray(ExecutionContext::FiberCounter, 763).new do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + n = 7 + increments = 15 + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + threads = Array(Thread).new(n) + + n.times do |i| + threads << new_thread(name: "ONE-#{i}") do + slept = 0 + ready.done + + loop do + if fiber = queue.pop? + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + queue.push(fiber) if fc.increment < increments + slept = 0 + elsif slept < 100 + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + else + break + end + end + end + end + ready.wait + + fibers.each_with_index do |fc, i| + queue.push(fc.@fiber) + Thread.sleep(10.nanoseconds) if i % 10 == 9 + end + + threads.each(&.join) + + # must have dequeued each fiber exactly X times + fibers.each { |fc| fc.counter.should eq(increments) } + end + + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + threads = Array(Thread).new(n) + + n.times do |i| + threads << new_thread("BULK-#{i}") do + slept = 0 + + r = ExecutionContext::Runnables(3).new(queue) + + batch = Fiber::Queue.new + size = 0 + + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } + + ready.done + + loop do + if fiber = r.get? + execute.call(fiber) + slept = 0 + next + end + + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + end + end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + q = Fiber::Queue.new + 5.times { |j| q.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end + + threads.each(&.join) + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/runnables_spec.cr b/spec/std/execution_context/runnables_spec.cr new file mode 100644 index 000000000000..a31c59c86e0c --- /dev/null +++ b/spec/std/execution_context/runnables_spec.cr @@ -0,0 +1,261 @@ +require "./spec_helper" +require "../../support/thread" + +describe ExecutionContext::Runnables do + it "#initialize" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(16).new(g) + r.capacity.should eq(16) + end + + describe "#push" do + it "enqueues the fiber in local queue" do + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # local dequeue + fibers.each { |f| r.get?.should be(f) } + r.get?.should be_nil + + # didn't push to global queue + g.pop?.should be_nil + end + + it "moves half the local queue to the global queue on overflow" do + fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + fibers.each { |f| r.push(f) } + + # kept half of local queue + r.get?.should be(fibers[2]) + r.get?.should be(fibers[3]) + + # moved half of local queue + last push to global queue + g.pop?.should eq(fibers[0]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[4]) + end + + it "can always push up to capacity" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + + 4.times do + # local + 4.times { r.push(Fiber.new { }) } + 2.times { r.get? } + 2.times { r.push(Fiber.new { }) } + + # overflow (2+1 fibers are sent to global queue + 1 local) + 2.times { r.push(Fiber.new { }) } + + # clear + 3.times { r.get? } + end + + # on each iteration we pushed 2+1 fibers to the global queue + g.size.should eq(12) + + # grab fibers back from the global queue + fiber = g.unsafe_grab?(r, divisor: 1) + fiber.should_not be_nil + r.get?.should_not be_nil + r.get?.should be_nil + end + end + + describe "#bulk_push" do + it "fills the local queue" do + q = Fiber::Queue.new + fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + fibers.reverse_each { |f| r.get?.should be(f) } + g.empty?.should be_true + end + + it "pushes the overflow to the global queue" do + q = Fiber::Queue.new + fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + fibers.each { |f| q.push(f) } + + # local enqueue + overflow + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r = ExecutionContext::Runnables(4).new(g) + r.bulk_push(pointerof(q)) + + # filled the local queue + r.get?.should eq(fibers[6]) + r.get?.should eq(fibers[5]) + r.get?.should be(fibers[4]) + r.get?.should be(fibers[3]) + + # moved the rest to the global queue + g.pop?.should eq(fibers[2]) + g.pop?.should eq(fibers[1]) + g.pop?.should eq(fibers[0]) + end + end + + describe "#get?" do + # TODO: need specific tests (though we already use it in the above tests?) + end + + describe "#steal_from" do + it "steals from another runnables" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + fibers.each { |f| r1.push(f) } + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole half of the runnable fibers + fiber.should be(fibers[2]) + r2.get?.should be(fibers[0]) + r2.get?.should be(fibers[1]) + r2.get?.should be_nil + + # left the other half + r1.get?.should be(fibers[3]) + r1.get?.should be(fibers[4]) + r1.get?.should be(fibers[5]) + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals the last fiber" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + lone = Fiber.new(name: "lone") { } + + # fill the source queue + r1 = ExecutionContext::Runnables(16).new(g) + r1.push(lone) + + # steal from source queue + r2 = ExecutionContext::Runnables(16).new(g) + fiber = r2.steal_from(r1) + + # stole the fiber & local queue is still empty + fiber.should be(lone) + r2.get?.should be_nil + + # left nothing in original queue + r1.get?.should be_nil + + # global queue is left untouched + g.empty?.should be_true + end + + it "steals nothing" do + g = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + r1 = ExecutionContext::Runnables(16).new(g) + r2 = ExecutionContext::Runnables(16).new(g) + + fiber = r2.steal_from(r1) + fiber.should be_nil + r2.get?.should be_nil + r1.get?.should be_nil + end + end + + # interpreter doesn't support threads yet (#14287) + pending_interpreted describe: "thread safety" do + it "stress test" do + n = 7 + increments = 7919 + + # less fibers than space in runnables (so threads can starve) + # 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below) + fibers = Array(ExecutionContext::FiberCounter).new(54) do |i| + ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { }) + end + + global_queue = ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) + threads = Array(Thread).new(n) + + all_runnables = Array(ExecutionContext::Runnables(16)).new(n) do + ExecutionContext::Runnables(16).new(global_queue) + end + + n.times do |i| + threads << new_thread(name: "RUN-#{i}") do + runnables = all_runnables[i] + slept = 0 + + execute = ->(fiber : Fiber) { + fc = fibers.find { |x| x.@fiber == fiber }.not_nil! + runnables.push(fiber) if fc.increment < increments + } + + ready.done + + loop do + # dequeue from local queue + if fiber = runnables.get? + execute.call(fiber) + slept = 0 + next + end + + # steal from another queue + while (r = all_runnables.sample) == runnables + end + if fiber = runnables.steal_from(r) + execute.call(fiber) + slept = 0 + next + end + + # dequeue from global queue + if fiber = global_queue.grab?(runnables, n) + execute.call(fiber) + slept = 0 + next + end + + if slept >= 100 + break + end + + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU + end + end + end + ready.wait + + # enqueue in batches + 0.step(to: fibers.size - 1, by: 9) do |i| + q = Fiber::Queue.new + 9.times { |j| q.push(fibers[i + j].@fiber) } + global_queue.bulk_push(pointerof(q)) + Thread.sleep(10.nanoseconds) if i % 2 == 1 + end + + threads.map(&.join) + + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + end +end diff --git a/spec/std/execution_context/spec_helper.cr b/spec/std/execution_context/spec_helper.cr new file mode 100644 index 000000000000..73ed3eefe751 --- /dev/null +++ b/spec/std/execution_context/spec_helper.cr @@ -0,0 +1,53 @@ +require "../spec_helper" +require "crystal/system/thread_wait_group" +require "execution_context/runnables" +require "execution_context/global_queue" + +module ExecutionContext + class FiberCounter + def initialize(@fiber : Fiber) + @counter = Atomic(Int32).new(0) + end + + # fetch and add + def increment + @counter.add(1, :relaxed) + 1 + end + + def counter + @counter.get(:relaxed) + end + end + + class TestTimeout + def initialize(@timeout : Time::Span = 2.seconds) + @start = Time.monotonic + @cancelled = Atomic(Bool).new(false) + end + + def cancel : Nil + @cancelled.set(true) + end + + def elapsed? + (Time.monotonic - @start) >= @timeout + end + + def done? + return true if @cancelled.get + raise "timeout reached" if elapsed? + false + end + + def sleep(interval = 100.milliseconds) : Nil + until done? + ::sleep interval + end + end + + def reset : Nil + @start = Time.monotonic + @cancelled.set(false) + end + end +end diff --git a/spec/std/fiber/queue_spec.cr b/spec/std/fiber/queue_spec.cr new file mode 100644 index 000000000000..39c67bc52bee --- /dev/null +++ b/spec/std/fiber/queue_spec.cr @@ -0,0 +1,183 @@ +require "../spec_helper" +require "fiber/queue" + +describe Fiber::Queue do + describe "#initialize" do + it "creates an empty queue" do + q = Fiber::Queue.new + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + q.empty?.should be_true + end + + it "creates a filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f1.schedlink = f2 + f2.schedlink = nil + + q = Fiber::Queue.new(f2, f1, size: 2) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + q.empty?.should be_false + end + end + + describe "#push" do + it "to head" do + q = Fiber::Queue.new + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + + # simulate fibers previously added to other queues + f1.schedlink = f3 + f2.schedlink = f1 + + # push first fiber + q.push(f1) + q.@head.should be(f1) + q.@tail.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(1) + + # push second fiber + q.push(f2) + q.@head.should be(f2) + q.@tail.should be(f1) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(2) + + # push third fiber + q.push(f3) + q.@head.should be(f3) + q.@tail.should be(f1) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be_nil + q.size.should eq(3) + end + end + + describe "#bulk_unshift" do + it "to empty queue" do + # manually create a queue + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # push in bulk + q2 = Fiber::Queue.new(nil, nil, size: 0) + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f3) + q2.@tail.should be(f1) + q2.size.should eq(3) + end + + it "to filled queue" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f4 = Fiber.new(name: "f4") { } + f5 = Fiber.new(name: "f5") { } + + # source queue + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q1 = Fiber::Queue.new(f3, f1, size: 3) + + # destination queue + f5.schedlink = f4 + f4.schedlink = nil + q2 = Fiber::Queue.new(f5, f4, size: 2) + + # push in bulk + q2.bulk_unshift(pointerof(q1)) + q2.@head.should be(f5) + q2.@tail.should be(f1) + q2.size.should eq(5) + + f5.schedlink.should be(f4) + f4.schedlink.should be(f3) + f3.schedlink.should be(f2) + f2.schedlink.should be(f1) + f1.schedlink.should be(nil) + end + end + + describe "#pop" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + expect_raises(IndexError) { q.pop } + q.size.should eq(0) + end + end + + describe "#pop?" do + it "from head" do + f1 = Fiber.new(name: "f1") { } + f2 = Fiber.new(name: "f2") { } + f3 = Fiber.new(name: "f3") { } + f3.schedlink = f2 + f2.schedlink = f1 + f1.schedlink = nil + q = Fiber::Queue.new(f3, f1, size: 3) + + # removes third element + q.pop?.should be(f3) + q.@head.should be(f2) + q.@tail.should be(f1) + q.size.should eq(2) + + # removes second element + q.pop?.should be(f2) + q.@head.should be(f1) + q.@tail.should be(f1) + q.size.should eq(1) + + # removes first element + q.pop?.should be(f1) + q.@head.should be_nil + q.@tail.should be_nil + q.size.should eq(0) + + # empty queue + q.pop?.should be_nil + q.size.should eq(0) + end + end +end diff --git a/spec/std/file_spec.cr b/spec/std/file_spec.cr index 0f88b2028c2f..29add93ad6f5 100644 --- a/spec/std/file_spec.cr +++ b/spec/std/file_spec.cr @@ -1,4 +1,5 @@ require "./spec_helper" +require "../support/thread" private def it_raises_on_null_byte(operation, file = __FILE__, line = __LINE__, end_line = __END_LINE__, &block) it "errors on #{operation}", file, line, end_line do @@ -109,7 +110,7 @@ describe "File" do # thread or process also opened the file; we should pass # O_NONBLOCK to the open(2) call itself, not afterwards file = nil - Thread.new { file = File.new(path, "w", blocking: nil) } + new_thread { file = File.new(path, "w", blocking: nil) } begin File.open(path, "r", blocking: false) do |file| diff --git a/spec/std/spec_helper.cr b/spec/std/spec_helper.cr index 8bb0a9e1a2f2..723186f22a65 100644 --- a/spec/std/spec_helper.cr +++ b/spec/std/spec_helper.cr @@ -43,7 +43,7 @@ def spawn_and_check(before : Proc(_), file = __FILE__, line = __LINE__, &block : # This is a workaround to ensure the "before" fiber # is unscheduled. Otherwise it might stay alive running the event loop - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context) && flag?(:mt)}}) do while x.get != 2 Fiber.yield end diff --git a/spec/std/thread/condition_variable_spec.cr b/spec/std/thread/condition_variable_spec.cr index 1bf78f797357..f682b418e6f3 100644 --- a/spec/std/thread/condition_variable_spec.cr +++ b/spec/std/thread/condition_variable_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::ConditionVariable do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::ConditionVariable do waiting = 0 5.times do - Thread.new do + new_thread do mutex.synchronize do waiting += 1 cv1.wait(mutex) @@ -78,7 +79,7 @@ pending_interpreted describe: Thread::ConditionVariable do cond = Thread::ConditionVariable.new mutex.synchronize do - Thread.new do + new_thread do mutex.synchronize { cond.signal } end diff --git a/spec/std/thread/mutex_spec.cr b/spec/std/thread/mutex_spec.cr index 99f3c5d385c3..8fb8e484e935 100644 --- a/spec/std/thread/mutex_spec.cr +++ b/spec/std/thread/mutex_spec.cr @@ -1,4 +1,5 @@ require "../spec_helper" +require "../../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread::Mutex do @@ -7,7 +8,7 @@ pending_interpreted describe: Thread::Mutex do mutex = Thread::Mutex.new threads = Array.new(10) do - Thread.new do + new_thread do mutex.synchronize { a += 1 } end end @@ -22,7 +23,7 @@ pending_interpreted describe: Thread::Mutex do mutex.try_lock.should be_false expect_raises(RuntimeError) { mutex.lock } mutex.unlock - Thread.new { mutex.synchronize { } }.join + new_thread { mutex.synchronize { } }.join end it "won't unlock from another thread" do @@ -30,7 +31,7 @@ pending_interpreted describe: Thread::Mutex do mutex.lock expect_raises(RuntimeError) do - Thread.new { mutex.unlock }.join + new_thread { mutex.unlock }.join end mutex.unlock diff --git a/spec/std/thread_spec.cr b/spec/std/thread_spec.cr index 5a43c7e429d1..3dd550cffe4d 100644 --- a/spec/std/thread_spec.cr +++ b/spec/std/thread_spec.cr @@ -1,16 +1,17 @@ require "./spec_helper" +require "../support/thread" # interpreter doesn't support threads yet (#14287) pending_interpreted describe: Thread do it "allows passing an argumentless fun to execute" do a = 0 - thread = Thread.new { a = 1; 10 } + thread = new_thread { a = 1; 10 } thread.join a.should eq(1) end it "raises inside thread and gets it on join" do - thread = Thread.new { raise "OH NO" } + thread = new_thread { raise "OH NO" } expect_raises Exception, "OH NO" do thread.join end @@ -18,7 +19,7 @@ pending_interpreted describe: Thread do it "returns current thread object" do current = nil - thread = Thread.new { current = Thread.current } + thread = new_thread { current = Thread.current } thread.join current.should be(thread) current.should_not be(Thread.current) @@ -31,7 +32,7 @@ pending_interpreted describe: Thread do it "yields the processor" do done = false - thread = Thread.new do + thread = new_thread do 3.times { Thread.yield } done = true end @@ -44,10 +45,18 @@ pending_interpreted describe: Thread do end it "names the thread" do - Thread.current.name.should be_nil - name = nil + {% if flag?(:execution_context) %} + {% if flag?(:mt) %} + Thread.current.name.should match(/^DEFAULT-\d+$/) + {% else %} + Thread.current.name.should eq("DEFAULT") + {% end %} + {% else %} + Thread.current.name.should be_nil + {% end %} - thread = Thread.new(name: "some-name") do + name = nil + thread = new_thread(name: "some-name") do name = Thread.current.name end thread.name.should eq("some-name") diff --git a/spec/support/mt_abort_timeout.cr b/spec/support/mt_abort_timeout.cr index 7339da6c07a1..dc78f7f8552c 100644 --- a/spec/support/mt_abort_timeout.cr +++ b/spec/support/mt_abort_timeout.cr @@ -5,7 +5,7 @@ private SPEC_TIMEOUT = 15.seconds Spec.around_each do |example| done = Channel(Exception?).new - spawn(same_thread: true) do + spawn(same_thread: !{{flag?(:execution_context) && flag?(:mt)}}) do begin example.run rescue e diff --git a/spec/support/thread.cr b/spec/support/thread.cr new file mode 100644 index 000000000000..df8c0aa37268 --- /dev/null +++ b/spec/support/thread.cr @@ -0,0 +1,9 @@ +{% begin %} + def new_thread(name = nil, &block) : Thread + {% if flag?(:execution_context) %} + ExecutionContext::Isolated.new(name: name || "SPEC") { block.call }.@thread + {% else %} + Thread.new(name) { block.call } + {% end %} + end +{% end %} diff --git a/src/concurrent.cr b/src/concurrent.cr index 07ae945a84f6..1f1ad04bfd06 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -1,8 +1,13 @@ require "fiber" require "channel" -require "crystal/scheduler" require "crystal/tracing" +{% if flag?(:execution_context) %} + require "execution_context" +{% else %} + require "crystal/scheduler" +{% end %} + # Blocks the current fiber for the specified number of seconds. # # While this fiber is waiting this time, other ready-to-execute @@ -12,8 +17,7 @@ def sleep(seconds : Number) : Nil if seconds < 0 raise ArgumentError.new "Sleep seconds must be positive" end - - Crystal::Scheduler.sleep(seconds.seconds) + sleep(seconds.seconds) end # Blocks the current Fiber for the specified time span. @@ -21,16 +25,28 @@ end # While this fiber is waiting this time, other ready-to-execute # fibers might start their execution. def sleep(time : Time::Span) : Nil - Crystal::Scheduler.sleep(time) + Crystal.trace :sched, "sleep", for: time + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(time) + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.sleep(time) + {% end %} end # Blocks the current fiber forever. # # Meanwhile, other ready-to-execute fibers might start their execution. def sleep : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end +{% begin %} # Spawns a new fiber. # # NOTE: The newly created fiber doesn't run as soon as spawned. @@ -64,12 +80,17 @@ end # wg.wait # ``` def spawn(*, name : String? = nil, same_thread = false, &block) - fiber = Fiber.new(name, &block) - Crystal.trace :sched, "spawn", fiber: fiber - {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} - fiber.enqueue - fiber + {% if flag?(:execution_context) %} + ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block) + {% else %} + fiber = Fiber.new(name, &block) + Crystal.trace :sched, "spawn", fiber: fiber + {% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %} + fiber.enqueue + fiber + {% end %} end +{% end %} # Spawns a fiber by first creating a `Proc`, passing the *call*'s # expressions to it, and letting the `Proc` finally invoke the *call*. diff --git a/src/crystal/event_loop.cr b/src/crystal/event_loop.cr index 00bcb86040b6..1b2887a1c535 100644 --- a/src/crystal/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop @[AlwaysInline] def self.current : self - Crystal::Scheduler.event_loop + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop + {% end %} end @[AlwaysInline] - def self.current? : self? - Crystal::Scheduler.event_loop? + def self.current? : self | Nil + {% if flag?(:execution_context) %} + ExecutionContext.current.event_loop + {% else %} + Crystal::Scheduler.event_loop? + {% end %} end # Runs the loop. @@ -46,6 +54,13 @@ abstract class Crystal::EventLoop # events. abstract def run(blocking : Bool) : Bool + {% if flag?(:execution_context) %} + # Same as `#run` but collects runnable fibers into *queue* instead of + # enqueueing in parallel, so the caller is responsible and in control for + # when and how the fibers will be enqueued. + abstract def run(queue : Fiber::Queue*, blocking : Bool) : Nil + {% end %} + # Tells a blocking run loop to no longer wait for events to activate. It may # for example enqueue a NOOP event with an immediate (or past) timeout. Having # activated an event, the loop shall return, allowing the blocked thread to diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 6e4175e3daee..3c84b2ce927d 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop iocp end + # thread unsafe def run(blocking : Bool) : Bool enqueued = false @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop enqueued end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + run_impl(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # Runs the event loop and enqueues the fiber for the next upcoming event or # completion. private def run_impl(blocking : Bool, &) : Nil diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 9c0b3d33b15c..459dccbc13ee 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -20,26 +20,56 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop event_base.loop(flags) end + {% if flag?(:execution_context) %} + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking + @runnables = queue + run(blocking) + ensure + @runnables = nil + end + + def callback_enqueue(fiber : Fiber) : Nil + Crystal.trace :evloop, "callback", fiber: fiber + if queue = @runnables + queue.value.push(fiber) + else + raise "BUG: libevent callback executed outside of #run(queue*) call" + end + end + {% end %} + def interrupt : Nil event_base.loop_exit end - # Create a new resume event for a fiber. + # Create a new resume event for a fiber (sleep). def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| - data.as(Fiber).enqueue + f = data.as(Fiber) + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} end end - # Creates a timeout_event. + # Creates a timeout event (timeout action of select expression). def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| f = data.as(Fiber) - if (select_action = f.timeout_select_action) + if select_action = f.timeout_select_action f.timeout_select_action = nil - select_action.time_expired(f) - else - f.enqueue + if select_action.time_expired? + {% if flag?(:execution_context) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(f) + {% else %} + f.enqueue + {% end %} + end end end end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 2fe86ad5b649..24d94af02bcc 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -112,14 +112,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end {% end %} - # NOTE: thread unsafe + # thread unsafe def run(blocking : Bool) : Bool system_run(blocking) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end true end + {% if flag?(:execution_context) %} + # thread unsafe + def run(queue : Fiber::Queue*, blocking : Bool) : Nil + system_run(blocking) { |fiber| queue.value.push(fiber) } + end + {% end %} + # fiber interface, see Crystal::EventLoop def create_resume_event(fiber : Fiber) : FiberEvent @@ -303,13 +314,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| - Crystal::Scheduler.enqueue(fiber) + {% if flag?(:execution_context) %} + fiber.execution_context.enqueue(fiber) + {% else %} + Crystal::Scheduler.enqueue(fiber) + {% end %} end) end diff --git a/src/crystal/print_buffered.cr b/src/crystal/print_buffered.cr new file mode 100644 index 000000000000..854fa0951021 --- /dev/null +++ b/src/crystal/print_buffered.cr @@ -0,0 +1,43 @@ +module Crystal + # Prepares an error message, with an optional exception or backtrace, to an + # in-memory buffer, before writing to an IO, usually STDERR, in a single write + # operation. + # + # Avoids intermingled messages caused by multiple threads writing to a STDIO + # in parallel. This may still happen, since writes may not be atomic when the + # overall size is larger than PIPE_BUF, buf should at least write 512 bytes + # atomically. + def self.print_buffered(message : String, *args, to io : IO, exception = nil, backtrace = nil) : Nil + buf = buffered_message(message, *args, exception: exception, backtrace: backtrace) + io.write(buf.to_slice) + io.flush unless io.sync? + end + + # Identical to `#print_buffered` but eventually calls + # `System.print_error(bytes)` to write to stderr without going through the + # event loop. + def self.print_error_buffered(message : String, *args, exception = nil, backtrace = nil) : Nil + buf = buffered_message(message, *args, exception: exception, backtrace: backtrace) + System.print_error(buf.to_slice) + end + + def self.buffered_message(message : String, *args, exception = nil, backtrace = nil) + buf = IO::Memory.new(4096) + + if args.empty? + buf << message + else + System.printf(message, *args) { |bytes| buf.write(bytes) } + end + + if exception + buf << ": " + exception.inspect_with_backtrace(buf) + else + buf.puts + backtrace.try(&.each { |line| buf << " from " << line << '\n' }) + end + + buf + end +end diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index efee6b3c06f1..6cc13406ea4a 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,3 +1,5 @@ +{% skip_file if flag?(:execution_context) %} + require "crystal/event_loop" require "crystal/system/print_error" require "fiber" @@ -66,7 +68,6 @@ class Crystal::Scheduler end def self.sleep(time : Time::Span) : Nil - Crystal.trace :sched, "sleep", for: time Thread.current.scheduler.sleep(time) end diff --git a/src/crystal/system/thread.cr b/src/crystal/system/thread.cr index 92136d1f3989..2b5e06498798 100644 --- a/src/crystal/system/thread.cr +++ b/src/crystal/system/thread.cr @@ -68,6 +68,39 @@ class Thread getter name : String? + {% if flag?(:execution_context) %} + # :nodoc: + getter! execution_context : ExecutionContext + + # :nodoc: + property! current_scheduler : ExecutionContext::Scheduler + + # :nodoc: + def execution_context=(@execution_context : ExecutionContext) : ExecutionContext + main_fiber.execution_context = execution_context + end + + # :nodoc: + def dead_fiber=(@dead_fiber : Fiber) : Fiber + end + + # :nodoc: + def dead_fiber? : Fiber? + if fiber = @dead_fiber + @dead_fiber = nil + fiber + end + end + {% else %} + # :nodoc: + getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } + + # :nodoc: + def scheduler? : ::Crystal::Scheduler? + @scheduler + end + {% end %} + def self.unsafe_each(&) # nothing to iterate when @@threads is nil + don't lazily allocate in a # method called from a GC collection callback! @@ -154,14 +187,6 @@ class Thread thread.name = name end - # :nodoc: - getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) } - - # :nodoc: - def scheduler? : ::Crystal::Scheduler? - @scheduler - end - protected def start Thread.threads.push(self) Thread.current = self diff --git a/src/crystal/system/thread_wait_group.cr b/src/crystal/system/thread_wait_group.cr new file mode 100644 index 000000000000..3494e1e7f569 --- /dev/null +++ b/src/crystal/system/thread_wait_group.cr @@ -0,0 +1,20 @@ +# :nodoc: +class Thread::WaitGroup + def initialize(@count : Int32) + @mutex = Thread::Mutex.new + @condition = Thread::ConditionVariable.new + end + + def done : Nil + @mutex.synchronize do + @count -= 1 + @condition.broadcast if @count == 0 + end + end + + def wait : Nil + @mutex.synchronize do + @condition.wait(@mutex) unless @count == 0 + end + end +end diff --git a/src/crystal/system/unix/signal.cr b/src/crystal/system/unix/signal.cr index 26f4bf6cf7e9..e3675843c450 100644 --- a/src/crystal/system/unix/signal.cr +++ b/src/crystal/system/unix/signal.cr @@ -2,6 +2,7 @@ require "c/signal" require "c/stdio" require "c/sys/wait" require "c/unistd" +require "../print_error" module Crystal::System::Signal # The number of libc functions that can be called safely from a signal(2) @@ -32,7 +33,7 @@ module Crystal::System::Signal action.sa_flags = LibC::SA_RESTART action.sa_sigaction = LibC::SigactionHandlerT.new do |value, _, _| - writer.write_bytes(value) unless writer.closed? + FileDescriptor.write_fully(writer.fd, pointerof(value)) unless writer.closed? end LibC.sigemptyset(pointerof(action.@sa_mask)) LibC.sigaction(signal, pointerof(action), nil) @@ -84,7 +85,9 @@ module Crystal::System::Signal private def self.start_loop spawn(name: "signal-loop") do loop do - value = reader.read_bytes(Int32) + buf = uninitialized StaticArray(UInt8, 4) + reader.read_fully(buf.to_slice) + value = buf.unsafe_as(Int32) rescue IO::Error next else diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr index 4a99d82e9134..c70f4c7edfc0 100644 --- a/src/crystal/system/win32/file_descriptor.cr +++ b/src/crystal/system/win32/file_descriptor.cr @@ -519,7 +519,11 @@ private module ConsoleUtils @@read_requests = Deque(ReadRequest).new @@bytes_read = Deque(Int32).new @@mtx = ::Thread::Mutex.new - @@reader_thread = ::Thread.new { reader_loop } + {% if flag?(:execution_context) %} + @@reader_thread = ExecutionContext.new("READER-LOOP") { reader_loop } + {% else %} + @@reader_thread = ::Thread.new { reader_loop } + {% end %} private def self.reader_loop while true diff --git a/src/crystal/tracing.cr b/src/crystal/tracing.cr index d9508eda85a8..a6c1f747625f 100644 --- a/src/crystal/tracing.cr +++ b/src/crystal/tracing.cr @@ -81,6 +81,16 @@ module Crystal write value.name || '?' end + {% if flag?(:execution_context) %} + def write(value : ExecutionContext) : Nil + write value.name + end + + def write(value : ExecutionContext::Scheduler) : Nil + write value.name + end + {% end %} + def write(value : Pointer) : Nil write "0x" System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) } diff --git a/src/execution_context/execution_context.cr b/src/execution_context/execution_context.cr new file mode 100644 index 000000000000..c4cdbc2f336b --- /dev/null +++ b/src/execution_context/execution_context.cr @@ -0,0 +1,109 @@ +require "../crystal/event_loop" +require "../crystal/system/thread" +require "../crystal/system/thread_linked_list" +require "../fiber" +require "../fiber/stack_pool" +require "./scheduler" +require "./single_threaded" +require "./multi_threaded" +require "./isolated" +require "./monitor" + +{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %} + +module ExecutionContext + @@default : ExecutionContext? + + @[AlwaysInline] + def self.default : ExecutionContext + @@default.not_nil!("expected default execution context to have been setup") + end + + # :nodoc: + def self.init_default_context : Nil + {% if flag?(:mt) %} + @@default = MultiThreaded.default(default_workers_count) + {% else %} + @@default = SingleThreaded.default + {% end %} + @@monitor = Monitor.new + end + + # Returns the default number of workers to start in the execution context. + def self.default_workers_count : Int32 + ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32) + end + + # :nodoc: + protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new } + + # :nodoc: + property next : ExecutionContext? + + # :nodoc: + property previous : ExecutionContext? + + # :nodoc: + def self.unsafe_each(&) : Nil + @@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context }) + end + + def self.each(&) : Nil + execution_contexts.each { |execution_context| yield execution_context } + end + + @[AlwaysInline] + def self.current : ExecutionContext + Thread.current.execution_context + end + + # Tells the current scheduler to suspend the current fiber and resume the + # next runnable fiber. The current fiber will never be resumed; you're + # responsible to reenqueue it. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + @[AlwaysInline] + def self.reschedule : Nil + Scheduler.current.reschedule + end + + # Tells the current scheduler to suspend the current fiber and to resume + # *fiber* instead. The current fiber will never be resumed; you're responsible + # to reenqueue it. + # + # Raises `RuntimeError` if the fiber doesn't belong to the current execution + # context. + # + # This method is safe as it only operates on the current `ExecutionContext` + # and `Scheduler`. + def self.resume(fiber : Fiber) : Nil + if fiber.execution_context == current + Scheduler.current.resume(fiber) + else + raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}") + end + end + + # Creates a new fiber then calls `#enqueue` to add it to the execution + # context. + # + # May be called from any `ExecutionContext` (i.e. must be thread-safe). + def spawn(*, name : String? = nil, &block : ->) : Fiber + Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) } + end + + # Legacy support for the `same_thread` argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + abstract def stack_pool : Fiber::StackPool + abstract def stack_pool? : Fiber::StackPool? + + abstract def event_loop : Crystal::EventLoop + + # Enqueues a fiber to be resumed inside the execution context. + # + # May be called from any ExecutionContext (i.e. must be thread-safe). + abstract def enqueue(fiber : Fiber) : Nil +end diff --git a/src/execution_context/global_queue.cr b/src/execution_context/global_queue.cr new file mode 100644 index 000000000000..22535ab01ed6 --- /dev/null +++ b/src/execution_context/global_queue.cr @@ -0,0 +1,104 @@ +# The queue is a port of Go's `globrunq*` functions, distributed under a +# BSD-like license: +# + +require "../fiber/queue" +require "./runnables" + +module ExecutionContext + # Global queue of runnable fibers. + # Unbounded. + # Shared by all schedulers in an execution context. + # + # Basically a `Fiber::Queue` wrapped in a `Thread::Mutex`, at the exception of + # the `#grab?` method that tries to grab 1/Nth of the queue at once. + class GlobalQueue + def initialize(@mutex : Thread::Mutex) + @queue = Fiber::Queue.new + end + + # Grabs the lock and enqueues a runnable fiber on the global runnable queue. + def push(fiber : Fiber) : Nil + @mutex.synchronize { unsafe_push(fiber) } + end + + # Enqueues a runnable fiber on the global runnable queue. Assumes the lock + # is currently held. + def unsafe_push(fiber : Fiber) : Nil + @queue.push(fiber) + end + + # Grabs the lock and puts a runnable fiber on the global runnable queue. + def bulk_push(queue : Fiber::Queue*) : Nil + @mutex.synchronize { unsafe_bulk_push(queue) } + end + + # Puts a runnable fiber on the global runnable queue. Assumes the lock is + # currently held. + def unsafe_bulk_push(queue : Fiber::Queue*) : Nil + @queue.bulk_unshift(queue) + end + + # Grabs the lock and dequeues one runnable fiber from the global runnable + # queue. + def pop? : Fiber? + @mutex.synchronize { unsafe_pop? } + end + + # Dequeues one runnable fiber from the global runnable queue. Assumes the + # lock is currently held. + def unsafe_pop? : Fiber? + @queue.pop? + end + + # Grabs the lock then tries to grab a batch of fibers from the global + # runnable queue. Returns the next runnable fiber or `nil` if the queue was + # empty. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def grab?(runnables : Runnables, divisor : Int32) : Fiber? + @mutex.synchronize { unsafe_grab?(runnables, divisor) } + end + + # Try to grab a batch of fibers from the global runnable queue. Returns the + # next runnable fiber or `nil` if the queue was empty. Assumes the lock is + # currently held. + # + # `divisor` is meant for fair distribution of fibers across threads in the + # execution context; it should be the number of threads. + def unsafe_grab?(runnables : Runnables, divisor : Int32) : Fiber? + # ported from Go: globrunqget + return if @queue.empty? + + divisor = 1 if divisor < 1 + size = @queue.size + + n = { + size, # can't grab more than available + size // divisor + 1, # divide + try to take at least 1 fiber + runnables.capacity // 2, # refill half the destination queue + }.min + + fiber = @queue.pop? + + # OPTIMIZE: q = @queue.split(n - 1) then `runnables.push(pointerof(q))` (?) + (n - 1).times do + break unless f = @queue.pop? + runnables.push(f) + end + + fiber + end + + @[AlwaysInline] + def empty? : Bool + @queue.empty? + end + + @[AlwaysInline] + def size : Int32 + @queue.size + end + end +end diff --git a/src/execution_context/isolated.cr b/src/execution_context/isolated.cr new file mode 100644 index 000000000000..73ff72efc278 --- /dev/null +++ b/src/execution_context/isolated.cr @@ -0,0 +1,183 @@ +require "fiber/queue" + +module ExecutionContext + # ST scheduler. Owns a single thread running a single fiber. + # + # Concurrency is disabled: calls to `#spawn` will create fibers in another + # execution context (defaults to `ExecutionContext.default`). Any calls that + # result in waiting (e.g. sleep, or socket read/write) will block the thread + # since there are no other fibers to switch to. + # + # The fiber will still run in parallel to other fibers running in other + # execution contexts. + # + # Isolated fibers can still communicate with other fibers running in other + # execution contexts using standard means, such as `Channel(T)`, `WaitGroup` + # or `Mutex`. They can also execute IO operations or sleep just like any other + # fiber. + # + # Example: + # + # ``` + # ExecutionContext::Isolated.new("Gtk") { Gtk.main } + # ``` + class Isolated + include ExecutionContext + include ExecutionContext::Scheduler + + getter name : String + + @mutex : Thread::Mutex + protected getter thread : Thread + @main_fiber : Fiber + + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + + getter? running : Bool = true + @enqueued = false + @waiting = false + + def initialize(@name : String, @spawn_context = ExecutionContext.default, &@func : ->) + @mutex = Thread::Mutex.new + @thread = uninitialized Thread + @main_fiber = uninitialized Fiber + @thread = start_thread + ExecutionContext.execution_contexts.push(self) + end + + private def start_thread : Thread + Thread.new(name: @name) do |thread| + @thread = thread + thread.execution_context = self + thread.current_scheduler = self + thread.main_fiber.name = @name + @main_fiber = thread.main_fiber + run + end + end + + # :nodoc: + @[AlwaysInline] + def execution_context : Isolated + self + end + + # :nodoc: + def stack_pool : Fiber::StackPool + raise RuntimeError.new("No stack pool for isolated contexts") + end + + # :nodoc: + def stack_pool? : Fiber::StackPool? + end + + @[AlwaysInline] + def spawn(*, name : String? = nil, &block : ->) : Fiber + @spawn_context.spawn(name: name, &block) + end + + # :nodoc: + @[AlwaysInline] + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + @spawn_context.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + Crystal.trace :sched, "enqueue", fiber: fiber, context: self + + unless fiber == @main_fiber + raise RuntimeError.new("Concurrency is disabled in isolated contexts") + end + + @mutex.synchronize do + @enqueued = true + + if @waiting + # wake up the blocked thread + @waiting = false + @event_loop.interrupt + else + # race: enqueued before the other thread started waiting + end + end + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + + loop do + @mutex.synchronize do + # race: another thread already re-enqueued the fiber + if @enqueued + Crystal.trace :sched, "resume" + @enqueued = false + @waiting = false + return + end + @waiting = true + end + + # wait on the event loop + queue = Fiber::Queue.new + @event_loop.run(pointerof(queue), blocking: true) + + if fiber = queue.pop? + break if fiber == @main_fiber && queue.empty? + raise RuntimeError.new("Concurrency is disabled in isolated contexts") + end + + # the evloop got interrupted: restart + end + + # cleanup + @mutex.synchronize do + @waiting = false + @enqueued = false + end + + Crystal.trace :sched, "resume" + end + + protected def resume(fiber : Fiber) : Nil + # in theory we could resume @main_fiber, but this method may only be + # called from the current execution context, which is @main_fiber; it + # doesn't make any sense for a fiber to resume itself + raise RuntimeError.new("Can't resume #{fiber} in #{self}") + end + + private def run + Crystal.trace :sched, "started" + @func.call + ensure + @running = false + ExecutionContext.execution_contexts.delete(self) + end + + @[AlwaysInline] + def join : Nil + @thread.join + end + + @[AlwaysInline] + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << name << '>' + end + + def status : String + if @waiting + "event-loop" + elsif @running + "running" + else + "shutdown" + end + end + end +end diff --git a/src/execution_context/monitor.cr b/src/execution_context/monitor.cr new file mode 100644 index 000000000000..bf71892102c7 --- /dev/null +++ b/src/execution_context/monitor.cr @@ -0,0 +1,87 @@ +module ExecutionContext + class Monitor + struct Timer + def initialize(@every : Time::Span) + @last = Time.monotonic + end + + def elapsed?(now) + ret = @last + @every <= now + @last = now if ret + ret + end + end + + DEFAULT_EVERY = 10.milliseconds + DEFAULT_COLLECT_STACKS_EVERY = 5.seconds + + def initialize( + @every = DEFAULT_EVERY, + collect_stacks_every = DEFAULT_COLLECT_STACKS_EVERY, + ) + @collect_stacks_timer = Timer.new(collect_stacks_every) + + # FIXME: should be an ExecutionContext::Isolated instead of bare Thread? + # it might print to STDERR (requires evloop) for example; it may also + # allocate memory, for example to raise an exception (gc can run in the + # thread, running finalizers) which is probably not an issue. + @thread = uninitialized Thread + @thread = Thread.new(name: "SYSMON") { run_loop } + end + + # TODO: slow parallelism: instead of actively trying to wakeup, which can be + # expensive and a source of contention leading to waste more time than + # running the enqueued fiber(s) directly, the monitor thread could check the + # queues of MT schedulers and decide to start/wake threads, it could also + # complain that a fiber has been asked to yield numerous times. + # + # TODO: detect schedulers that have been stuck running the same fiber since + # the previous iteration (check current fiber & scheduler tick to avoid ABA + # issues), then mark the fiber to trigger a cooperative yield, for example, + # `Fiber.maybe_yield` could be called at potential cancellation points that + # would otherwise not need to block now (IO, mutexes, schedulers, manually + # called in loops, ...) which could lead fiber execution time be more fair. + # + # TODO: if an execution context didn't have the opportunity to run its + # event-loop since the previous iteration, then the monitor thread may + # choose to run it; it would avoid a set of fibers to always resume + # themselves at the expense of pending events. + # + # TODO: run the GC on low application activity? + private def run_loop : Nil + every do |now| + collect_stacks if @collect_stacks_timer.elapsed?(now) + end + end + + # Executes the block at exact intervals (depending on the OS scheduler + # precision and overall OS load), without counting the time to execute the + # block. + # + # OPTIMIZE: exponential backoff (and/or park) when all schedulers are + # pending to reduce CPU usage; thread wake up would have to signal the + # monitor thread. + private def every(&) + remaining = @every + + loop do + Thread.sleep(remaining) + now = Time.monotonic + yield(now) + remaining = (now + @every - Time.monotonic).clamp(Time::Span.zero..) + rescue exception + Crystal.print_error_buffered("BUG: %s#every crashed", + self.class.name, exception: exception) + end + end + + # Iterates each ExecutionContext and collects unused Fiber stacks. + # + # OPTIMIZE: should maybe happen during GC collections (?) + private def collect_stacks + Crystal.trace :sched, "collect_stacks" do + ExecutionContext.each(&.stack_pool?.try(&.collect)) + end + end + end +end diff --git a/src/execution_context/multi_threaded.cr b/src/execution_context/multi_threaded.cr new file mode 100644 index 000000000000..8a862e93b191 --- /dev/null +++ b/src/execution_context/multi_threaded.cr @@ -0,0 +1,277 @@ +require "./global_queue" +require "./multi_threaded/scheduler" + +module ExecutionContext + # MT scheduler. + # + # Owns multiple threads and starts a scheduler in each one. The number of + # threads is dynamic. Setting the minimum and maximum to the same value will + # start a fixed number of threads. + # + # Fully concurrent, fully parallel: fibers running in this context can be + # resumed by any thread in the context; fibers can run concurrently and in + # parallel to each other, in addition to running in parallel to every other + # fibers running in other contexts. + class MultiThreaded + include ExecutionContext + + getter name : String + + @mutex : Thread::Mutex + @condition : Thread::ConditionVariable + protected getter global_queue : GlobalQueue + + getter stack_pool : Fiber::StackPool = Fiber::StackPool.new + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + @event_loop_lock = Atomic(Bool).new(false) + + @parked = Atomic(Int32).new(0) + @spinning = Atomic(Int32).new(0) + + # :nodoc: + protected def self.default(size : Int32) : self + new("DEFAULT", 1..size, hijack: true) + end + + # Starts a context with a maximum number of threads. Threads aren't started + # right away, but will be started as needed to increase parallelism up to + # the configured maximum. + def self.new(name : String, size : Int32) : self + new(name, 0..size, hijack: false) + end + + # Starts a context with a maximum number of threads. Threads aren't started + # right away, but will be started as needed to increase parallelism up to + # the configured maximum. + def self.new(name : String, size : Range(Nil, Int32)) : self + new(name, 0..size.end, hijack: false) + end + + # Starts a context with a minimum and a maximum number of threads. The + # minimum number of threads will be started right away, then threads will be + # started as needed to increase parallelism up to the configured maximum. + def self.new(name : String, size : Range(Int32, Int32)) : self + new(name, size, hijack: false) + end + + protected def initialize(@name : String, @size : Range(Int32, Int32), hijack : Bool) + raise ArgumentError.new("#{self.class.name} needs at least one thread") if @size.end < 1 + + @mutex = Thread::Mutex.new + @condition = Thread::ConditionVariable.new + + @global_queue = GlobalQueue.new(@mutex) + @schedulers = Array(Scheduler).new(@size.end) + @threads = Array(Thread).new(@size.end) + + @rng = Random::PCG32.new + + start_schedulers(hijack) + start_initial_threads(hijack) + + ExecutionContext.execution_contexts.push(self) + end + + # The number of threads that have been started. + def size : Int32 + @threads.size + end + + # The maximum number of threads that can be started. + def capacity : Int32 + @size.end + end + + def stack_pool? : Fiber::StackPool? + @stack_pool + end + + private def start_schedulers(hijack) + @size.end.times do |index| + @schedulers << Scheduler.new(self, "#{@name}-#{index}") + end + end + + private def start_initial_threads(hijack) + @size.begin.times do |index| + scheduler = @schedulers[index] + + if hijack && index == 0 + @threads << hijack_current_thread(scheduler, index) + else + @threads << start_thread(scheduler, index) + end + end + end + + # Attaches *scheduler* to the current `Thread`, usually the executable's + # main thread. Starts a `Fiber` to run the scheduler loop. + private def hijack_current_thread(scheduler, index) : Thread + thread = Thread.current + thread.internal_name = scheduler.name + thread.execution_context = self + thread.current_scheduler = scheduler + + scheduler.thread = thread + scheduler.main_fiber = Fiber.new("#{scheduler.name}:loop", self) do + scheduler.run_loop + end + + thread + end + + # Start a new `Thread` and attaches *scheduler*. Runs the scheduler loop + # directly in the thread's main `Fiber`. + private def start_thread(scheduler, index) : Thread + Thread.new(name: scheduler.name) do |thread| + thread.execution_context = self + thread.current_scheduler = scheduler + + scheduler.thread = thread + scheduler.main_fiber = thread.main_fiber + scheduler.main_fiber.name = "#{scheduler.name}:loop" + scheduler.run_loop + end + end + + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + self.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + if ExecutionContext.current == self + # local enqueue: push to local queue of current scheduler + ExecutionContext::Scheduler.current.enqueue(fiber) + else + # cross context: push to global queue + Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self + @global_queue.push(fiber) + wake_scheduler + end + end + + # Pick a scheduler at random then iterate all schedulers to try to steal + # fibers from. + protected def steal(& : Scheduler ->) : Nil + return if size == 1 + + i = @rng.next_int + n = @schedulers.size + + n.times do |j| + if scheduler = @schedulers[(i &+ j) % n]? + yield scheduler + end + end + end + + protected def park_thread(&) : Fiber? + @mutex.synchronize do + # avoid races by checking queues again + if fiber = yield + return fiber + end + + Crystal.trace :sched, "park" + @parked.add(1, :acquire_release) + + @condition.wait(@mutex) + + # we don't decrement @parked because #wake_scheduler did + Crystal.trace :sched, "wakeup" + end + + nil + end + + # This method always runs in parallel! + # + # This can be called from any thread in the context but can also be called + # from external execution contexts, in which case the context may have its + # last thread about to park itself, and we must prevent the last thread from + # parking when there is a parallel cross context enqueue! + # + # OPTIMIZE: instead of blindly spending time (blocking progress on the + # current thread) to unpark a thread / start a new thread we could move the + # responsibility to an external OBSERVER to increase parallelism in a MT + # context when it detects pending work. + protected def wake_scheduler : Nil + # another thread is spinning: nothing to do (it shall notice the enqueue) + if @spinning.get(:relaxed) > 0 + return + end + + # interrupt a thread waiting on the event loop + if @event_loop_lock.get(:relaxed) + @event_loop.interrupt + return + end + + # we can check @parked without locking the mutex because we can't push to + # the global queue _and_ park the thread at the same time, so either the + # thread is already parked (and we must awake it) or it noticed (or will + # notice) the fiber in the global queue; + # + # we still rely on an atomic to make sure the actual value is visible by + # the current thread + if @parked.get(:acquire) > 0 + @mutex.synchronize do + # avoid race conditions + return if @parked.get(:relaxed) == 0 + return if @spinning.get(:relaxed) > 0 + + # increase the number of spinning threads _now_ to avoid multiple + # threads from trying to wakeup multiple threads at the same time + # + # we must also decrement the number of parked threads because another + # thread could lock the mutex and increment @spinning again before the + # signaled thread is resumed + spinning = @spinning.add(1, :acquire_release) + parked = @parked.sub(1, :acquire_release) + + @condition.signal + end + return + end + + # check if we can start another thread; no need for atomics, the values + # shall be rather stable over time and we check them again inside the + # mutex + return if @threads.size == @size.end + + @mutex.synchronize do + index = @threads.size + return if index == @size.end # check again + + @threads << start_thread(@schedulers[index], index) + end + end + + # Only one thread is allowed to run the event loop. Yields then returns true + # if the lock was acquired, otherwise returns false immediately. + protected def lock_evloop?(&) : Bool + if @event_loop_lock.swap(true, :acquire) == false + begin + yield + ensure + @event_loop_lock.set(false, :release) + end + true + else + false + end + end + + @[AlwaysInline] + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << name << '>' + end + end +end diff --git a/src/execution_context/multi_threaded/scheduler.cr b/src/execution_context/multi_threaded/scheduler.cr new file mode 100644 index 000000000000..270d25e7439b --- /dev/null +++ b/src/execution_context/multi_threaded/scheduler.cr @@ -0,0 +1,273 @@ +require "crystal/pointer_linked_list" +require "../runnables" + +module ExecutionContext + class MultiThreaded + # MT fiber scheduler. + # + # Owns a single thread inside a MT execution context. + class Scheduler + include ExecutionContext::Scheduler + + getter name : String + + # :nodoc: + property execution_context : MultiThreaded + protected property! thread : Thread + protected property! main_fiber : Fiber + + @global_queue : GlobalQueue + @runnables : Runnables(256) + @event_loop : Crystal::EventLoop + + @tick : Int32 = 0 + @spinning = false + @waiting = false + @parked = false + + protected def initialize(@execution_context, @name) + @global_queue = @execution_context.global_queue + @runnables = Runnables(256).new(@global_queue) + @event_loop = @execution_context.event_loop + end + + # :nodoc: + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + raise RuntimeError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread + self.spawn(name: name, &block) + end + + # Unlike `ExecutionContext::MultiThreaded#enqueue` this method is only + # safe to call on `ExecutionContext.current` which should always be the + # case, since cross context enqueues must call + # `ExecutionContext::MultiThreaded#enqueue` through `Fiber#enqueue`. + protected def enqueue(fiber : Fiber) : Nil + Crystal.trace :sched, "enqueue", fiber: fiber + @runnables.push(fiber) + @execution_context.wake_scheduler unless @execution_context.capacity == 1 + end + + # Enqueue a list of fibers in a single operation and returns a fiber to + # resume immediately. + # + # This is called after running the event loop for example. + private def enqueue_many(queue : Fiber::Queue*) : Fiber? + if fiber = queue.value.pop? + Crystal.trace :sched, "enqueue", size: queue.value.size, fiber: fiber + unless queue.value.empty? + @runnables.bulk_push(queue) + @execution_context.wake_scheduler unless @execution_context.capacity == 1 + end + fiber + end + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + if fiber = quick_dequeue? + resume fiber unless fiber == thread.current_fiber + else + # nothing to do: switch back to the main loop to spin/wait/park + resume main_fiber + end + end + + protected def resume(fiber : Fiber) : Nil + Crystal.trace :sched, "resume", fiber: fiber + + # in a multithreaded environment the fiber may be dequeued before its + # running context has been saved on the stack (thread A tries to resume + # fiber but thread B didn't saved its context yet); we must wait until + # the context switch assembly saved all registers on the stack and set + # the fiber as resumable. + until fiber.resumable? + if fiber.dead? + raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})" + end + + # OPTIMIZE: if the thread saving the fiber context has been preempted, + # this will block the current thread from progressing... shall we + # abort and reenqueue the fiber after MAX iterations? + Intrinsics.pause + end + + swapcontext(fiber) + end + + @[AlwaysInline] + private def quick_dequeue? : Fiber? + # every once in a while: dequeue from global queue to avoid two fibers + # constantly respawing each other to completely occupy the local queue + if (@tick &+= 1) % 61 == 0 + if fiber = @global_queue.pop? + return fiber + end + end + + # dequeue from local queue + if fiber = @runnables.get? + return fiber + end + end + + protected def run_loop : Nil + Crystal.trace :sched, "started" + + loop do + if fiber = find_next_runnable + spin_stop if @spinning + resume fiber + else + # the event loop enqueued a fiber (or was interrupted) or the + # scheduler was unparked: go for the next iteration + end + rescue exception + Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed", + self.class.name, @name, exception: exception) + end + end + + private def find_next_runnable : Fiber? + find_next_runnable do |fiber| + return fiber if fiber + end + end + + private def find_next_runnable(&) : Nil + queue = Fiber::Queue.new + + # nothing to do: start spinning + spinning do + yield @global_queue.grab?(@runnables, divisor: @execution_context.size) + + if @execution_context.lock_evloop? { @event_loop.run(pointerof(queue), blocking: false) } + if fiber = enqueue_many(pointerof(queue)) + spin_stop + yield fiber + end + end + + yield try_steal? + end + + # wait on the event loop for events and timers to activate + evloop_ran = @execution_context.lock_evloop? do + @waiting = true + + # there is a time window between stop spinning and start waiting + # during which another context may have enqueued a fiber, check again + # before blocking on the event loop to avoid missing a runnable fiber, + # which may block for a long time: + yield @global_queue.grab?(@runnables, divisor: @execution_context.size) + + # block on the event loop until an event is ready or the loop is + # interrupted + @event_loop.run(pointerof(queue), blocking: true) + ensure + @waiting = false + end + + if evloop_ran + yield enqueue_many(pointerof(queue)) + + # the event loop was interrupted: restart the loop + return + end + + # no runnable fiber and another thread is already running the event + # loop: park the thread until another scheduler or another context + # enqueues a fiber + @execution_context.park_thread do + # by the time we acquire the lock, another thread may have enqueued + # fiber(s) and already tried to wakeup a thread (race) so we must + # check again; we don't check the scheduler's local queue (it's empty) + + yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size) + yield try_steal? + + @parked = true + nil + end + @parked = false + + # immediately mark the scheduler as spinning (we just unparked); we + # don't increment the number of spinning threads since + # `ExecutionContext::MultiThreaded#wake_scheduler` already did + @spinning = true + end + + # This method always runs in parallel! + private def try_steal? : Fiber? + @execution_context.steal do |other| + if other == self + # no need to steal from ourselves + next + end + + if fiber = @runnables.steal_from(other.@runnables) + Crystal.trace :sched, "stole", from: other, size: @runnables.size, fiber: fiber + return fiber + end + end + end + + # OPTIMIZE: skip spinning if there are enough threads spinning already + private def spinning(&) + spin_start + + 4.times do |iter| + spin_backoff(iter) unless iter == 0 + yield + end + + spin_stop + end + + @[AlwaysInline] + private def spin_start : Nil + return if @spinning + + @spinning = true + @execution_context.@spinning.add(1, :acquire_release) + end + + @[AlwaysInline] + private def spin_stop : Nil + return unless @spinning + + @execution_context.@spinning.sub(1, :acquire_release) + @spinning = false + end + + @[AlwaysInline] + private def spin_backoff(iter) + # OPTIMIZE: consider exponential backoff, but beware of edge cases, like + # creating latency before we notice a cross context enqueue, for example + Thread.yield + end + + @[AlwaysInline] + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << @name << '>' + end + + def status : String + if @spinning + "spinning" + elsif @waiting + "event-loop" + elsif @parked + "parked" + else + "running" + end + end + end + end +end diff --git a/src/execution_context/runnables.cr b/src/execution_context/runnables.cr new file mode 100644 index 000000000000..6be2fda446c0 --- /dev/null +++ b/src/execution_context/runnables.cr @@ -0,0 +1,210 @@ +# The queue is a port of Go's `runq*` functions, distributed under a BSD-like +# license: +# +# The queue derivates from the chase-lev lock-free queue with adaptations: +# +# - single ring buffer (per scheduler); +# - on overflow: bulk push half the ring to `GlobalQueue`; +# - on empty: bulk grab up to half the ring from `GlobalQueue`; +# - bulk push operation; + +require "../fiber/queue" +require "./global_queue" + +module ExecutionContext + # :nodoc: + # + # Local queue or runnable fibers for schedulers. + # Bounded. + # First-in, first-out semantics (FIFO). + # Single producer, multiple consumers thread safety. + # + # Private to an execution context scheduler, except for stealing methods that + # can be called from any thread in the execution context. + class Runnables(N) + def initialize(@global_queue : GlobalQueue) + # head is an index to the buffer where the next fiber to dequeue is. + # + # tail is an index to the buffer where the next fiber to enqueue will be + # (on the next push). + # + # head is always behind tail (not empty) or equal (empty) but never after + # tail (the queue would have a negative size => bug). + @head = Atomic(UInt32).new(0) + @tail = Atomic(UInt32).new(0) + @buffer = uninitialized Fiber[N] + end + + @[AlwaysInline] + def capacity : Int32 + N + end + + # Tries to push fiber on the local runnable queue. If the run queue is full, + # pushes fiber on the global queue, which will grab the global lock. + # + # Executed only by the owner. + def push(fiber : Fiber) : Nil + # ported from Go: runqput + loop do + head = @head.get(:acquire) # sync with consumers + tail = @tail.get(:relaxed) + + if (tail &- head) < N + # put fiber to local queue + @buffer.to_unsafe[tail % N] = fiber + + # make the fiber available for consumption + @tail.set(tail &+ 1, :release) + return + end + + if push_slow(fiber, head, tail) + return + end + + # failed to advance head (another scheduler stole fibers), + # the queue isn't full, now the push above must succeed + end + end + + private def push_slow(fiber : Fiber, head : UInt32, tail : UInt32) : Bool + # ported from Go: runqputslow + n = (tail &- head) // 2 + raise "BUG: queue is not full" if n != N // 2 + + # first, try to grab half of the fibers from local queue + batch = uninitialized Fiber[N] # actually N // 2 + 1 but that doesn't compile + n.times do |i| + batch.to_unsafe[i] = @buffer.to_unsafe[(head &+ i) % N] + end + _, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return false unless success + + # append fiber to the batch + batch.to_unsafe[n] = fiber + + # link the fibers + n.times do |i| + batch.to_unsafe[i].schedlink = batch.to_unsafe[i &+ 1] + end + queue = Fiber::Queue.new(batch.to_unsafe[0], batch.to_unsafe[n], size: (n &+ 1).to_i32) + + # now put the batch on global queue (grabs the global lock) + @global_queue.bulk_push(pointerof(queue)) + + true + end + + # Tries to enqueue all the fibers in `queue` into the local queue. If the + # queue is full, the overflow will be pushed to the global queue; in that + # case this will temporarily acquire the global queue lock. + # + # Executed only by the owner. + def bulk_push(queue : Fiber::Queue*) : Nil + # ported from Go: runqputbatch + head = @head.get(:acquire) # sync with other consumers + tail = @tail.get(:relaxed) + + while !queue.value.empty? && (tail &- head) < N + fiber = queue.value.pop + @buffer.to_unsafe[tail % N] = fiber + tail &+= 1 + end + + # make the fibers available for consumption + @tail.set(tail, :release) + + # put any overflow on global queue + @global_queue.bulk_push(queue) unless queue.value.empty? + end + + # Dequeues the next runnable fiber from the local queue. + # + # Executed only by the owner. + # TODO: rename as `#shift?` + def get? : Fiber? + # ported from Go: runqget + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:relaxed) + return if tail == head + + fiber = @buffer.to_unsafe[head % N] + head, success = @head.compare_and_set(head, head &+ 1, :acquire_release, :acquire) + return fiber if success + end + end + + # Steals half the fibers from the local queue of `src` and puts them onto + # the local queue. Returns one of the stolen fibers, or `nil` on failure. + # + # Only executed from the owner (when the local queue is empty). + def steal_from(src : Runnables) : Fiber? + # ported from Go: runqsteal + + tail = @tail.get(:relaxed) + n = src.grab(@buffer.to_unsafe, tail) + return if n == 0 + + # 'dequeue' last fiber from @buffer + n &-= 1 + fiber = @buffer.to_unsafe[(tail &+ n) % N] + return fiber if n == 0 + + head = @head.get(:acquire) # sync with consumers + if tail &- head &+ n >= N + raise "BUG: local queue overflow" + end + + # make the fibers available for consumption + @tail.set(tail &+ n, :release) + + fiber + end + + # Grabs a batch of fibers from local queue into `buffer` of size N (normally + # the ring buffer of another `Runnables`) starting at `buffer_head`. Returns + # number of grabbed fibers. + # + # Can be executed by any scheduler. + protected def grab(buffer : Fiber*, buffer_head : UInt32) : UInt32 + # ported from Go: runqgrab + + head = @head.get(:acquire) # sync with other consumers + loop do + tail = @tail.get(:acquire) # sync with the producer + + n = tail &- head + n -= n // 2 + return 0_u32 if n == 0 # queue is empty + + if n > N // 2 + # read inconsistent head and tail + head = @head.get(:acquire) + next + end + + n.times do |i| + fiber = @buffer.to_unsafe[(head &+ i) % N] + buffer[(buffer_head &+ i) % N] = fiber + end + + # try to mark the fiber as consumed + head, success = @head.compare_and_set(head, head &+ n, :acquire_release, :acquire) + return n if success + end + end + + @[AlwaysInline] + def empty? : Bool + @head.get(:relaxed) == @tail.get(:relaxed) + end + + @[AlwaysInline] + def size : UInt32 + @tail.get(:relaxed) &- @head.get(:relaxed) + end + end +end diff --git a/src/execution_context/scheduler.cr b/src/execution_context/scheduler.cr new file mode 100644 index 000000000000..fe5acab96500 --- /dev/null +++ b/src/execution_context/scheduler.cr @@ -0,0 +1,83 @@ +module ExecutionContext + module Scheduler + @[AlwaysInline] + def self.current : Scheduler + Thread.current.current_scheduler + end + + protected abstract def thread : Thread + protected abstract def execution_context : ExecutionContext + + # Instantiates a fiber and enqueues it into the scheduler's local queue. + def spawn(*, name : String? = nil, &block : ->) : Fiber + fiber = Fiber.new(name, execution_context, &block) + enqueue(fiber) + fiber + end + + # Legacy support for the *same_thread* argument. Each execution context may + # decide to support it or not (e.g. a single threaded context can accept it). + abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def enqueue(fiber : Fiber) : Nil + + # Suspends the execution of the current fiber and resumes the next runnable + # fiber. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.reschedule` instead. + protected abstract def reschedule : Nil + + # Suspends the execution of the current fiber and resumes *fiber*. + # + # The current fiber will never be resumed; you're responsible to reenqueue + # it. + # + # Unsafe. Must only be called on `ExecutionContext.current`. Prefer + # `ExecutionContext.resume` instead. + protected abstract def resume(fiber : Fiber) : Nil + + # Switches the thread from running the current fiber to run *fiber* instead. + # + # Handles thread safety around fiber stacks: locks the GC to not start a + # collection while we're switching context, releases the stack of a dead + # fiber. + # + # Unsafe. Must only be called by the current scheduler. Caller must ensure + # that the fiber indeed belongs to the current execution context, and that + # the fiber can indeed be resumed. + protected def swapcontext(fiber : Fiber) : Nil + current_fiber = thread.current_fiber + + {% unless flag?(:interpreted) %} + thread.dead_fiber = current_fiber if current_fiber.dead? + {% end %} + + GC.lock_read + thread.current_fiber = fiber + Fiber.swapcontext(pointerof(current_fiber.@context), pointerof(fiber.@context)) + GC.unlock_read + + # we switched context so we can't trust *self* anymore (it is the + # scheduler that rescheduled *fiber* which may be another scheduler) as + # well as any other local or instance variables (e.g. we must resolve + # `Thread.current` again) + # + # that being said, we can still trust the *current_fiber* local variable + # (it's the only exception) + + {% unless flag?(:interpreted) %} + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + end + + abstract def status : String + end +end diff --git a/src/execution_context/single_threaded.cr b/src/execution_context/single_threaded.cr new file mode 100644 index 000000000000..3ae11579e0ce --- /dev/null +++ b/src/execution_context/single_threaded.cr @@ -0,0 +1,284 @@ +require "./global_queue" +require "./runnables" + +module ExecutionContext + # ST scheduler. Owns a single thread. + # + # Fully concurrent, limited parallelism: concurrency is limited to that + # thread; fibers running in this context will thus never run in parallel to + # each other, but they may still run in parallel to fibers running in other + # contexts (thus on another thread). + class SingleThreaded + include ExecutionContext + include ExecutionContext::Scheduler + + getter name : String + + protected getter thread : Thread + @main_fiber : Fiber + + @mutex : Thread::Mutex + @global_queue : GlobalQueue + @runnables : Runnables(256) + + getter stack_pool : Fiber::StackPool = Fiber::StackPool.new + getter event_loop : Crystal::EventLoop = Crystal::EventLoop.create + + @waiting = Atomic(Bool).new(false) + @parked = Atomic(Bool).new(false) + @spinning = Atomic(Bool).new(false) + @tick : Int32 = 0 + + # :nodoc: + protected def self.default : self + new("DEFAULT", hijack: true) + end + + def self.new(name : String) : self + new(name, hijack: false) + end + + protected def initialize(@name : String, hijack : Bool) + @mutex = Thread::Mutex.new + @global_queue = GlobalQueue.new(@mutex) + @runnables = Runnables(256).new(@global_queue) + + @thread = uninitialized Thread + @main_fiber = uninitialized Fiber + @thread = hijack ? hijack_current_thread : start_thread + + ExecutionContext.execution_contexts.push(self) + end + + # :nodoc: + def execution_context : self + self + end + + def stack_pool? : Fiber::StackPool? + @stack_pool + end + + # Initializes the scheduler on the current thread (usually the executable's + # main thread). + private def hijack_current_thread : Thread + thread = Thread.current + thread.internal_name = @name + thread.execution_context = self + thread.current_scheduler = self + @main_fiber = Fiber.new("#{@name}:loop", self) { run_loop } + thread + end + + # Creates a new thread to initialize the scheduler. + private def start_thread : Thread + Thread.new(name: @name) do |thread| + thread.execution_context = self + thread.current_scheduler = self + @main_fiber = thread.main_fiber + @main_fiber.name = "#{@name}:loop" + run_loop + end + end + + # :nodoc: + def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber + # whatever the value of same_thread: the fibers will always run on the + # same thread + self.spawn(name: name, &block) + end + + def enqueue(fiber : Fiber) : Nil + if ExecutionContext.current == self + # local enqueue + Crystal.trace :sched, "enqueue", fiber: fiber + @runnables.push(fiber) + else + # cross context enqueue + Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self + @global_queue.push(fiber) + wake_scheduler + end + end + + # Enqueue a list of fibers in a single operation and returns a fiber to + # resume immediately. + # + # This is called after running the event loop for example. + private def enqueue_many(queue : Fiber::Queue*) : Fiber? + if fiber = queue.value.pop? + @runnables.bulk_push(queue) unless queue.value.empty? + fiber + end + end + + protected def reschedule : Nil + Crystal.trace :sched, "reschedule" + if fiber = quick_dequeue? + resume fiber unless fiber == @thread.current_fiber + else + # nothing to do: switch back to the main loop to spin/park + resume @main_fiber + end + end + + protected def resume(fiber : Fiber) : Nil + unless fiber.resumable? + if fiber.dead? + raise "BUG: tried to resume dead fiber #{fiber} (#{inspect})" + else + raise "BUG: can't resume running fiber #{fiber} (#{inspect})" + end + end + swapcontext(fiber) + end + + @[AlwaysInline] + private def quick_dequeue? : Fiber? + # every once in a while: dequeue from global queue to avoid two fibers + # constantly respawing each other to completely occupy the local queue + if (@tick &+= 1) % 61 == 0 + if fiber = @global_queue.pop? + return fiber + end + end + + # try local queue + if fiber = @runnables.get? + return fiber + end + + # try to refill local queue + if fiber = @global_queue.grab?(@runnables, divisor: 1) + return fiber + end + + # run the event loop to see if any event is activable + queue = Fiber::Queue.new + @event_loop.run(pointerof(queue), blocking: false) + return enqueue_many(pointerof(queue)) + end + + private def run_loop : Nil + Crystal.trace :sched, "started" + + loop do + if fiber = find_next_runnable + spin_stop if @spinning.get(:relaxed) + resume fiber + else + # the event loop enqueued a fiber (or was interrupted) or the + # scheduler was unparked: go for the next iteration + end + rescue exception + Crystal.print_error_buffered("BUG: %s#run_loop [%s] crashed", + self.class.name, @name, exception: exception) + end + end + + private def find_next_runnable : Fiber? + find_next_runnable do |fiber| + return fiber if fiber + end + end + + private def find_next_runnable(&) : Nil + queue = Fiber::Queue.new + + # nothing to do: start spinning + spinning do + yield @global_queue.grab?(@runnables, divisor: 1) + + @event_loop.run(pointerof(queue), blocking: false) + yield enqueue_many(pointerof(queue)) + end + + # block on the event loop, waiting for pending event(s) to activate + waiting do + # there is a time window between stop spinning and start waiting during + # which another context may have enqueued a fiber, check again before + # waiting on the event loop to avoid missing a runnable fiber which (may + # block for a long time): + yield @global_queue.grab?(@runnables, divisor: 1) + + @event_loop.run(pointerof(queue), blocking: true) + yield enqueue_many(pointerof(queue)) + + # the event loop was interrupted: restart the loop + return + end + end + + private def spinning(&) + spin_start + + 4.times do |iter| + spin_backoff(iter) unless iter == 0 + yield + end + + spin_stop + end + + private def spin_start : Nil + @spinning.set(true, :release) + end + + private def spin_stop : Nil + @spinning.set(false, :release) + end + + @[AlwaysInline] + private def spin_backoff(iter) + # OPTIMIZE: consider exponential backoff, but beware of latency to notice + # cross context enqueues + Thread.yield + end + + @[AlwaysInline] + private def waiting(&) + @waiting.set(true, :release) + begin + yield + ensure + @waiting.set(false, :release) + end + end + + # This method runs in parallel to the rest of the ST scheduler! + # + # This is called from another context _after_ enqueueing into the global + # queue to try and wakeup the ST thread running in parallel that may be + # running, spinning or waiting on the event loop. + private def wake_scheduler : Nil + if @spinning.get(:acquire) + return + end + + if @waiting.get(:acquire) + @event_loop.interrupt + end + end + + @[AlwaysInline] + def inspect(io : IO) : Nil + to_s(io) + end + + def to_s(io : IO) : Nil + io << "#<" << self.class.name << ":0x" + object_id.to_s(io, 16) + io << ' ' << name << '>' + end + + def status : String + if @spinning.get(:relaxed) + "spinning" + elsif @waiting.get(:relaxed) + "event-loop" + else + "running" + end + end + end +end diff --git a/src/fiber.cr b/src/fiber.cr index 55745666c66d..6c95b9298d33 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -1,4 +1,5 @@ require "crystal/system/thread_linked_list" +require "crystal/print_buffered" require "./fiber/context" # :nodoc: @@ -58,7 +59,10 @@ class Fiber property name : String? @alive = true - {% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread = Atomic(Thread?).new(nil) + {% end %} # :nodoc: property next : Fiber? @@ -66,6 +70,13 @@ class Fiber # :nodoc: property previous : Fiber? + {% if flag?(:execution_context) %} + property! execution_context : ExecutionContext + {% end %} + + # :nodoc: + property schedlink : Fiber? + # :nodoc: def self.inactive(fiber : Fiber) fibers.delete(fiber) @@ -83,16 +94,19 @@ class Fiber fibers.each { |fiber| yield fiber } end + {% begin %} # Creates a new `Fiber` instance. # # When the fiber is executed, it runs *proc* in its context. # # *name* is an optional and used only as an internal reference. - def initialize(@name : String? = nil, &@proc : ->) + def initialize(@name : String? = nil, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->) @context = Context.new @stack, @stack_bottom = {% if flag?(:interpreted) %} {Pointer(Void).null, Pointer(Void).null} + {% elsif flag?(:execution_context) %} + execution_context.stack_pool.checkout {% else %} Crystal::Scheduler.stack_pool.checkout {% end %} @@ -122,6 +136,7 @@ class Fiber Fiber.fibers.push(self) end + {% end %} # :nodoc: def initialize(@stack : Void*, thread) @@ -138,30 +153,37 @@ class Fiber {% end %} thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom @name = "main" - {% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %} + + {% if flag?(:preview_mt) && !flag?(:execution_context) %} + @current_thread.set(thread) + {% end %} + Fiber.fibers.push(self) + + # we don't initialize @execution_context here (we may not have an execution + # context yet), and we can't detect ExecutionContext.current (we may reach + # an infinite recursion). end # :nodoc: def run GC.unlock_read + + {% if flag?(:execution_context) && !flag?(:interpreted) %} + # if the fiber previously running on this thread has terminated, we can + # now safely release its stack + if fiber = Thread.current.dead_fiber? + fiber.execution_context.stack_pool.release(fiber.@stack) + end + {% end %} + @proc.call rescue ex - io = {% if flag?(:preview_mt) %} - IO::Memory.new(4096) # PIPE_BUF - {% else %} - STDERR - {% end %} if name = @name - io << "Unhandled exception in spawn(name: " << name << "): " + Crystal.print_buffered("Unhandled exception in spawn(name: %s)", name, exception: ex, to: STDERR) else - io << "Unhandled exception in spawn: " + Crystal.print_buffered("Unhandled exception in spawn", exception: ex, to: STDERR) end - ex.inspect_with_backtrace(io) - {% if flag?(:preview_mt) %} - STDERR.write(io.to_slice) - {% end %} - STDERR.flush ensure # Remove the current fiber from the linked list Fiber.inactive(self) @@ -172,9 +194,17 @@ class Fiber @timeout_select_action = nil @alive = false - {% unless flag?(:interpreted) %} + + {% unless flag?(:interpreted) || flag?(:execution_context) %} + # interpreted: the interpreter is managing the stacks + # + # execution context: do not prematurely release the stack before we switch + # to another fiber so we don't end up with a thread reusing a stack for a + # new fiber while the current fiber isn't fully terminated (oops); even + # without the pool, we can't unmap before we swap context. Crystal::Scheduler.stack_pool.release(@stack) {% end %} + Fiber.suspend end @@ -216,7 +246,11 @@ class Fiber # puts "never reached" # ``` def resume : Nil - Crystal::Scheduler.resume(self) + {% if flag?(:execution_context) %} + ExecutionContext.resume(self) + {% else %} + Crystal::Scheduler.resume(self) + {% end %} end # Adds this fiber to the scheduler's runnables queue for the current thread. @@ -225,7 +259,11 @@ class Fiber # the next time it has the opportunity to reschedule to another fiber. There # are no guarantees when that will happen. def enqueue : Nil - Crystal::Scheduler.enqueue(self) + {% if flag?(:execution_context) %} + execution_context.enqueue(self) + {% else %} + Crystal::Scheduler.enqueue(self) + {% end %} end # :nodoc: @@ -293,7 +331,14 @@ class Fiber # end # ``` def self.yield : Nil - Crystal::Scheduler.yield + Crystal.trace :sched, "yield" + + {% if flag?(:execution_context) %} + Fiber.current.resume_event.add(0.seconds) + Fiber.suspend + {% else %} + Crystal::Scheduler.yield + {% end %} end # Suspends execution of the current fiber indefinitely. @@ -307,7 +352,11 @@ class Fiber # useful if the fiber needs to wait for something to happen (for example an IO # event, a message is ready in a channel, etc.) which triggers a re-enqueue. def self.suspend : Nil - Crystal::Scheduler.reschedule + {% if flag?(:execution_context) %} + ExecutionContext.reschedule + {% else %} + Crystal::Scheduler.reschedule + {% end %} end def to_s(io : IO) : Nil @@ -329,7 +378,7 @@ class Fiber GC.push_stack @context.stack_top, @stack_bottom end - {% if flag?(:preview_mt) %} + {% if flag?(:preview_mt) && !flag?(:execution_context) %} # :nodoc: def set_current_thread(thread = Thread.current) : Thread @current_thread.set(thread) diff --git a/src/fiber/queue.cr b/src/fiber/queue.cr new file mode 100644 index 000000000000..9695ac8b6a83 --- /dev/null +++ b/src/fiber/queue.cr @@ -0,0 +1,83 @@ +# The queue is modeled after Go's `gQueue`, distributed under a BSD-like +# license: + +class Fiber + # :nodoc: + # + # Singly-linked list of `Fiber`. + # Last-in, first-out (LIFO) semantic. + # A fiber can only exist within a single `Queue` at any time. + # + # Unlike `Crystal::PointerLinkedList` doubly-linked list, this `Queue` is + # meant to maintain a queue of runnable fibers, or to quickly collect an + # arbitrary number of fibers. + # + # Thread unsafe! An external lock is required for concurrent accesses. + struct Queue + getter size : Int32 + + def initialize(@head : Fiber? = nil, @tail : Fiber? = nil, @size = 0) + end + + def push(fiber : Fiber) : Nil + fiber.schedlink = @head + @head = fiber + @tail = fiber if @tail.nil? + @size += 1 + end + + def bulk_unshift(queue : Queue*) : Nil + return unless last = queue.value.@tail + last.schedlink = nil + + if tail = @tail + tail.schedlink = queue.value.@head + else + @head = queue.value.@head + end + @tail = queue.value.@tail + + @size += queue.value.size + end + + @[AlwaysInline] + def pop : Fiber + pop { raise IndexError.new } + end + + @[AlwaysInline] + def pop? : Fiber? + pop { nil } + end + + private def pop(&) + if fiber = @head + @head = fiber.schedlink + @tail = nil if @head.nil? + @size -= 1 + fiber.schedlink = nil + fiber + else + yield + end + end + + @[AlwaysInline] + def empty? : Bool + @head == nil + end + + def clear + @size = 0 + @head = @tail = nil + end + + def each(&) : Nil + cursor = @head + while cursor + yield cursor + cursor = cursor.schedlink + end + end + end +end diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr index 8f809335f46c..2dc1703e67aa 100644 --- a/src/fiber/stack_pool.cr +++ b/src/fiber/stack_pool.cr @@ -13,6 +13,7 @@ class Fiber # pointer value) rather than downwards, so *protect* must be false. def initialize(@protect : Bool = true) @deque = Deque(Void*).new + @lock = Crystal::SpinLock.new end def finalize @@ -25,7 +26,7 @@ class Fiber # returning memory to the operating system. def collect(count = lazy_size // 2) : Nil count.times do - if stack = @deque.shift? + if stack = @lock.sync { @deque.shift? } Crystal::System::Fiber.free_stack(stack, STACK_SIZE) else return @@ -42,7 +43,7 @@ class Fiber # Removes a stack from the bottom of the pool, or allocates a new one. def checkout : {Void*, Void*} - if stack = @deque.pop? + if !@deque.empty? && (stack = @lock.sync { @deque.pop? }) Crystal::System::Fiber.reset_stack(stack, STACK_SIZE, @protect) else stack = Crystal::System::Fiber.allocate_stack(STACK_SIZE, @protect) @@ -52,7 +53,7 @@ class Fiber # Appends a stack to the bottom of the pool. def release(stack) : Nil - @deque.push(stack) + @lock.sync { @deque.push(stack) } end # Returns the approximated size of the pool. It may be equal or slightly diff --git a/src/io/evented.cr b/src/io/evented.cr index 1f95d1870b0b..db601a83964f 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -20,7 +20,12 @@ module IO::Evented @read_timed_out = timed_out if reader = @readers.get?.try &.shift? - reader.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + reader.enqueue + {% end %} end end @@ -29,7 +34,12 @@ module IO::Evented @write_timed_out = timed_out if writer = @writers.get?.try &.shift? - writer.enqueue + {% if flag?(:execution_context) && Crystal::EventLoop.has_constant?(:LibEvent) %} + event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent) + event_loop.callback_enqueue(reader) + {% else %} + writer.enqueue + {% end %} end end @@ -89,11 +99,19 @@ module IO::Evented @write_event.consume_each &.free @readers.consume_each do |readers| - Crystal::Scheduler.enqueue readers + {% if flag?(:execution_context) %} + readers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue readers + {% end %} end @writers.consume_each do |writers| - Crystal::Scheduler.enqueue writers + {% if flag?(:execution_context) %} + writers.each { |fiber| fiber.execution_context.enqueue fiber } + {% else %} + Crystal::Scheduler.enqueue writers + {% end %} end end diff --git a/src/kernel.cr b/src/kernel.cr index 34763b994839..c2af8771824e 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -608,7 +608,11 @@ end Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1" Exception::CallStack.setup_crash_handler - Crystal::Scheduler.init + {% if flag?(:execution_context) %} + ExecutionContext.init_default_context + {% else %} + Crystal::Scheduler.init + {% end %} {% if flag?(:win32) %} Crystal::System::Process.start_interrupt_loop