Commit

Merge branch 'mjb-pool'
* mjb-pool:
  whitespace removed
  very minor test cleanup
  Added test verifying results from the futures.
  Added -j support to rake.

Conflicts:
	lib/rake/application.rb
jimweirich committed Oct 22, 2012
2 parents ae57582 + 3a797bf commit bbf2da6
Showing 6 changed files with 317 additions and 4 deletions.
9 changes: 9 additions & 0 deletions doc/command_line_usage.rdoc
@@ -37,6 +37,15 @@ Options are:
[<tt>--help</tt> (-H)]
Display some help text and exit.

[<tt>--jobs</tt> _number_ (-j)]
Specifies the maximum number of concurrent tasks. The suggested
value is equal to the number of CPUs.

Sample values:
(no -j) : unlimited concurrent tasks (standard rake behavior)
-j : 2 concurrent tasks (exact number may change)
-j 16 : 16 concurrent tasks

[<tt>--libdir</tt> _directory_ (-I)]
Add _directory_ to the list of directories searched for require.

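As a quick illustration of the option documented above (a hypothetical Rakefile; the task names are invented for this example), the -j flag caps how many prerequisites of a multitask run at the same time:

  # Rakefile (hypothetical example)
  multitask :default => [:a, :b, :c, :d]

  [:a, :b, :c, :d].each do |name|
    task name do
      sleep 1  # stand-in for real work
    end
  end

  # rake        -> all four prerequisites run concurrently (standard behavior)
  # rake -j 2   -> at most two prerequisites run at a time
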
10 changes: 10 additions & 0 deletions lib/rake/application.rb
@@ -2,6 +2,7 @@
require 'optparse'

require 'rake/task_manager'
require 'rake/thread_pool'
require 'rake/win32'

module Rake
@@ -64,6 +65,7 @@ def run
init
load_rakefile
top_level
thread_pool.join
end
end

@@ -106,6 +108,10 @@ def options
@options ||= OpenStruct.new
end

def thread_pool
@thread_pool ||= ThreadPool.new options.thread_pool_size
end

# private ----------------------------------------------------------------

def invoke_task(task_string)
@@ -349,6 +355,10 @@ def standard_rake_options
"Execute some Ruby code, then continue with normal task processing.",
lambda { |value| eval(value) }
],
['--jobs', '-j [NUMBER]',
"Specifies the maximum number of tasks to execute in parallel. (default:2)",
lambda { |value| options.thread_pool_size = [(value || 2).to_i,2].max }
],
['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
lambda { |value| $:.push(value) }
],
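
A minimal sketch of how the --jobs lambda above resolves its argument (plain Ruby; the pool_size helper name is made up for illustration):

  def pool_size(value)
    [(value || 2).to_i, 2].max
  end

  pool_size(nil)    # => 2   a bare -j falls back to the default of 2
  pool_size("16")   # => 16  -j 16
  pool_size("asdf") # => 2   "asdf".to_i is 0, clamped up to the minimum of 2
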
10 changes: 6 additions & 4 deletions lib/rake/multi_task.rb
@@ -6,10 +6,12 @@ module Rake
class MultiTask < Task
private
def invoke_prerequisites(args, invocation_chain)
threads = @prerequisites.collect { |p|
Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
}
threads.each { |t| t.join }
futures = @prerequisites.collect do |p|
application.thread_pool.future(p) do |r|
application[r, @scope].invoke_with_call_chain(args, invocation_chain)
end
end
futures.each { |f| f.call }
end
end

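The change above swaps one Thread per prerequisite for futures drawn from the shared pool; calling each future still blocks until that prerequisite finishes, so the waiting semantics match the old Thread#join. A minimal sketch of the same pattern outside of Rake's task machinery (assuming rake/thread_pool is on the load path):

  require 'rake/thread_pool'

  pool  = Rake::ThreadPool.new(2)
  items = %w[a b c d]

  futures = items.collect do |item|
    pool.future(item) { |i| i.upcase }   # queued; at most 2 run concurrently
  end

  futures.each { |f| f.call }            # block until every future has finished
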
133 changes: 133 additions & 0 deletions lib/rake/thread_pool.rb
@@ -0,0 +1,133 @@
require 'thread'
require 'set'

module Rake

class ThreadPool

# Creates a ThreadPool object.
# The parameter is the size of the pool. By default, the pool uses unlimited threads.
def initialize(thread_count=nil)
@max_thread_count = [(thread_count||FIXNUM_MAX), 0].max
@threads = Set.new
@threads_mon = Monitor.new
@queue = Queue.new
@join_cond = @threads_mon.new_cond
end

# Creates a future to be executed in the ThreadPool.
# The args are passed to the block when executing (similarly to Thread#new).
# The return value is a Proc which may or may not already be executing in
# another thread. Calling Proc#call will sleep the current thread until
# the future has finished and will return its result (or re-raise any
# Exception raised inside the future).
def future(*args,&block)
# capture the local args for the block (like Thread#start)
local_args = args.collect { |a| begin; a.dup; rescue; a; end }

promise_mutex = Mutex.new
promise_result = promise_error = NOT_SET

# (promise code builds on Ben Lavender's public-domain 'promise' gem)
promise = lambda do
# return immediately if the future has been executed
unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
end

# try to get the lock and execute the promise, otherwise, sleep.
if promise_mutex.try_lock
if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
#execute the promise
begin
promise_result = block.call(*local_args)
rescue Exception => e
promise_error = e
end
block = local_args = nil # GC can now clean these up
end
promise_mutex.unlock
else
# Even if we didn't get the lock, we need to sleep until the promise has
# finished executing. If, however, the current thread is part of the thread
# pool, we need to free up a new thread in the pool so there will
# always be a thread doing work.

wait_for_promise = lambda { promise_mutex.synchronize{} }

unless @threads_mon.synchronize { @threads.include? Thread.current }
wait_for_promise.call
else
@threads_mon.synchronize { @max_thread_count += 1 }
start_thread
wait_for_promise.call
@threads_mon.synchronize { @max_thread_count -= 1 }
end
end
promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
end

@queue.enq promise
start_thread
promise
end

# Waits until the queue of futures is empty and all threads have exited.
def join
@threads_mon.synchronize do
begin
@join_cond.wait unless @threads.empty?
rescue Exception => e
STDERR.puts e
STDERR.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
STDERR.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
STDERR.puts e.backtrace.join("\n")
@threads.each do |t|
STDERR.print "Thread #{t} status = #{t.status}\n"
STDERR.puts t.backtrace.join("\n") if t.respond_to? :backtrace
end
raise e
end
end
end

private
def start_thread
@threads_mon.synchronize do
next unless @threads.count < @max_thread_count

@threads << Thread.new do
begin
while @threads.count <= @max_thread_count && !@queue.empty? do
# Even though we just asked if the queue was empty, another thread
# may have taken its last item by the time we get here. For this
# reason we pass true (non-block) to Queue#deq: otherwise we would
# sleep indefinitely on an empty queue.
@queue.deq(true).call
end
rescue ThreadError # this means the queue is empty
ensure
@threads_mon.synchronize do
@threads.delete Thread.current
@join_cond.broadcast if @threads.empty?
end
end
end
end
end

# for testing only

def __queue__
@queue
end

def __threads__
@threads.dup
end

NOT_SET = Object.new.freeze
FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # the largest Fixnum on this platform
end

end
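
A small sketch of the future/join contract described in the comments above (plain Ruby, outside of Rake's task machinery): call returns the block's value or re-raises whatever the block raised, and join drains the queue before the pool's threads exit.

  require 'rake/thread_pool'

  pool = Rake::ThreadPool.new(4)

  ok   = pool.future(21) { |n| n * 2 }
  boom = pool.future { raise "failed inside the pool" }

  ok.call                 # => 42; sleeps until the future has run, then returns its result
  begin
    boom.call             # the exception raised inside the future is re-raised here
  rescue RuntimeError => e
    puts e.message        # => "failed inside the pool"
  end

  pool.join               # waits until the queue is empty and all pool threads have exited
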
13 changes: 13 additions & 0 deletions test/test_rake_application_options.rb
@@ -41,6 +41,7 @@ def test_default_options
assert_nil opts.show_tasks
assert_nil opts.silent
assert_nil opts.trace
assert_nil opts.thread_pool_size
assert_equal ['rakelib'], opts.rakelib
assert ! Rake::FileUtilsExt.verbose_flag
assert ! Rake::FileUtilsExt.nowrite_flag
@@ -111,6 +112,18 @@ def test_help
assert_equal :exit, @exit
end

def test_jobs
flags(['--jobs', '4'], ['-j', '4']) do |opts|
assert_equal 4, opts.thread_pool_size
end
flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
assert_equal 2, opts.thread_pool_size
end
flags('--jobs', '-j') do |opts|
assert_equal 2, opts.thread_pool_size
end
end

def test_libdir
flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
$:.include?('xx')
146 changes: 146 additions & 0 deletions test/test_rake_thread_pool.rb
@@ -0,0 +1,146 @@
require File.expand_path('../helper', __FILE__)
require 'rake/thread_pool'
require 'test/unit/assertions'

class TestRakeTestThreadPool < Rake::TestCase
include Rake

def test_pool_executes_in_current_thread_for_zero_threads
pool = ThreadPool.new(0)
f = pool.future{Thread.current}
pool.join
assert_equal Thread.current, f.call
end

def test_pool_executes_in_other_thread_for_pool_of_size_one
pool = ThreadPool.new(1)
f = pool.future{Thread.current}
pool.join
refute_equal Thread.current, f.call
end

def test_pool_executes_in_two_other_threads_for_pool_of_size_two
pool = ThreadPool.new(2)
threads = 2.times.collect { pool.future { sleep 0.1; Thread.current } }.collect { |f| f.call }

refute_equal threads[0], threads[1]
refute_equal Thread.current, threads[0]
refute_equal Thread.current, threads[1]
end

def test_pool_creates_the_correct_number_of_threads
pool = ThreadPool.new(2)
threads = Set.new
t_mutex = Mutex.new
10.times.each do
pool.future do
sleep 0.02
t_mutex.synchronize{ threads << Thread.current }
end
end
pool.join
assert_equal 2, threads.count
end

def test_pool_future_captures_arguments
pool = ThreadPool.new(2)
a = 'a'
b = 'b'
c = 5 # 5 raises an exception with 5.dup; the exception should be ignored
pool.future(a,c){ |a_var,ignore| a_var.capitalize!; b.capitalize! }
pool.join
assert_equal 'a', a
assert_equal 'b'.capitalize, b
end

def test_pool_join_empties_queue
pool = ThreadPool.new(2)
repeat = 25
repeat.times {
pool.future do
repeat.times {
pool.future do
repeat.times {
pool.future do end
}
end
}
end
}

pool.join
assert_equal true, pool.__send__(:__queue__).empty?
end

# test that throwing an exception way down in the blocks propagates
# to the top
def test_exceptions
pool = ThreadPool.new(10)

deep_exception_block = lambda do |count|
next raise Exception.new if ( count < 1 )
pool.future(count-1, &deep_exception_block).call
end

assert_raises(Exception) do
pool.future(2, &deep_exception_block).call
end

end

def test_pool_always_has_max_threads_doing_work
# here we need to test that even if some threads are halted, there
# are always at least max_threads that are not sleeping.
pool = ThreadPool.new(2)
initial_sleep_time = 0.2
future1 = pool.future { sleep initial_sleep_time }
dependent_futures = 5.times.collect { pool.future{ future1.call } }
future2 = pool.future { sleep initial_sleep_time }
future3 = pool.future { sleep 0.01 }

sleep initial_sleep_time / 2.0 # wait for everything to queue up

# at this point, we should have 5 threads sleeping while they wait on future1,
# and two threads doing work on future1 and future2.
assert_equal pool.__send__(:__threads__).count, 7

# future3 is still in the queue because there aren't enough free threads to work on it.
assert_equal pool.__send__(:__queue__).size, 1

[future1, dependent_futures, future2, future3].flatten.each { |f| f.call }
pool.join
end

def test_pool_prevents_deadlock
pool = ThreadPool.new(5)

common_dependency_a = pool.future { sleep 0.2 }
futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } }

common_dependency_b = pool.future { futures_a.each { |f| f.call } }
futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } }

futures_b.each{|f|f.call}
pool.join
end

def test_pool_reports_correct_results
pool = ThreadPool.new(7)

a = 18
b = 5
c = 3

result = a.times.collect do
pool.future do
b.times.collect do
pool.future { sleep rand * 0.001; c }
end.inject(0) { |m,f| m+f.call }
end
end.inject(0) { |m,f| m+f.call }

assert_equal( (a*b*c), result )
pool.join
end

end
