From 173745949fa299d20a558c0511c02991f056933d Mon Sep 17 00:00:00 2001 From: Michael Bishop Date: Mon, 15 Oct 2012 09:23:11 -0400 Subject: [PATCH 1/4] Added -j support to rake. Rake now has a thread_pool implementation which returns futures when passed args and a block. MultiTask has been changed to ask the thread pool for a list of futures in which inside each a prerequisite is completed. MultiTask then waits on each future until it is complete. The number of threads in the pool is controlled with the new -j option at the command-line. The thread pool is now a member of Rake.application and rakefile authors can request futures for their own operations, participating in the pool. The thread pool is special in that it will spawn a new thread when a thread in the pool is sleeping because it is waiting for a future being completed by another thread. When the new thread is finished, the pool size will shrink to where it was previously. With this change, the pool always has a number of threads actively doing work (that number being equal to the -j parameter). This commit also includes documentation for the new -j parameter and a test for the ThreadPool implementation. --- doc/command_line_usage.rdoc | 9 ++ lib/rake/application.rb | 10 ++ lib/rake/multi_task.rb | 10 +- lib/rake/thread_pool.rb | 133 ++++++++++++++++++++++++++ test/test_rake_application_options.rb | 13 +++ test/test_rake_test_thread_pool.rb | 124 ++++++++++++++++++++++++ 6 files changed, 295 insertions(+), 4 deletions(-) create mode 100644 lib/rake/thread_pool.rb create mode 100644 test/test_rake_test_thread_pool.rb diff --git a/doc/command_line_usage.rdoc b/doc/command_line_usage.rdoc index 5661566b4..2a25f30b2 100644 --- a/doc/command_line_usage.rdoc +++ b/doc/command_line_usage.rdoc @@ -37,6 +37,15 @@ Options are: [--help (-H)] Display some help text and exit. +[--jobs _number_ (-j)] + Specifies the maximum number of concurrent tasks. The suggested + value is equal to the number of CPUs. + + Sample values: + (no -j) : unlimited concurrent tasks (standard rake behavior) + -j : 2 concurrent tasks (exact number may change) + -j 16 : 16 concurrent tasks + [--libdir _directory_ (-I)] Add _directory_ to the list of directories searched for require. diff --git a/lib/rake/application.rb b/lib/rake/application.rb index a2d6284f9..51d3f84e6 100644 --- a/lib/rake/application.rb +++ b/lib/rake/application.rb @@ -2,6 +2,7 @@ require 'optparse' require 'rake/task_manager' +require 'rake/thread_pool' require 'rake/win32' module Rake @@ -64,6 +65,7 @@ def run init load_rakefile top_level + thread_pool.join end end @@ -106,6 +108,10 @@ def options @options ||= OpenStruct.new end + def thread_pool + @thread_pool ||= ThreadPool.new options.thread_pool_size + end + # private ---------------------------------------------------------------- def invoke_task(task_string) @@ -325,6 +331,10 @@ def standard_rake_options "Execute some Ruby code, then continue with normal task processing.", lambda { |value| eval(value) } ], + ['--jobs', '-j [NUMBER]', + "Specifies the maximum number of tasks to execute in parallel. (default:2)", + lambda { |value| options.thread_pool_size = [(value || 2).to_i,2].max } + ], ['--libdir', '-I LIBDIR', "Include LIBDIR in the search path for required modules.", lambda { |value| $:.push(value) } ], diff --git a/lib/rake/multi_task.rb b/lib/rake/multi_task.rb index 21c8de732..224bc7cce 100644 --- a/lib/rake/multi_task.rb +++ b/lib/rake/multi_task.rb @@ -6,10 +6,12 @@ module Rake class MultiTask < Task private def invoke_prerequisites(args, invocation_chain) - threads = @prerequisites.collect { |p| - Thread.new(p) { |r| application[r, @scope].invoke_with_call_chain(args, invocation_chain) } - } - threads.each { |t| t.join } + futures = @prerequisites.collect do |p| + application.thread_pool.future(p) do |r| + application[r, @scope].invoke_with_call_chain(args, invocation_chain) + end + end + futures.each { |f| f.call } end end diff --git a/lib/rake/thread_pool.rb b/lib/rake/thread_pool.rb new file mode 100644 index 000000000..9b6e88005 --- /dev/null +++ b/lib/rake/thread_pool.rb @@ -0,0 +1,133 @@ +require 'thread' +require 'set' + +module Rake + + class ThreadPool + + # Creates a ThreadPool object. + # The parameter is the size of the pool. By default, the pool uses unlimited threads. + def initialize(thread_count=nil) + @max_thread_count = [(thread_count||FIXNUM_MAX), 0].max + @threads = Set.new + @threads_mon = Monitor.new + @queue = Queue.new + @join_cond = @threads_mon.new_cond + end + + # Creates a future to be executed in the ThreadPool. + # The args are passed to the block when executing (similarly to Thread#new) + # The return value is a Proc which may or may not be already executing in + # another thread. Calling Proc#call will sleep the current thread until + # the future is finished and will return the result (or raise an Exception + # thrown from the future) + def future(*args,&block) + # capture the local args for the block (like Thread#start) + local_args = args.collect { |a| begin; a.dup; rescue; a; end } + + promise_mutex = Mutex.new + promise_result = promise_error = NOT_SET + + # (promise code builds on Ben Lavender's public-domain 'promise' gem) + promise = lambda do + # return immediately if the future has been executed + unless promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET) + return promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error) + end + + # try to get the lock and execute the promise, otherwise, sleep. + if promise_mutex.try_lock + if promise_result.equal?(NOT_SET) && promise_error.equal?(NOT_SET) + #execute the promise + begin + promise_result = block.call(*local_args) + rescue Exception => e + promise_error = e + end + block = local_args = nil # GC can now clean these up + end + promise_mutex.unlock + else + # Even if we didn't get the lock, we need to sleep until the promise has + # finished executing. If, however, the current thread is part of the thread + # pool, we need to free up a new thread in the pool so there will + # always be a thread doing work. + + wait_for_promise = lambda { promise_mutex.synchronize{} } + + unless @threads_mon.synchronize { @threads.include? Thread.current } + wait_for_promise.call + else + @threads_mon.synchronize { @max_thread_count += 1 } + start_thread + wait_for_promise.call + @threads_mon.synchronize { @max_thread_count -= 1 } + end + end + promise_error.equal?(NOT_SET) ? promise_result : raise(promise_error) + end + + @queue.enq promise + start_thread + promise + end + + # Waits until the queue of futures is empty and all threads have exited. + def join + @threads_mon.synchronize do + begin + @join_cond.wait unless @threads.empty? + rescue Exception => e + STDERR.puts e + STDERR.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n" + STDERR.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n" + STDERR.puts e.backtrace.join("\n") + @threads.each do |t| + STDERR.print "Thread #{t} status = #{t.status}\n" + STDERR.puts t.backtrace.join("\n") if t.respond_to? :backtrace + end + raise e + end + end + end + + private + def start_thread + @threads_mon.synchronize do + next unless @threads.count < @max_thread_count + + @threads << Thread.new do + begin + while @threads.count <= @max_thread_count && !@queue.empty? do + # Even though we just asked if the queue was empty, + # it still could have had an item which by this statement is now gone. + # For this reason we pass true to Queue#deq because we will sleep + # indefinitely if it is empty. + @queue.deq(true).call + end + rescue ThreadError # this means the queue is empty + ensure + @threads_mon.synchronize do + @threads.delete Thread.current + @join_cond.broadcast if @threads.empty? + end + end + end + end + end + + # for testing only + + def __queue__ + @queue + end + + def __threads__ + @threads.dup + end + + NOT_SET = Object.new.freeze + FIXNUM_MAX = (2**(0.size * 8 - 2) - 1) # FIXNUM_MAX + end + +end diff --git a/test/test_rake_application_options.rb b/test/test_rake_application_options.rb index c1471f443..62d731936 100644 --- a/test/test_rake_application_options.rb +++ b/test/test_rake_application_options.rb @@ -40,6 +40,7 @@ def test_default_options assert_nil opts.show_tasks assert_nil opts.silent assert_nil opts.trace + assert_nil opts.thread_pool_size assert_equal ['rakelib'], opts.rakelib assert ! Rake::FileUtilsExt.verbose_flag assert ! Rake::FileUtilsExt.nowrite_flag @@ -110,6 +111,18 @@ def test_help assert_equal :exit, @exit end + def test_jobs + flags(['--jobs', '4'], ['-j', '4']) do |opts| + assert_equal 4, opts.thread_pool_size + end + flags(['--jobs', 'asdas'], ['-j', 'asdas']) do |opts| + assert_equal 2, opts.thread_pool_size + end + flags('--jobs', '-j') do |opts| + assert_equal 2, opts.thread_pool_size + end + end + def test_libdir flags(['--libdir', 'xx'], ['-I', 'xx'], ['-Ixx']) do |opts| $:.include?('xx') diff --git a/test/test_rake_test_thread_pool.rb b/test/test_rake_test_thread_pool.rb new file mode 100644 index 000000000..d395b0992 --- /dev/null +++ b/test/test_rake_test_thread_pool.rb @@ -0,0 +1,124 @@ +require File.expand_path('../helper', __FILE__) +require 'rake/thread_pool' +require 'test/unit/assertions' + +class TestRakeTestThreadPool < Rake::TestCase + include Rake + + def test_pool_executes_in_current_thread_for_zero_threads + pool = ThreadPool.new(0) + f = pool.future{Thread.current} + pool.join + assert_equal Thread.current, f.call + end + + def test_pool_executes_in_other_thread_for_pool_of_size_one + pool = ThreadPool.new(1) + f = pool.future{Thread.current} + pool.join + refute_equal Thread.current, f.call + end + + def test_pool_executes_in_two_other_threads_for_pool_of_size_two + pool = ThreadPool.new(2) + threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.call} + + refute_equal threads[0], threads[1] + refute_equal Thread.current, threads[0] + refute_equal Thread.current, threads[1] + end + + def test_pool_creates_the_correct_number_of_threads + pool = ThreadPool.new(2) + threads = Set.new + t_mutex = Mutex.new + 10.times.each do + pool.future do + sleep 0.02 + t_mutex.synchronize{ threads << Thread.current } + end + end + pool.join + assert_equal 2, threads.count + end + + def test_pool_future_captures_arguments + pool = ThreadPool.new(2) + a = 'a' + b = 'b' + c = 5 # 5 throws an execption with 5.dup. It should be ignored + pool.future(a,c){ |a_var,ignore| a_var.capitalize!; b.capitalize! } + pool.join + assert_equal 'a', a + assert_equal 'b'.capitalize, b + end + + def test_pool_join_empties_queue + pool = ThreadPool.new(2) + repeat = 25 + repeat.times { pool.future do + repeat.times { pool.future do + repeat.times { pool.future do + ; + end } + end } + end } + + pool.join + assert_equal true, pool.__send__(:__queue__).empty? + end + + # test that throwing an exception way down in the blocks propagates + # to the top + def test_exceptions + pool = ThreadPool.new(10) + + deep_exception_block = lambda do |count| + next raise Exception.new if ( count < 1 ) + pool.future(count-1, &deep_exception_block).call + end + + assert_raises(Exception) do + pool.future(2, &deep_exception_block).call + end + + end + + def test_pool_always_has_max_threads_doing_work + # here we need to test that even if some threads are halted, there + # are always at least max_threads that are not sleeping. + pool = ThreadPool.new(2) + initial_sleep_time = 0.2 + future1 = pool.future { sleep initial_sleep_time } + dependent_futures = 5.times.collect { pool.future{ future1.call } } + future2 = pool.future { sleep initial_sleep_time } + future3 = pool.future { sleep 0.01 } + + sleep initial_sleep_time / 2.0 # wait for everything to queue up + + # at this point, we should have 5 threads sleeping depending on future1, and + # two threads doing work on future1 and future 2. + assert_equal pool.__send__(:__threads__).count, 7 + + # future 3 is in the queue because there aren't enough active threads to work on it. + assert_equal pool.__send__(:__queue__).size, 1 + + [future1, dependent_futures, future2, future3].flatten.each { |f| f.call } + pool.join + end + + def test_pool_prevents_deadlock + pool = ThreadPool.new(5) + + common_dependency_a = pool.future { sleep 0.2 } + futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } } + + common_dependency_b = pool.future { futures_a.each { |f| f.call } } + futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } } + + (futures_b).each{|f|f.call} + pool.join + end + +end + From a8bdcf018efedef14b07b9238a0bc1730b1ac03f Mon Sep 17 00:00:00 2001 From: Michael Bishop Date: Thu, 18 Oct 2012 21:21:08 -0400 Subject: [PATCH 2/4] Added test verifying results from the futures. --- ...hread_pool.rb => test_rake_thread_pool.rb} | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) rename test/{test_rake_test_thread_pool.rb => test_rake_thread_pool.rb} (90%) diff --git a/test/test_rake_test_thread_pool.rb b/test/test_rake_thread_pool.rb similarity index 90% rename from test/test_rake_test_thread_pool.rb rename to test/test_rake_thread_pool.rb index d395b0992..64b614f13 100644 --- a/test/test_rake_test_thread_pool.rb +++ b/test/test_rake_thread_pool.rb @@ -120,5 +120,24 @@ def test_pool_prevents_deadlock pool.join end + def test_pool_reports_correct_results + pool = ThreadPool.new(7) + + a = 18 + b = 5 + c = 3 + + result = a.times.collect do + pool.future do + b.times.collect do + pool.future { sleep rand * 0.001; c } + end.inject(0) { |m,f| m+f.call } + end + end.inject(0) { |m,f| m+f.call } + + assert_equal( (a*b*c), result ) + pool.join + end + end From 51cb56dce7a8d12a17ec9ceedcb4fd70d67af12e Mon Sep 17 00:00:00 2001 From: Jim Weirich Date: Mon, 22 Oct 2012 14:32:08 -0400 Subject: [PATCH 3/4] very minor test cleanup --- test/test_rake_thread_pool.rb | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/test/test_rake_thread_pool.rb b/test/test_rake_thread_pool.rb index 64b614f13..a0952ab54 100644 --- a/test/test_rake_thread_pool.rb +++ b/test/test_rake_thread_pool.rb @@ -56,13 +56,17 @@ def test_pool_future_captures_arguments def test_pool_join_empties_queue pool = ThreadPool.new(2) repeat = 25 - repeat.times { pool.future do - repeat.times { pool.future do - repeat.times { pool.future do - ; - end } - end } - end } + repeat.times { + pool.future do + repeat.times { + pool.future do + repeat.times { + pool.future do end + } + end + } + end + } pool.join assert_equal true, pool.__send__(:__queue__).empty? @@ -116,7 +120,7 @@ def test_pool_prevents_deadlock common_dependency_b = pool.future { futures_a.each { |f| f.call } } futures_b = 10.times.collect { pool.future{ common_dependency_b.call; sleep(rand() * 0.01) } } - (futures_b).each{|f|f.call} + futures_b.each{|f|f.call} pool.join end From 3a797bf569c9f0ba84359dc40851f059e470300f Mon Sep 17 00:00:00 2001 From: Jim Weirich Date: Mon, 22 Oct 2012 14:32:25 -0400 Subject: [PATCH 4/4] whitespace removed --- test/test_rake_thread_pool.rb | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/test/test_rake_thread_pool.rb b/test/test_rake_thread_pool.rb index a0952ab54..24148805c 100644 --- a/test/test_rake_thread_pool.rb +++ b/test/test_rake_thread_pool.rb @@ -11,18 +11,18 @@ def test_pool_executes_in_current_thread_for_zero_threads pool.join assert_equal Thread.current, f.call end - + def test_pool_executes_in_other_thread_for_pool_of_size_one pool = ThreadPool.new(1) f = pool.future{Thread.current} pool.join refute_equal Thread.current, f.call end - + def test_pool_executes_in_two_other_threads_for_pool_of_size_two pool = ThreadPool.new(2) threads = 2.times.collect{ pool.future{ sleep 0.1; Thread.current } }.each{|f|f.call} - + refute_equal threads[0], threads[1] refute_equal Thread.current, threads[0] refute_equal Thread.current, threads[1] @@ -52,7 +52,7 @@ def test_pool_future_captures_arguments assert_equal 'a', a assert_equal 'b'.capitalize, b end - + def test_pool_join_empties_queue pool = ThreadPool.new(2) repeat = 25 @@ -85,7 +85,7 @@ def test_exceptions assert_raises(Exception) do pool.future(2, &deep_exception_block).call end - + end def test_pool_always_has_max_threads_doing_work @@ -99,21 +99,21 @@ def test_pool_always_has_max_threads_doing_work future3 = pool.future { sleep 0.01 } sleep initial_sleep_time / 2.0 # wait for everything to queue up - + # at this point, we should have 5 threads sleeping depending on future1, and # two threads doing work on future1 and future 2. assert_equal pool.__send__(:__threads__).count, 7 - + # future 3 is in the queue because there aren't enough active threads to work on it. assert_equal pool.__send__(:__queue__).size, 1 - + [future1, dependent_futures, future2, future3].flatten.each { |f| f.call } pool.join end def test_pool_prevents_deadlock pool = ThreadPool.new(5) - + common_dependency_a = pool.future { sleep 0.2 } futures_a = 10.times.collect { pool.future{ common_dependency_a.call; sleep(rand() * 0.01) } } @@ -126,11 +126,11 @@ def test_pool_prevents_deadlock def test_pool_reports_correct_results pool = ThreadPool.new(7) - + a = 18 b = 5 c = 3 - + result = a.times.collect do pool.future do b.times.collect do @@ -138,10 +138,9 @@ def test_pool_reports_correct_results end.inject(0) { |m,f| m+f.call } end end.inject(0) { |m,f| m+f.call } - + assert_equal( (a*b*c), result ) pool.join end end -