diff --git a/.rubocop.yml b/.rubocop.yml index 87eb8c8..9fa1d8c 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -94,3 +94,6 @@ Security/MarshalLoad: Lint/EmptyBlock: Exclude: [spec/**/*.rb] + +Naming/MethodParameterName: + Exclude: [spec/**/*.rb] diff --git a/Readme.md b/Readme.md index 9bc5bdc..b4c80e8 100644 --- a/Readme.md +++ b/Readme.md @@ -4,7 +4,7 @@ Parallel [![Build Status](https://github.com/grosser/parallel/actions/workflows/actions.yml/badge.svg)](https://github.com/grosser/parallel/actions/workflows/actions.yml) -Run any code in parallel Processes(> use all CPUs) or Threads(> speedup blocking operations).
+Run any code in parallel Processes(> use all CPUs), Threads(> speedup blocking operations), or Ractors(> use all CPUs).
Best suited for map-reduce or e.g. parallel downloads/uploads. Install @@ -20,7 +20,7 @@ Usage ```Ruby # 2 CPUs -> work in 2 processes (a,b + c) results = Parallel.map(['a','b','c']) do |one_letter| - expensive_calculation(one_letter) + SomeClass.expensive_calculation(one_letter) end # 3 Processes -> finished after 1 run @@ -28,6 +28,9 @@ results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... } # 3 Threads -> finished after 1 run results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| ... } + +# 3 Ractors -> finished after 1 run +results = Parallel.map(['a','b','c'], in_ractors: 3, ractor: [SomeClass, :expensive_calculation]) ``` Same can be done with `each` @@ -67,6 +70,15 @@ Processes/Threads are workers, they grab the next piece of work when they finish - Variables can be shared/modified - No extra memory used +### Ractors + - Ruby 3.0+ only + - Speedup for blocking operations + - Variables cannot be shared/modified + - No extra memory used + - Very fast to spawn + - Experimental and unstable + - `start` and `finish` hooks are called on main thread + ### ActiveRecord #### Connection Lost diff --git a/lib/parallel.rb b/lib/parallel.rb index 932393d..c7f2bf6 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -264,6 +264,9 @@ def map(source, options = {}, &block) elsif options[:in_threads] method = :in_threads size = options[method] + elsif options[:in_ractors] + method = :in_ractors + size = options[method] else method = :in_processes if Process.respond_to?(:fork) @@ -285,6 +288,8 @@ def map(source, options = {}, &block) work_direct(job_factory, options, &block) elsif method == :in_threads work_in_threads(job_factory, options.merge(count: size), &block) + elsif method == :in_ractors + work_in_ractors(job_factory, options.merge(count: size), &block) else work_in_processes(job_factory, options.merge(count: size), &block) end @@ -382,6 +387,72 @@ def work_in_threads(job_factory, options, &block) exception || results end + def work_in_ractors(job_factory, options) + exception = nil + results = [] + results_mutex = Mutex.new # arrays are not thread-safe on jRuby + + callback = options[:ractor] + if block_given? || !callback + raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`" + end + + # build + ractors = Array.new(options.fetch(:count)) do + Ractor.new do + loop do + got = receive + (klass, method_name), item, index = got + break if index == :break + begin + Ractor.yield [nil, klass.send(method_name, item), item, index] + rescue StandardError => e + Ractor.yield [e, nil, item, index] + end + end + end + end + + # start + ractors.dup.each do |ractor| + if set = job_factory.next + item, index = set + instrument_start item, index, options + ractor.send [callback, item, index] + else + ractor.send([[nil, nil], nil, :break]) # stop the ractor + ractors.delete ractor + end + end + + # replace with new items + while set = job_factory.next + item_next, index_next = set + done, (exception, result, item, index) = Ractor.select(*ractors) + if exception + ractors.delete done + break + end + instrument_finish item, index, result, options + results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) } + + instrument_start item_next, index_next, options + done.send([callback, item_next, index_next]) + end + + # finish + ractors.each do |ractor| + (new_exception, result, item, index) = ractor.take + exception ||= new_exception + next if new_exception + instrument_finish item, index, result, options + results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) } + ractor.send([[nil, nil], nil, :break]) # stop the ractor + end + + exception || results + end + def work_in_processes(job_factory, options, &blk) workers = create_workers(job_factory, options, &blk) results = [] @@ -426,6 +497,7 @@ def work_in_processes(job_factory, options, &blk) end end end + exception || results end @@ -521,12 +593,20 @@ def call_with_index(item, index, options, &block) end def with_instrumentation(item, index, options) - on_start = options[:start] - on_finish = options[:finish] - options[:mutex].synchronize { on_start.call(item, index) } if on_start + instrument_start(item, index, options) result = yield - options[:mutex].synchronize { on_finish.call(item, index, result) } if on_finish + instrument_finish(item, index, result, options) result unless options[:preserve_results] == false end + + def instrument_finish(item, index, result, options) + return unless on_finish = options[:finish] + options[:mutex].synchronize { on_finish.call(item, index, result) } + end + + def instrument_start(item, index, options) + return unless on_start = options[:start] + options[:mutex].synchronize { on_start.call(item, index) } + end end end diff --git a/spec/cases/each_with_ar_sqlite.rb b/spec/cases/each_with_ar_sqlite.rb index 203e83a..d326144 100644 --- a/spec/cases/each_with_ar_sqlite.rb +++ b/spec/cases/each_with_ar_sqlite.rb @@ -16,6 +16,13 @@ class User < ActiveRecord::Base # rubocop:disable Lint/ConstantDefinitionInBlock end + class Callback # rubocop:disable Lint/ConstantDefinitionInBlock + def self.call(_) + $stdout.sync = true + puts "Parallel: #{User.all.map(&:name).join}" + end + end + # create tables unless User.table_exists? ActiveRecord::Schema.define(version: 1) do @@ -31,8 +38,10 @@ class User < ActiveRecord::Base # rubocop:disable Lint/ConstantDefinitionInBlock puts "Parent: #{User.first.name}" - Parallel.each([1], in_worker_type => 1) do - puts "Parallel (#{in_worker_type}): #{User.all.map(&:name).join}" + if in_worker_type == :in_ractors + Parallel.each([1], in_worker_type => 1, ractor: [Callback, :call]) + else + Parallel.each([1], in_worker_type => 1) { |x| Callback.call x } end puts "Parent: #{User.first.name}" diff --git a/spec/cases/map_with_ractor.rb b/spec/cases/map_with_ractor.rb new file mode 100644 index 0000000..cdb3997 --- /dev/null +++ b/spec/cases/map_with_ractor.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true +require './spec/cases/helper' + +class Callback + def self.call(arg) + "#{arg}x" + end +end + +result = Parallel.map(ENV['INPUT'].chars, in_ractors: Integer(ENV["COUNT"] || 2), ractor: [Callback, :call]) +print result * '' diff --git a/spec/cases/profile_memory.rb b/spec/cases/profile_memory.rb index 5ce77e5..89d13cf 100644 --- a/spec/cases/profile_memory.rb +++ b/spec/cases/profile_memory.rb @@ -11,6 +11,10 @@ def count_objects cur.map { |k, v| [k, v - old[k]] }.to_h.reject { |_k, v| v == 0 } end +class Callback + def self.call(x); end +end + require './spec/cases/helper' items = Array.new(1000) @@ -18,6 +22,15 @@ def count_objects # TODO: not sure why this fails without 2.times in threading mode :( -puts(count_objects { 2.times { Parallel.map(items, options) {} } }.inspect) +call = lambda do + if ARGV[0] == "ractors" + Parallel.map(items, options.merge(ractor: [Callback, :call])) + sleep 0.1 # ractors need a bit to shut down + else + Parallel.map(items, options) {} + end +end + +puts(count_objects { 2.times { call.call } }.inspect) -puts(count_objects { 2.times { Parallel.map(items, options) {} } }.inspect) +puts(count_objects { 2.times { call.call } }.inspect) diff --git a/spec/cases/with_break.rb b/spec/cases/with_break.rb index 33534e9..3ea9a27 100644 --- a/spec/cases/with_break.rb +++ b/spec/cases/with_break.rb @@ -6,11 +6,24 @@ in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym worker_size = (ENV['WORKER_SIZE'] || 4).to_i -result = Parallel.public_send(method, 1..100, in_worker_type => worker_size) do |x| - sleep 0.1 # so all workers get started - print x - raise Parallel::Break, *ARGV if x == 1 - sleep 0.2 # so now no work gets queued before Parallel::Break is raised - x +ARGV.freeze # make ractor happy + +class Callback + def self.call(x) + $stdout.sync = true + sleep 0.1 # so all workers get started + print x + raise Parallel::Break, *ARGV if x == 1 + sleep 0.2 # so now no work gets queued before Parallel::Break is raised + x + end end + +options = { in_worker_type => worker_size } +result = + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } + end print " Parallel::Break raised - result #{result.inspect}" diff --git a/spec/cases/with_break_before_finish.rb b/spec/cases/with_break_before_finish.rb index a303d45..c5764e7 100644 --- a/spec/cases/with_break_before_finish.rb +++ b/spec/cases/with_break_before_finish.rb @@ -3,18 +3,28 @@ method = ENV.fetch('METHOD') in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym +$stdout.sync = true + +class Callback + def self.call(x) + $stdout.sync = true + sleep 0.1 # let workers start + raise Parallel::Break if x == 1 + sleep 0.2 + print x + x + end +end finish = lambda do |_item, _index, _result| sleep 0.1 print "finish hook called" end -Parallel.public_send(method, 1..100, in_worker_type => 4, finish: finish) do |x| - sleep 0.1 # let workers start - raise Parallel::Break if x == 1 - sleep 0.2 - print x - x +options = { in_worker_type => 4, finish: finish } +if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) +else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } end - print " Parallel::Break raised" diff --git a/spec/cases/with_exception.rb b/spec/cases/with_exception.rb index 781ca9d..ffecf53 100644 --- a/spec/cases/with_exception.rb +++ b/spec/cases/with_exception.rb @@ -6,14 +6,27 @@ in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym worker_size = (ENV['WORKER_SIZE'] || 4).to_i -begin - Parallel.public_send(method, 1..100, in_worker_type => worker_size) do |x| +class ParallelTestError < StandardError +end + +class Callback + def self.call(x) + $stdout.sync = true sleep 0.1 # so all workers get started print x - raise 'foo' if x == 1 + raise ParallelTestError, 'foo' if x == 1 sleep 0.2 # so now no work gets queued before exception is raised x end -rescue StandardError +end + +begin + options = { in_worker_type => worker_size } + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..100, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..100, options) { |x| Callback.call x } + end +rescue ParallelTestError print ' raised' end diff --git a/spec/cases/with_exception_before_finish.rb b/spec/cases/with_exception_before_finish.rb index cbb3f2f..5bc429c 100644 --- a/spec/cases/with_exception_before_finish.rb +++ b/spec/cases/with_exception_before_finish.rb @@ -7,12 +7,9 @@ class ParallelTestError < StandardError end -begin - finish = lambda do |_item, _index, _result| - print " called" - end - - Parallel.public_send(method, 1..10, in_worker_type => 4, finish: finish) do |x| +class Callback + def self.call(x) + $stdout.sync = true if x != 3 sleep 0.2 raise ParallelTestError @@ -20,6 +17,19 @@ class ParallelTestError < StandardError print x x end +end + +begin + finish = lambda do |_item, _index, _result| + print " called" + end + + options = { in_worker_type => 4, finish: finish } + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } + end rescue ParallelTestError nil end diff --git a/spec/cases/with_exception_in_finish.rb b/spec/cases/with_exception_in_finish.rb index 5c915a4..856b791 100644 --- a/spec/cases/with_exception_in_finish.rb +++ b/spec/cases/with_exception_in_finish.rb @@ -1,20 +1,33 @@ # frozen_string_literal: true require './spec/cases/helper' +$stdout.sync = true method = ENV.fetch('METHOD') in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym -begin - finish = lambda do |x, _index, _result| - raise 'foo' if x == 1 - end +class ParallelTestError < StandardError +end - Parallel.public_send(method, 1..100, in_worker_type => 4, finish: finish) do |x| - sleep 0.1 # so all workers get started +class Callback + def self.call(x) + $stdout.sync = true print x - sleep 0.2 unless x == 1 # so now no work gets queued before exception is raised + sleep 0.2 # let everyone start and print + sleep 0.2 unless x == 1 # prevent other work from start/finish before exception x end -rescue StandardError +end + +begin + finish = lambda do |x, _index, _result| + raise ParallelTestError, 'foo' if x == 1 + end + options = { in_worker_type => 4, finish: finish } + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } + end +rescue ParallelTestError print ' raised' end diff --git a/spec/cases/with_exception_in_start.rb b/spec/cases/with_exception_in_start.rb index bfa5fb3..6d49117 100644 --- a/spec/cases/with_exception_in_start.rb +++ b/spec/cases/with_exception_in_start.rb @@ -4,17 +4,31 @@ method = ENV.fetch('METHOD') in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym -begin - start = lambda do |_item, _index| - @started = @started ? @started + 1 : 1 - raise 'foo' if @started == 4 - end +class ParallelTestError < StandardError +end - Parallel.public_send(method, 1..100, in_worker_type => 4, start: start) do |x| +class Callback + def self.call(x) + $stdout.sync = true print x sleep 0.2 # so now no work gets queued before exception is raised x end -rescue StandardError +end + +begin + start = lambda do |_item, _index| + @started = (@started ? @started + 1 : 1) + sleep 0.01 # a bit of time for ractors to work + raise ParallelTestError, 'foo' if @started == 4 + end + + options = { in_worker_type => 4, start: start } + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } + end +rescue ParallelTestError print ' raised' end diff --git a/spec/cases/with_exception_in_start_before_finish.rb b/spec/cases/with_exception_in_start_before_finish.rb index ad1f7f2..7ff5a05 100644 --- a/spec/cases/with_exception_in_start_before_finish.rb +++ b/spec/cases/with_exception_in_start_before_finish.rb @@ -3,25 +3,38 @@ method = ENV.fetch('METHOD') in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym +$stdout.sync = true class ParallelTestError < StandardError end +class Callback + def self.call(x) + $stdout.sync = true + puts "call #{x}" + x + end +end + begin - start = lambda do |item, _index| + start = lambda do |item, index| + puts "start #{index}" if item != 3 sleep 0.2 raise ParallelTestError end end - finish = lambda do |_item, _index, _result| - print " called" + finish = lambda do |_item, index, _result| + puts "finish #{index}" end - Parallel.public_send(method, 1..10, in_worker_type => 4, start: start, finish: finish) do |x| - print x - x + options = { in_worker_type => 4, start: start, finish: finish } + + if in_worker_type == :in_ractors + Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call])) + else + Parallel.public_send(method, 1..10, options) { |x| Callback.call x } end rescue ParallelTestError nil diff --git a/spec/cases/with_lambda.rb b/spec/cases/with_lambda.rb index 4adc062..e752012 100644 --- a/spec/cases/with_lambda.rb +++ b/spec/cases/with_lambda.rb @@ -1,13 +1,20 @@ # frozen_string_literal: true require './spec/cases/helper' -type = case ARGV[0] - when "PROCESSES" then :in_processes - when "THREADS" then :in_threads - else - raise "Use PROCESSES or THREADS" -end - +$stdout.sync = true +type = :"in_#{ARGV.fetch(0)}" all = [3, 2, 1] produce = -> { all.pop || Parallel::Stop } -puts Parallel.map(produce, type => 2) { |(i, _id)| "ITEM-#{i}" } + +class Callback + def self.call(x) + $stdout.sync = true + "ITEM-#{x}" + end +end + +if type == :in_ractors + puts(Parallel.map(produce, type => 2, ractor: [Callback, :call])) +else + puts(Parallel.map(produce, type => 2) { |(i, _id)| Callback.call i }) +end diff --git a/spec/cases/with_queue.rb b/spec/cases/with_queue.rb index 84c9b7d..52a694e 100644 --- a/spec/cases/with_queue.rb +++ b/spec/cases/with_queue.rb @@ -1,11 +1,12 @@ # frozen_string_literal: true require './spec/cases/helper' -type = case ARGV[0] - when "PROCESSES" then :in_processes - when "THREADS" then :in_threads - else - raise "Use PROCESSES or THREADS" +type = :"in_#{ARGV.fetch(0)}" + +class Callback + def self.call(x) + "ITEM-#{x}" + end end queue = Queue.new @@ -16,4 +17,9 @@ queue.push 3 queue.push Parallel::Stop end -puts Parallel.map(queue, type => 2) { |(i, _id)| "ITEM-#{i}" } + +if type == :in_ractors + puts(Parallel.map(queue, type => 2, ractor: [Callback, :call])) +else + puts(Parallel.map(queue, type => 2) { |(i, _id)| Callback.call i }) +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 5d09ca8..b4cdfcd 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -2,7 +2,9 @@ require 'spec_helper' describe Parallel do - worker_types = (Process.respond_to?(:fork) ? ["processes", "threads"] : ["threads"]) + worker_types = ["threads"] + worker_types << "processes" if Process.respond_to?(:fork) + worker_types << "ractors" if defined?(Ractor) def time_taken t = Time.now.to_f @@ -28,6 +30,10 @@ def execute_start_and_kill(command, amount, signal = 'INT') t.value end + def without_ractor_warning(out) + out.sub(/.*Ractor is experimental.*\n/, "") + end + describe ".processor_count" do before do Parallel.instance_variable_set(:@processor_count, nil) @@ -261,35 +267,58 @@ def cpus end worker_types.each do |type| - it "stops all workers when one fails in #{type}" do - `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception.rb 2>&1`.should =~ /^\d{4} raised$/ + it "does not queue new work when one fails in #{type}" do + out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception.rb 2>&1` + without_ractor_warning(out).should =~ /\A\d{4} raised\z/ end - it "stops all workers when one raises Break in #{type}" do + it "does not queue new work when one raises Break in #{type}" do out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_break.rb 2>&1` - out.should =~ /^\d{4} Parallel::Break raised - result nil$/ + without_ractor_warning(out).should =~ /\A\d{4} Parallel::Break raised - result nil\z/ end it "stops all workers when a start hook fails with #{type}" do - `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start.rb 2>&1`.should =~ /^\d{3} raised$/ + out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start.rb 2>&1` + out = without_ractor_warning(out) + if type == "ractors" + # TODO: running ractors should be interrupted + out.should =~ /\A.*raised.*\z/ + out.should_not =~ /5/ # stopped at 4 + else + out.should =~ /\A\d{3} raised\z/ + end end - it "stops all workers when a finish hook fails with #{type}" do - `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_finish.rb 2>&1`.should =~ /^\d{4} raised$/ + it "does not add new work when a finish hook fails with #{type}" do + out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_finish.rb 2>&1` + without_ractor_warning(out).should =~ /\A\d{4} raised\z/ end it "does not call the finish hook when a worker fails with #{type}" do - `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_before_finish.rb 2>&1`.should == '3 called' + out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_before_finish.rb 2>&1` + without_ractor_warning(out).should == '3 called' end it "does not call the finish hook when a worker raises Break in #{type}" do out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_break_before_finish.rb 2>&1` - out.should =~ /^\d{3}(finish hook called){3} Parallel::Break raised$/ + without_ractor_warning(out).should =~ /\A\d{3}(finish hook called){3} Parallel::Break raised\z/ end it "does not call the finish hook when a start hook fails with #{type}" do out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start_before_finish.rb 2>&1` - out.should == '3 called' + if type == "ractors" + # we are calling on the main thread, so everything sleeps + without_ractor_warning(out).should == "start 0\n" + else + out.split("\n").sort.join("\n").should == <<~OUT.rstrip + call 3 + finish 2 + start 0 + start 1 + start 2 + start 3 + OUT + end end it "can return from break with #{type}" do @@ -298,29 +327,26 @@ def cpus end it "sets Parallel.worker_number with 4 #{type}" do + skip if type == "ractors" # not supported out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1` out.should =~ /\A[0123]+\z/ ['0', '1', '2', '3'].each { |number| out.should include number } end it "sets Parallel.worker_number with 0 #{type}" do + skip if type == "ractors" # not supported type_key = "in_#{type}".to_sym result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], type_key => 0) { |_x| Parallel.worker_number } result.uniq.should == [0] Parallel.worker_number.should be_nil end - end - it "can run with 0 threads" do - Thread.should_not_receive(:exclusive) - result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], in_threads: 0) { |x| x + 2 } - result.should == [3, 4, 5, 6, 7, 8, 9, 10, 11] - end - - it "can run with 0 processes" do - Process.should_not_receive(:fork) - result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], in_processes: 0) { |x| x + 2 } - result.should == [3, 4, 5, 6, 7, 8, 9, 10, 11] + it "can run with 0" do + Thread.should_not_receive(:exclusive) + Process.should_not_receive(:fork) + result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], "in_#{type}".to_sym => 0) { |x| x + 2 } + result.should == [3, 4, 5, 6, 7, 8, 9, 10, 11] + end end it "notifies when an item of work is dispatched to a worker process" do @@ -569,10 +595,10 @@ def cpus worker_types.each do |type| it "works with SQLite in #{type}" do - `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb 2>&1`.gsub( - /.* deprecated; use BigDecimal.*\n/, - '' - ).should == "Parent: X\nParallel (in_#{type}): XXX\nParent: X\n" + out = `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb 2>&1` + out.gsub!(/.* deprecated; use BigDecimal.*\n/, '') + skip "unsupported" if type == "ractors" + without_ractor_warning(out).should == "Parent: X\nParallel: XXX\nParent: X\n" end it "stops all workers when one fails in #{type}" do @@ -581,19 +607,22 @@ def cpus it "stops all workers when one raises Break in #{type}" do out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_break.rb 2>&1` - out.should =~ /^\d{4} Parallel::Break raised - result nil$/ + without_ractor_warning(out).should =~ /^\d{4} Parallel::Break raised - result nil$/ end it "stops all workers when a start hook fails with #{type}" do - `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start.rb 2>&1`.should =~ /^\d{3} raised$/ + out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start.rb 2>&1` + without_ractor_warning(out).should =~ /^\d{3} raised$/ end - it 'stops all workers when a finish hook fails with processes' do - `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_finish.rb 2>&1`.should =~ /^\d{4} raised$/ + it "does not add new work when a finish hook fails with #{type}" do + out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_finish.rb 2>&1` + without_ractor_warning(out).should =~ /^\d{4} raised$/ end it "does not call the finish hook when a worker fails with #{type}" do - `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_before_finish.rb 2>&1`.should == '3 called' + out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_before_finish.rb 2>&1` + without_ractor_warning(out).should == '3 called' end it "does not call the finish hook when a worker raises Break in #{type}" do @@ -603,10 +632,23 @@ def cpus it "does not call the finish hook when a start hook fails with #{type}" do out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start_before_finish.rb 2>&1` - out.should == '3 called' + if type == "ractors" + # we are calling on the main thread, so everything sleeps + without_ractor_warning(out).should == "start 0\n" + else + out.split("\n").sort.join("\n").should == <<~OUT.rstrip + call 3 + finish 2 + start 0 + start 1 + start 2 + start 3 + OUT + end end it "sets Parallel.worker_number with #{type}" do + skip "unsupported" if type == "ractors" out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1` out.should =~ /\A[0123]+\z/ ['0', '1', '2', '3'].each { |number| out.should include number } @@ -652,15 +694,14 @@ def cpus end ["lambda", "queue"].each do |thing| - describe "lambdas" do + describe thing do let(:result) { "ITEM-1\nITEM-2\nITEM-3\n" } - it "runs in threads" do - ruby("spec/cases/with_#{thing}.rb THREADS 2>&1").should == result - end - - it "runs in processs" do - ruby("spec/cases/with_#{thing}.rb PROCESSES 2>&1").should == result + worker_types.each do |type| + it "runs in #{type}" do + out = ruby("spec/cases/with_#{thing}.rb #{type} 2>&1") + without_ractor_warning(out).should == result + end end it "refuses to use progress" do