Skip to content

Commit

Permalink
Fix Set thread safety
Browse files Browse the repository at this point in the history
Co-authored-by: Mike Dalessio <[email protected]>
Co-authored-by: Farid Zakaria <[email protected]>
  • Loading branch information
3 people committed Jun 4, 2021
1 parent 382550c commit dd2d8cf
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 7 deletions.
20 changes: 14 additions & 6 deletions lib/concurrent-ruby/concurrent/set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@ module Concurrent
#
# @see http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html Ruby standard library `Set`


# @!macro internal_implementation_note
SetImplementation = case
when Concurrent.on_cruby?
# Because MRI never runs code in parallel, the existing
# non-thread-safe structures should usually work fine.
::Set
# The CRuby implementation of Set is written in Ruby itself and is
# not thread safe for certain methods.
require 'monitor'
require 'concurrent/thread_safe/util/data_structures'

class CRubySet < ::Set
end

ThreadSafe::Util.make_synchronized_on_cruby CRubySet
CRubySet

when Concurrent.on_jruby?
require 'jruby/synchronized'

class JRubySet < ::Set
include JRuby::Synchronized
end

JRubySet

when Concurrent.on_rbx?
Expand All @@ -41,7 +48,8 @@ class JRubySet < ::Set

class RbxSet < ::Set
end
ThreadSafe::Util.make_synchronized_on_rbx Concurrent::RbxSet

ThreadSafe::Util.make_synchronized_on_rbx RbxSet
RbxSet

when Concurrent.on_truffleruby?
Expand All @@ -50,7 +58,7 @@ class RbxSet < ::Set
class TruffleRubySet < ::Set
end

ThreadSafe::Util.make_synchronized_on_truffleruby Concurrent::TruffleRubySet
ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet
TruffleRubySet

else
Expand Down
25 changes: 25 additions & 0 deletions lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,31 @@ def self.synchronized(object, &block)
module Concurrent
module ThreadSafe
module Util
def self.make_synchronized_on_cruby(klass)
klass.class_eval do
def initialize(*args, &block)
@_monitor = Monitor.new
super
end

def initialize_copy(other)
# make sure a copy is not sharing a monitor with the original object!
@_monitor = Monitor.new
super
end
end

klass.superclass.instance_methods(false).each do |method|
klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{method}(*args)
monitor = @_monitor
monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.")
monitor.synchronize { super }
end
RUBY
end
end

def self.make_synchronized_on_rbx(klass)
klass.class_eval do
private
Expand Down
59 changes: 58 additions & 1 deletion spec/concurrent/set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module Concurrent
end

context 'concurrency' do
it do
it '#add and #delete' do
(1..Concurrent::ThreadSafe::Test::THREADS).map do |i|
in_thread do
1000.times do
Expand All @@ -55,6 +55,63 @@ module Concurrent
end.map(&:join)
expect(set).to be_empty
end

it 'force context switch' do
barrier = Concurrent::CyclicBarrier.new(2)

# methods like include? or delete? are implemented for CRuby in Ruby itself
# @see https://github.com/ruby/ruby/blob/master/lib/set.rb
set.clear

# add a single element
set.add(1)

# This thread should start and `Set#reject!` in CRuby should cache a value of `0` for size
thread_reject = in_thread do
# we expect this to return nil since nothing should have changed.
expect(set.reject! do |v|
barrier.wait
v == 1 # only delete the 1 value
end).to eq set
end

thread_add = in_thread do
barrier.wait
expect(set.add?(1)).to eq set
end

join_with [thread_reject, thread_add]
end

it '#each' do
threads = []
("a".."z").inject(set, &:<<) # setup a non-empty set

threads << in_thread do
2000.times do
size = nil
set.each do |member|
if size.nil?
size = set.length
else
expect(set.length).to eq(size)
end
end
end
end

threads += (1..19).map do |i|
in_thread do
v = i * 1000
10.times do
200.times { |j| set << (v+j) }
200.times { |j| set.delete(v+j) }
end
end
end

threads.map(&:join)
end
end
end
end

0 comments on commit dd2d8cf

Please sign in to comment.