Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreading improvements #78

Merged
merged 20 commits into from
Nov 17, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added serial_valve
pmconrad committed Oct 5, 2018
commit e336b0bb5c412c8bd53b665a7a37036e2b290dca
40 changes: 40 additions & 0 deletions include/fc/thread/parallel.hpp
Original file line number Diff line number Diff line change
@@ -45,6 +45,46 @@ namespace fc {
worker_pool& get_worker_pool();
}

class serial_valve {
private:
class ticket_guard {
public:
ticket_guard( boost::atomic<future<void>*>& latch );
~ticket_guard();
void wait_for_my_turn();
private:
promise<void>* my_promise;
future<void>* ticket;
};

friend class ticket_guard;
boost::atomic<future<void>*> latch;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to include boost/atomic/atomic.hpp to compile in Windows.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, thanks!


public:
serial_valve();
~serial_valve();

/** Executes f1() then f2().
* For any two calls do_serial(f1,f2) and do_serial(f1',f2') where
* do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is
* guaranteed that f2' will be executed after f2 has completed. Failure
* of either function counts as completion of both.
* If f1 throws then f2 will not be invoked.
*
* @param f1 a functor to invoke
* @param f2 a functor to invoke
* @return the return value of f2()
*/
template<typename Functor1,typename Functor2>
auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2())
{
ticket_guard guard( latch );
f1();
guard.wait_for_my_turn();
return f2();
}
};

/**
* Calls function <code>f</code> in a separate thread and returns a future
* that can be used to wait on the result.
46 changes: 45 additions & 1 deletion src/thread/parallel.cpp
Original file line number Diff line number Diff line change
@@ -151,4 +151,48 @@ namespace fc {
return the_pool;
}
}
}

serial_valve::ticket_guard::ticket_guard( boost::atomic<future<void>*>& latch )
{
my_promise = new promise<void>();
future<void>* my_future = new future<void>( promise<void>::ptr( my_promise, true ) );
try
{
do
{
ticket = latch.load();
FC_ASSERT( ticket, "Valve is shutting down!" );
}
while( !latch.compare_exchange_weak( ticket, my_future ) );
}
catch (...)
{
delete my_future; // this takes care of my_promise as well
throw;
}
}

serial_valve::ticket_guard::~ticket_guard()
{
my_promise->set_value();
ticket->wait();
delete ticket;
}

void serial_valve::ticket_guard::wait_for_my_turn()
{
ticket->wait();
}

serial_valve::serial_valve()
{
latch.store( new future<void>( promise<void>::ptr( new promise<void>( true ), true ) ) );
}

serial_valve::~serial_valve()
{
fc::future<void>* last = latch.exchange( 0 );
last->wait();
delete last;
}
} // namespace fc
145 changes: 145 additions & 0 deletions tests/thread/parallel_tests.cpp
Original file line number Diff line number Diff line change
@@ -177,4 +177,149 @@ BOOST_AUTO_TEST_CASE( sign_verify_parallel )
}
}

BOOST_AUTO_TEST_CASE( serial_valve )
{
boost::atomic<uint32_t> counter(0);
fc::serial_valve valve;

{ // Simple test, f2 finishes before f1
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

// at this point, p1.f1 has started executing and is waiting on waiter

syncer = new fc::promise<void>();
auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

fc::usleep( fc::milliseconds(10) );

// at this point, p2.f1 has started executing and p2.f2 is waiting for its turn

BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );

waiter->set_value(); // signal p1.f1 to continue

p2.wait(); // and wait for p2.f2 to complete

BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK_EQUAL( 2, counter.load() );
}

{ // Triple test, f3 finishes first, then f1, finally f2
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
counter.store(0);
auto p1 = fc::async([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

// at this point, p1.f1 has started executing and is waiting on waiter

syncer = new fc::promise<void>();
auto p2 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

// at this point, p2.f1 has started executing and is sleeping

syncer = new fc::promise<void>();
auto p3 = fc::async([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

fc::usleep( fc::milliseconds(10) );

// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn

BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );
BOOST_CHECK( !p3.ready() );

waiter->set_value(); // signal p1.f1 to continue

p3.wait(); // and wait for p3.f2 to complete

BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK( p3.ready() );
BOOST_CHECK_EQUAL( 3, counter.load() );
}

{ // Triple test again but with invocations from different threads
fc::promise<void>* syncer = new fc::promise<void>();
fc::promise<void>* waiter = new fc::promise<void>();
counter.store(0);
auto p1 = fc::do_parallel([&counter,&valve,syncer,waiter] () {
valve.do_serial( [&syncer,waiter](){ syncer->set_value();
fc::future<void>( fc::shared_ptr<fc::promise<void>>( waiter, true ) ).wait(); },
[&counter](){ BOOST_CHECK_EQUAL( 0, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

// at this point, p1.f1 has started executing and is waiting on waiter

syncer = new fc::promise<void>();
auto p2 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [&syncer](){ syncer->set_value();
fc::usleep( fc::milliseconds(100) ); },
[&counter](){ BOOST_CHECK_EQUAL( 1, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

// at this point, p2.f1 has started executing and is sleeping

syncer = new fc::promise<void>();
auto p3 = fc::do_parallel([&counter,&valve,syncer] () {
valve.do_serial( [syncer](){ syncer->set_value(); },
[&counter](){ BOOST_CHECK_EQUAL( 2, counter.load() );
counter.fetch_add(1); } );
});
fc::future<void>( fc::shared_ptr<fc::promise<void>>( syncer, true ) ).wait();

fc::usleep( fc::milliseconds(10) );

// at this point, p3.f1 has started executing and p3.f2 is waiting for its turn

BOOST_CHECK( !p1.ready() );
BOOST_CHECK( !p2.ready() );
BOOST_CHECK( !p3.ready() );

waiter->set_value(); // signal p1.f1 to continue

p3.wait(); // and wait for p3.f2 to complete

BOOST_CHECK( p1.ready() );
BOOST_CHECK( p2.ready() );
BOOST_CHECK( p3.ready() );
BOOST_CHECK_EQUAL( 3, counter.load() );
}
}

BOOST_AUTO_TEST_SUITE_END()