-
Notifications
You must be signed in to change notification settings - Fork 254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add :finish_in_order option #339
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -641,9 +641,33 @@ def with_instrumentation(item, index, options) | |
|
||
def instrument_finish(item, index, result, options) | ||
return unless on_finish = options[:finish] | ||
return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order] | ||
options[:mutex].synchronize { on_finish.call(item, index, result) } | ||
end | ||
|
||
def instrument_finish_in_order(item, index, result, options) | ||
options[:mutex].synchronize do | ||
options[:finish_items] ||= [] | ||
options[:finish_items_done] ||= [] | ||
options[:finish_index] ||= 0 | ||
if index == options[:finish_index] | ||
# call finish for current item and any ready items | ||
options[:finish].call(item, index, result) | ||
options[:finish_index] += 1 | ||
Comment on lines
+655
to
+656
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can do this in the loop to not have the code twice ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To do it in the loop, you would have to store the item first, e.g. something like this: def instrument_finish_in_order(item, index, result, options)
options[:mutex].synchronize do
options[:finish_items] ||= []
options[:finish_items_done] ||= []
options[:finish_index] ||= 0
options[:finish_items][index] = item
options[:finish_items_done][index] = true
break unless index == options[:finish_index]
index.upto(options[:finish_items].size).each do |old_index|
break unless options[:finish_items_done][old_index]
old_item = options[:finish_items][old_index]
options[:finish].call(old_item, old_index, result)
options[:finish_index] += 1
end
end
end Let me know if you prefer this version and I'll update the PR |
||
(index + 1).upto(options[:finish_items].size).each do |old_index| | ||
break unless options[:finish_items_done][old_index] | ||
old_item = options[:finish_items][old_index] | ||
options[:finish].call(old_item, old_index, result) | ||
options[:finish_index] += 1 | ||
end | ||
else | ||
# store for later | ||
options[:finish_items][index] = item | ||
options[:finish_items_done][index] = true | ||
end | ||
end | ||
end | ||
|
||
def instrument_start(item, index, options) | ||
return unless on_start = options[:start] | ||
options[:mutex].synchronize { on_start.call(item, index) } | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# frozen_string_literal: true | ||
|
||
require './spec/cases/helper' | ||
|
||
class Callback | ||
def self.call(_item) | ||
sleep rand * 0.01 | ||
end | ||
end | ||
|
||
method = ENV.fetch('METHOD') | ||
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym | ||
$stdout.sync = true | ||
|
||
items = 1..9 | ||
finish = ->(item, _index, _result) { puts "finish #{item}" } | ||
options = { in_worker_type => 4, finish: finish, finish_in_order: true } | ||
if in_worker_type == :in_ractors | ||
Parallel.public_send(method, items, options.merge(ractor: [Callback, :call])) | ||
else | ||
Parallel.public_send(method, items, options) { |item| Callback.call(item) } | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can these be local variables ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't use local variables, as they need to maintain the state between invocations.
As they are being called via a class method, it can't use instance variables either.
The options are dup'ed so works even when invoked multiple times with the same options