diff --git a/Readme.md b/Readme.md index bc0b58f..5ceb2ef 100644 --- a/Readme.md +++ b/Readme.md @@ -158,6 +158,12 @@ They are called on the main process and protected with a mutex. Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 } ``` +Set `finish_in_order: true` to ensure the `:finish` hooks get called in the order of the items. + +```Ruby +Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand } +``` + _NOTE: If all you are trying to do is get the index, it is much more performant to use `each_with_index` instead._ ### Worker number diff --git a/lib/parallel.rb b/lib/parallel.rb index e6cec7b..ece6c09 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -641,9 +641,33 @@ def with_instrumentation(item, index, options) def instrument_finish(item, index, result, options) return unless on_finish = options[:finish] + return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order] options[:mutex].synchronize { on_finish.call(item, index, result) } end + def instrument_finish_in_order(item, index, result, options) + options[:mutex].synchronize do + options[:finish_items] ||= [] + options[:finish_items_done] ||= [] + options[:finish_index] ||= 0 + if index == options[:finish_index] + # call finish for current item and any ready items + options[:finish].call(item, index, result) + options[:finish_index] += 1 + (index + 1).upto(options[:finish_items].size).each do |old_index| + break unless options[:finish_items_done][old_index] + old_item = options[:finish_items][old_index] + options[:finish].call(old_item, old_index, result) + options[:finish_index] += 1 + end + else + # store for later + options[:finish_items][index] = item + options[:finish_items_done][index] = true + end + end + end + def instrument_start(item, index, options) return unless on_start = options[:start] options[:mutex].synchronize { on_start.call(item, index) } diff --git a/spec/cases/finish_in_order.rb b/spec/cases/finish_in_order.rb new file mode 100644 index 0000000..7ed418d --- /dev/null +++ b/spec/cases/finish_in_order.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require './spec/cases/helper' + +class Callback + def self.call(_item) + sleep rand * 0.01 + end +end + +method = ENV.fetch('METHOD') +in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym +$stdout.sync = true + +items = 1..9 +finish = ->(item, _index, _result) { puts "finish #{item}" } +options = { in_worker_type => 4, finish: finish, finish_in_order: true } +if in_worker_type == :in_ractors + Parallel.public_send(method, items, options.merge(ractor: [Callback, :call])) +else + Parallel.public_send(method, items, options) { |item| Callback.call(item) } +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 642fffd..400283e 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -653,6 +653,11 @@ def cpus end end + it "calls finish hook with finish_in_order: true" do + out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1` + without_ractor_warning(out).should == (1..9).map { |item| "finish #{item}\n" }.join + end + it "sets Parallel.worker_number with #{type}" do skip "unsupported" if type == "ractors" out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`