diff --git a/doc/command_line_usage.rdoc b/doc/command_line_usage.rdoc
index 5661566b4..2a25f30b2 100644
--- a/doc/command_line_usage.rdoc
+++ b/doc/command_line_usage.rdoc
@@ -37,6 +37,15 @@ Options are:
[--help (-H)]
Display some help text and exit.
+[--jobs _number_ (-j)]
+ Specifies the maximum number of concurrent tasks. The suggested
+ value is equal to the number of CPUs.
+
+ Sample values:
+ (no -j) : unlimited concurrent tasks (standard rake behavior)
+ -j : 2 concurrent tasks (exact number may change)
+ -j 16 : 16 concurrent tasks
+
[--libdir _directory_ (-I)]
Add _directory_ to the list of directories searched for require.
diff --git a/lib/rake/application.rb b/lib/rake/application.rb
index 08a0996b2..6afdf8254 100644
--- a/lib/rake/application.rb
+++ b/lib/rake/application.rb
@@ -2,6 +2,7 @@
require 'optparse'
require 'rake/task_manager'
+require 'rake/thread_pool'
require 'rake/win32'
module Rake
@@ -64,6 +65,7 @@ def run
init
load_rakefile
top_level
+ thread_pool.join
end
end
@@ -106,6 +108,10 @@ def options
@options ||= OpenStruct.new
end
+ def thread_pool
+ @thread_pool ||= ThreadPool.new options.thread_pool_size
+ end
+
# private ----------------------------------------------------------------
def invoke_task(task_string)
@@ -349,6 +355,10 @@ def standard_rake_options
"Execute some Ruby code, then continue with normal task processing.",
lambda { |value| eval(value) }
],
+ ['--jobs', '-j [NUMBER]',
+ "Specifies the maximum number of tasks to execute in parallel. (default:2)",
+ lambda { |value| options.thread_pool_size = [(value || 2).to_i,2].max }
+ ],
['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.",
lambda { |value| $:.push(value) }
],
diff --git a/lib/rake/multi_task.rb b/lib/rake/multi_task.rb
index 21c8de732..224bc7cce 100644
--- a/lib/rake/multi_task.rb
+++ b/lib/rake/multi_task.rb
@@ -6,10 +6,12 @@ module Rake
class MultiTask < Task
private
def invoke_prerequisites(args, invocation_chain)
- threads = @prerequisites.collect { |p|
- Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) }
- }
- threads.each { |t| t.join }
+ futures = @prerequisites.collect do |p|
+ application.thread_pool.future(p) do |r|
+ application[r, @scope].invoke_with_call_chain(args, invocation_chain)
+ end
+ end
+ futures.each { |f| f.call }
end
end
diff --git a/lib/rake/thread_pool.rb b/lib/rake/thread_pool.rb
new file mode 100644
index 000000000..9b6e88005
--- /dev/null
+++ b/lib/rake/thread_pool.rb
@@ -0,0 +1,133 @@
+require 'thread'
+require 'set'
+
+module Rake
+
+ class ThreadPool
+
+ # Creates a ThreadPool object.
+ # The parameter is the size of the pool. By default, the pool uses unlimited threads.
+ def initialize(thread_count=nil)
+ @max_thread_count = [(thread_count||FIXNUM_MAX), 0].max
+ @threads = Set.new
+ @threads_mon = Monitor.new
+ @queue = Queue.new
+ @join_cond = @threads_mon.new_cond
+ end
+
+ # Creates a future to be executed in the ThreadPool.
+ # The args are passed to the block when executing (similarly to Thread#new)
+ # The return value is a Proc which may or may not be already executing in
+ # another thread. Calling Proc#call will sleep the current thread until
+ # the future is finished and will return the result (or raise an Exception
+ # thrown from the future)
+ def future(*args,&block)
+ # capture the local args for the block (like Thread#start)
+ local_args = args.collect { |a| begin; a.dup; rescue; a; end }
+
+ promise_mutex = Mutex.new
+ promise_result = promise_error = NOT_SET
+
+ # (promise code builds on Ben Lavender's public-domain 'promise' gem)
+ promise = lambda do
+ # return immediately if the future has been executed
+ unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
+ return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
+ end
+
+ # try to get the lock and execute the promise, otherwise, sleep.
+ if promise_mutex.try_lock
+ if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET)
+ #execute the promise
+ begin
+ promise_result = block.call(*local_args)
+ rescue Exception => e
+ promise_error = e
+ end
+ block = local_args = nil # GC can now clean these up
+ end
+ promise_mutex.unlock
+ else
+ # Even if we didn't get the lock, we need to sleep until the promise has
+ # finished executing. If, however, the current thread is part of the thread
+ # pool, we need to free up a new thread in the pool so there will
+ # always be a thread doing work.
+
+ wait_for_promise = lambda { promise_mutex.synchronize{} }
+
+ unless @threads_mon.synchronize { @threads.include? Thread.current }
+ wait_for_promise.call
+ else
+ @threads_mon.synchronize { @max_thread_count += 1 }
+ start_thread
+ wait_for_promise.call
+ @threads_mon.synchronize { @max_thread_count -= 1 }
+ end
+ end
+ promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error)
+ end
+
+ @queue.enq promise
+ start_thread
+ promise
+ end
+
+ # Waits until the queue of futures is empty and all threads have exited.
+ def join
+ @threads_mon.synchronize do
+ begin
+ @join_cond.wait unless @threads.empty?
+ rescue Exception => e
+ STDERR.puts e
+ STDERR.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
+ STDERR.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
+ STDERR.puts e.backtrace.join("\n")
+ @threads.each do |t|
+ STDERR.print "Thread #{t} status = #{t.status}\n"
+ STDERR.puts t.backtrace.join("\n") if t.respond_to? :backtrace
+ end
+ raise e
+ end
+ end
+ end
+
+ private
+ def start_thread
+ @threads_mon.synchronize do
+ next unless @threads.count < @max_thread_count
+
+ @threads << Thread.new do
+ begin
+ while @threads.count <= @max_thread_count && !@queue.empty? do
+ # Even though we just asked if the queue was empty,
+ # it still could have had an item which by this statement is now gone.
+ # For this reason we pass true to Queue#deq because we will sleep
+ # indefinitely if it is empty.
+ @queue.deq(true).call
+ end
+ rescue ThreadError # this means the queue is empty
+ ensure
+ @threads_mon.synchronize do
+ @threads.delete Thread.current
+ @join_cond.broadcast if @threads.empty?
+ end
+ end
+ end
+ end
+ end
+
+ # for testing only
+
+ def __queue__
+ @queue
+ end
+
+ def __threads__
+ @threads.dup
+ end
+
+ NOT_SET = Object.new.freeze
+ FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX
+ end
+
+end
diff --git a/test/test_rake_application_options.rb b/test/test_rake_application_options.rb
index 3d60cb697..a528e3705 100644
--- a/test/test_rake_application_options.rb
+++ b/test/test_rake_application_options.rb
@@ -41,6 +41,7 @@ def test_default_options
assert_nil opts.show_tasks
assert_nil opts.silent
assert_nil opts.trace
+ assert_nil opts.thread_pool_size
assert_equal ['rakelib'], opts.rakelib
assert ! Rake::FileUtilsExt.verbose_flag
assert ! Rake::FileUtilsExt.nowrite_flag
@@ -111,6 +112,18 @@ def test_help
assert_equal :exit, @exit
end
+ def test_jobs
+ flags(['--jobs', '4'], ['-j', '4']) do |opts|
+ assert_equal 4, opts.thread_pool_size
+ end
+ flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts|
+ assert_equal 2, opts.thread_pool_size
+ end
+ flags('--jobs', '-j') do |opts|
+ assert_equal 2, opts.thread_pool_size
+ end
+ end
+
def test_libdir
flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts|
$:.include?('xx')
diff --git a/test/test_rake_thread_pool.rb b/test/test_rake_thread_pool.rb
new file mode 100644
index 000000000..24148805c
--- /dev/null
+++ b/test/test_rake_thread_pool.rb
@@ -0,0 +1,146 @@
+require File.expand_path('../helper', __FILE__)
+require 'rake/thread_pool'
+require 'test/unit/assertions'
+
+class TestRakeTestThreadPool < Rake::TestCase
+ include Rake
+
+ def test_pool_executes_in_current_thread_for_zero_threads
+ pool = ThreadPool.new(0)
+ f = pool.future{Thread.current}
+ pool.join
+ assert_equal Thread.current, f.call
+ end
+
+ def test_pool_executes_in_other_thread_for_pool_of_size_one
+ pool = ThreadPool.new(1)
+ f = pool.future{Thread.current}
+ pool.join
+ refute_equal Thread.current, f.call
+ end
+
+ def test_pool_executes_in_two_other_threads_for_pool_of_size_two
+ pool = ThreadPool.new(2)
+ threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.call}
+
+ refute_equal threads[0], threads[1]
+ refute_equal Thread.current, threads[0]
+ refute_equal Thread.current, threads[1]
+ end
+
+ def test_pool_creates_the_correct_number_of_threads
+ pool = ThreadPool.new(2)
+ threads = Set.new
+ t_mutex = Mutex.new
+ 10.times.each do
+ pool.future do
+ sleep 0.02
+ t_mutex.synchronize{ threads << Thread.current }
+ end
+ end
+ pool.join
+ assert_equal 2, threads.count
+ end
+
+ def test_pool_future_captures_arguments
+ pool = ThreadPool.new(2)
+ a = 'a'
+ b = 'b'
+ c = 5 # 5 throws an execption with 5.dup. It should be ignored
+ pool.future(a,c){ |a_var,ignore| a_var.capitalize!; b.capitalize! }
+ pool.join
+ assert_equal 'a', a
+ assert_equal 'b'.capitalize, b
+ end
+
+ def test_pool_join_empties_queue
+ pool = ThreadPool.new(2)
+ repeat = 25
+ repeat.times {
+ pool.future do
+ repeat.times {
+ pool.future do
+ repeat.times {
+ pool.future do end
+ }
+ end
+ }
+ end
+ }
+
+ pool.join
+ assert_equal true, pool.__send__(:__queue__).empty?
+ end
+
+ # test that throwing an exception way down in the blocks propagates
+ # to the top
+ def test_exceptions
+ pool = ThreadPool.new(10)
+
+ deep_exception_block = lambda do |count|
+ next raise Exception.new if ( count < 1 )
+ pool.future(count-1, &deep_exception_block).call
+ end
+
+ assert_raises(Exception) do
+ pool.future(2, &deep_exception_block).call
+ end
+
+ end
+
+ def test_pool_always_has_max_threads_doing_work
+ # here we need to test that even if some threads are halted, there
+ # are always at least max_threads that are not sleeping.
+ pool = ThreadPool.new(2)
+ initial_sleep_time = 0.2
+ future1 = pool.future { sleep initial_sleep_time }
+ dependent_futures = 5.times.collect { pool.future{ future1.call } }
+ future2 = pool.future { sleep initial_sleep_time }
+ future3 = pool.future { sleep 0.01 }
+
+ sleep initial_sleep_time / 2.0 # wait for everything to queue up
+
+ # at this point, we should have 5 threads sleeping depending on future1, and
+ # two threads doing work on future1 and future 2.
+ assert_equal pool.__send__(:__threads__).count, 7
+
+ # future 3 is in the queue because there aren't enough active threads to work on it.
+ assert_equal pool.__send__(:__queue__).size, 1
+
+ [future1, dependent_futures, future2, future3].flatten.each { |f| f.call }
+ pool.join
+ end
+
+ def test_pool_prevents_deadlock
+ pool = ThreadPool.new(5)
+
+ common_dependency_a = pool.future { sleep 0.2 }
+ futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } }
+
+ common_dependency_b = pool.future { futures_a.each { |f| f.call } }
+ futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } }
+
+ futures_b.each{|f|f.call}
+ pool.join
+ end
+
+ def test_pool_reports_correct_results
+ pool = ThreadPool.new(7)
+
+ a = 18
+ b = 5
+ c = 3
+
+ result = a.times.collect do
+ pool.future do
+ b.times.collect do
+ pool.future { sleep rand * 0.001; c }
+ end.inject(0) { |m,f| m+f.call }
+ end
+ end.inject(0) { |m,f| m+f.call }
+
+ assert_equal( (a*b*c), result )
+ pool.join
+ end
+
+end