Skip to content

Commit

Permalink
Merge pull request #339 from shaicoleman/finish-in-order
Browse files Browse the repository at this point in the history
Add :finish_in_order option
  • Loading branch information
grosser authored Dec 14, 2023
2 parents 449bb55 + d8a7bcb commit 6cd55df
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ They are called on the main process and protected with a mutex.
Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
```

Set `finish_in_order: true` to ensure the `:finish` hooks get called in the order of the items.

```Ruby
Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }
```

_NOTE: If all you are trying to do is get the index, it is much more performant to use `each_with_index` instead._

### Worker number
Expand Down
24 changes: 24 additions & 0 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,33 @@ def with_instrumentation(item, index, options)

def instrument_finish(item, index, result, options)
return unless on_finish = options[:finish]
return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order]
options[:mutex].synchronize { on_finish.call(item, index, result) }
end

def instrument_finish_in_order(item, index, result, options)
options[:mutex].synchronize do
options[:finish_items] ||= []
options[:finish_items_done] ||= []
options[:finish_index] ||= 0
if index == options[:finish_index]
# call finish for current item and any ready items
options[:finish].call(item, index, result)
options[:finish_index] += 1
(index + 1).upto(options[:finish_items].size).each do |old_index|
break unless options[:finish_items_done][old_index]
old_item = options[:finish_items][old_index]
options[:finish].call(old_item, old_index, result)
options[:finish_index] += 1
end
else
# store for later
options[:finish_items][index] = item
options[:finish_items_done][index] = true
end
end
end

def instrument_start(item, index, options)
return unless on_start = options[:start]
options[:mutex].synchronize { on_start.call(item, index) }
Expand Down
22 changes: 22 additions & 0 deletions spec/cases/finish_in_order.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require './spec/cases/helper'

class Callback
def self.call(_item)
sleep rand * 0.01
end
end

method = ENV.fetch('METHOD')
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
$stdout.sync = true

items = 1..9
finish = ->(item, _index, _result) { puts "finish #{item}" }
options = { in_worker_type => 4, finish: finish, finish_in_order: true }
if in_worker_type == :in_ractors
Parallel.public_send(method, items, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, items, options) { |item| Callback.call(item) }
end
5 changes: 5 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,11 @@ def cpus
end
end

it "calls finish hook with finish_in_order: true" do
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1`
without_ractor_warning(out).should == (1..9).map { |item| "finish #{item}\n" }.join
end

it "sets Parallel.worker_number with #{type}" do
skip "unsupported" if type == "ractors"
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`
Expand Down

0 comments on commit 6cd55df

Please sign in to comment.