Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
grosser committed Mar 21, 2022
1 parent e73b37d commit b80d60c
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 107 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,6 @@ Security/MarshalLoad:

Lint/EmptyBlock:
Exclude: [spec/**/*.rb]

Naming/MethodParameterName:
Exclude: [spec/**/*.rb]
16 changes: 14 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Parallel
[![Build Status](https://github.com/grosser/parallel/actions/workflows/actions.yml/badge.svg)](https://github.com/grosser/parallel/actions/workflows/actions.yml)


Run any code in parallel Processes(> use all CPUs) or Threads(> speedup blocking operations).<br/>
Run any code in parallel Processes(> use all CPUs), Threads(> speedup blocking operations), or Ractors(> use all CPUs).<br/>
Best suited for map-reduce or e.g. parallel downloads/uploads.

Install
Expand All @@ -20,14 +20,17 @@ Usage
```Ruby
# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
expensive_calculation(one_letter)
SomeClass.expensive_calculation(one_letter)
end

# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... }

# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| ... }

# 3 Ractors -> finished after 1 run
results = Parallel.map(['a','b','c'], in_ractors: 3, ractor: [SomeClass, :expensive_calculation])
```

Same can be done with `each`
Expand Down Expand Up @@ -67,6 +70,15 @@ Processes/Threads are workers, they grab the next piece of work when they finish
- Variables can be shared/modified
- No extra memory used

### Ractors
- Ruby 3.0+ only
- Speedup for blocking operations
- Variables cannot be shared/modified
- No extra memory used
- Very fast to spawn
- Experimental and unstable
- `start` and `finish` hooks are called on the main thread

### ActiveRecord

#### Connection Lost
Expand Down
88 changes: 84 additions & 4 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def map(source, options = {}, &block)
elsif options[:in_threads]
method = :in_threads
size = options[method]
elsif options[:in_ractors]
method = :in_ractors
size = options[method]
else
method = :in_processes
if Process.respond_to?(:fork)
Expand All @@ -285,6 +288,8 @@ def map(source, options = {}, &block)
work_direct(job_factory, options, &block)
elsif method == :in_threads
work_in_threads(job_factory, options.merge(count: size), &block)
elsif method == :in_ractors
work_in_ractors(job_factory, options.merge(count: size), &block)
else
work_in_processes(job_factory, options.merge(count: size), &block)
end
Expand Down Expand Up @@ -382,6 +387,72 @@ def work_in_threads(job_factory, options, &block)
exception || results
end

# Runs the `options[:ractor]` callback (`[ClassName, :method_name]`) for every job on a pool of
# `options[:count]` ractors. The start/finish hooks and result collection happen in this method,
# not inside the ractors.
#
# job_factory - responds to `next`, returning `[item, index]` or a falsy value when exhausted
# options     - :count and :ractor are required; :preserve_results == false stores nil
#               instead of each result
#
# Returns the results array (indexed by job index), or the first exception observed from a
# callback (returned, not raised).
# Raises ArgumentError when a block is given or `:ractor` is missing.
def work_in_ractors(job_factory, options)
  exception = nil
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe on jRuby
  stop = [[nil, nil], nil, :break] # message that makes a worker leave its receive loop

  callback = options[:ractor]
  if block_given? || !callback
    raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`"
  end

  # build: every worker loops receive -> call -> yield until it is told to stop
  ractors = Array.new(options.fetch(:count)) do
    Ractor.new do
      loop do
        got = receive
        (klass, method_name), item, index = got
        break if index == :break
        begin
          Ractor.yield [nil, klass.send(method_name, item), item, index]
        rescue StandardError => e
          Ractor.yield [e, nil, item, index]
        end
      end
    end
  end

  # start: hand one job to each worker, stop the workers that have nothing to do
  ractors.dup.each do |ractor|
    if (set = job_factory.next)
      item, index = set
      instrument_start item, index, options
      ractor.send [callback, item, index]
    else
      ractor.send(stop)
      ractors.delete ractor
    end
  end

  # replace with new items
  while (set = job_factory.next)
    item_next, index_next = set
    done, (exception, result, item, index) = Ractor.select(*ractors)
    if exception
      # the failed worker went back to `receive`; stop it so it does not stay blocked forever
      done.send(stop)
      ractors.delete done
      break
    end
    instrument_finish item, index, result, options
    results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }

    instrument_start item_next, index_next, options
    done.send([callback, item_next, index_next])
  end

  # finish: every remaining worker has exactly one outstanding job
  ractors.each do |ractor|
    new_exception, result, item, index = ractor.take
    ractor.send(stop) # stop the worker even when its job failed
    exception ||= new_exception
    next if new_exception
    instrument_finish item, index, result, options
    results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
  end

  exception || results
end

def work_in_processes(job_factory, options, &blk)
workers = create_workers(job_factory, options, &blk)
results = []
Expand Down Expand Up @@ -426,6 +497,7 @@ def work_in_processes(job_factory, options, &blk)
end
end
end

exception || results
end

Expand Down Expand Up @@ -521,12 +593,20 @@ def call_with_index(item, index, options, &block)
end

# Surrounds the yielded work with the optional `:start` / `:finish` hooks from `options`.
# Returns the block's result, or nil when `options[:preserve_results]` is false.
# NOTE(review): this span comes from a diff view and appears to show both the removed inline
# hook calls and the added instrument_start / instrument_finish calls - confirm against the real file.
def with_instrumentation(item, index, options)
on_start = options[:start]
on_finish = options[:finish]
options[:mutex].synchronize { on_start.call(item, index) } if on_start
instrument_start(item, index, options)
result = yield
options[:mutex].synchronize { on_finish.call(item, index, result) } if on_finish
instrument_finish(item, index, result, options)
result unless options[:preserve_results] == false
end

# Calls the optional `:finish` hook with (item, index, result) while holding `options[:mutex]`.
# Returns nil when no hook is configured, otherwise whatever the hook returns.
def instrument_finish(item, index, result, options)
  hook = options[:finish]
  return unless hook

  options[:mutex].synchronize do
    hook.call(item, index, result)
  end
end

# Calls the optional `:start` hook with (item, index) while holding `options[:mutex]`.
# Returns nil when no hook is configured, otherwise whatever the hook returns.
def instrument_start(item, index, options)
  hook = options[:start]
  return unless hook

  options[:mutex].synchronize do
    hook.call(item, index)
  end
end
end
end
13 changes: 11 additions & 2 deletions spec/cases/each_with_ar_sqlite.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
class User < ActiveRecord::Base # rubocop:disable Lint/ConstantDefinitionInBlock
end

# Work executed by the worker in this spec (directly as `ractor:` callback or from a block):
# prints the concatenated names of all User records.
class Callback # rubocop:disable Lint/ConstantDefinitionInBlock
  class << self
    def call(_)
      $stdout.sync = true
      names = User.all.map(&:name)
      puts "Parallel: #{names.join}"
    end
  end
end

# create tables
unless User.table_exists?
ActiveRecord::Schema.define(version: 1) do
Expand All @@ -31,8 +38,10 @@ class User < ActiveRecord::Base # rubocop:disable Lint/ConstantDefinitionInBlock

puts "Parent: #{User.first.name}"

Parallel.each([1], in_worker_type => 1) do
puts "Parallel (#{in_worker_type}): #{User.all.map(&:name).join}"
if in_worker_type == :in_ractors
Parallel.each([1], in_worker_type => 1, ractor: [Callback, :call])
else
Parallel.each([1], in_worker_type => 1) { |x| Callback.call x }
end

puts "Parent: #{User.first.name}"
Expand Down
11 changes: 11 additions & 0 deletions spec/cases/map_with_ractor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true
require './spec/cases/helper'

# Ractor callback for this spec: returns the argument's string form with an "x" appended.
class Callback
  class << self
    def call(arg)
      suffixed = "#{arg}x"
      suffixed
    end
  end
end

# Maps every character of ENV['INPUT'] through Callback.call on COUNT ractors (default 2)
# and prints the joined result.
input = ENV.fetch('INPUT') # a missing variable raises a KeyError naming it, not NoMethodError on nil
worker_count = Integer(ENV.fetch('COUNT', 2))
result = Parallel.map(input.chars, in_ractors: worker_count, ractor: [Callback, :call])
print result.join
17 changes: 15 additions & 2 deletions spec/cases/profile_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,26 @@ def count_objects
cur.map { |k, v| [k, v - old[k]] }.to_h.reject { |_k, v| v == 0 }
end

# No-op callback used for the ractor variant of the memory profile; always returns nil.
class Callback
  class << self
    def call(x); end
  end
end

require './spec/cases/helper'

items = Array.new(1000)
options = { "in_#{ARGV[0]}".to_sym => 2 }

# TODO: not sure why this fails without 2.times in threading mode :(

puts(count_objects { 2.times { Parallel.map(items, options) {} } }.inspect)
call = lambda do
if ARGV[0] == "ractors"
Parallel.map(items, options.merge(ractor: [Callback, :call]))
sleep 0.1 # ractors need a bit to shut down
else
Parallel.map(items, options) {}
end
end

puts(count_objects { 2.times { call.call } }.inspect)

puts(count_objects { 2.times { Parallel.map(items, options) {} } }.inspect)
puts(count_objects { 2.times { call.call } }.inspect)
25 changes: 19 additions & 6 deletions spec/cases/with_break.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,24 @@
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
worker_size = (ENV['WORKER_SIZE'] || 4).to_i

result = Parallel.public_send(method, 1..100, in_worker_type => worker_size) do |x|
sleep 0.1 # so all workers get started
print x
raise Parallel::Break, *ARGV if x == 1
sleep 0.2 # so now no work gets queued before Parallel::Break is raised
x
ARGV.freeze # make ractor happy

# Work item shared by the block and ractor variants of this spec: prints x, raises
# Parallel::Break (with ARGV as its arguments) for x == 1, otherwise returns x.
# The sleeps order the workers so the break happens before further work is queued.
class Callback
def self.call(x)
$stdout.sync = true
sleep 0.1 # so all workers get started
print x
raise Parallel::Break, *ARGV if x == 1
sleep 0.2 # so now no work gets queued before Parallel::Break is raised
x
end
end

options = { in_worker_type => worker_size }
result =
if in_worker_type == :in_ractors
Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, 1..10, options) { |x| Callback.call x }
end
print " Parallel::Break raised - result #{result.inspect}"
24 changes: 17 additions & 7 deletions spec/cases/with_break_before_finish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@

method = ENV.fetch('METHOD')
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
$stdout.sync = true

# Work item shared by the block and ractor variants of this spec: raises Parallel::Break
# for x == 1 before anything is printed; every other item is printed and returned after
# a further 0.2s sleep.
class Callback
def self.call(x)
$stdout.sync = true
sleep 0.1 # let workers start
raise Parallel::Break if x == 1
sleep 0.2
print x
x
end
end

finish = lambda do |_item, _index, _result|
sleep 0.1
print "finish hook called"
end

Parallel.public_send(method, 1..100, in_worker_type => 4, finish: finish) do |x|
sleep 0.1 # let workers start
raise Parallel::Break if x == 1
sleep 0.2
print x
x
options = { in_worker_type => 4, finish: finish }
if in_worker_type == :in_ractors
Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, 1..10, options) { |x| Callback.call x }
end

print " Parallel::Break raised"
21 changes: 17 additions & 4 deletions spec/cases/with_exception.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,27 @@
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
worker_size = (ENV['WORKER_SIZE'] || 4).to_i

begin
Parallel.public_send(method, 1..100, in_worker_type => worker_size) do |x|
# Dedicated error type so the spec rescues only the failure raised on purpose by Callback.call.
class ParallelTestError < StandardError
end

class Callback
def self.call(x)
$stdout.sync = true
sleep 0.1 # so all workers get started
print x
raise 'foo' if x == 1
raise ParallelTestError, 'foo' if x == 1
sleep 0.2 # so now no work gets queued before exception is raised
x
end
rescue StandardError
end

begin
options = { in_worker_type => worker_size }
if in_worker_type == :in_ractors
Parallel.public_send(method, 1..100, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, 1..100, options) { |x| Callback.call x }
end
rescue ParallelTestError
print ' raised'
end
22 changes: 16 additions & 6 deletions spec/cases/with_exception_before_finish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,29 @@
# Dedicated error type: raised by Callback.call and rescued at the end of this spec.
class ParallelTestError < StandardError
end

begin
finish = lambda do |_item, _index, _result|
print " called"
end

Parallel.public_send(method, 1..10, in_worker_type => 4, finish: finish) do |x|
# Work item shared by the block and ractor variants of this spec: only x == 3 succeeds
# (it is printed and returned); every other item sleeps 0.2s and then raises ParallelTestError.
class Callback
def self.call(x)
$stdout.sync = true
if x != 3
sleep 0.2
raise ParallelTestError
end
print x
x
end
end

begin
finish = lambda do |_item, _index, _result|
print " called"
end

options = { in_worker_type => 4, finish: finish }
if in_worker_type == :in_ractors
Parallel.public_send(method, 1..10, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, 1..10, options) { |x| Callback.call x }
end
rescue ParallelTestError
nil
end
Loading

0 comments on commit b80d60c

Please sign in to comment.