Skip to content

Commit

Permalink
Merge pull request #78 from bitshares/multithreading_improvements
Browse files Browse the repository at this point in the history
Multithreading improvements
  • Loading branch information
pmconrad authored Nov 17, 2018
2 parents 0f110c3 + ed775a5 commit 460e7cc
Show file tree
Hide file tree
Showing 16 changed files with 793 additions and 98 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ set( fc_sources
src/thread/spin_lock.cpp
src/thread/spin_yield_lock.cpp
src/thread/mutex.cpp
src/thread/parallel.cpp
src/thread/non_preemptable_scope_check.cpp
src/asio.cpp
src/string.cpp
Expand Down
8 changes: 4 additions & 4 deletions include/fc/asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace asio {
* @brief internal implementation types/methods for fc::asio
*/
namespace detail {
using namespace fc;

class read_write_handler
{
Expand Down Expand Up @@ -59,14 +58,14 @@ namespace asio {
bool operator()( C& c, bool s ) { c.non_blocking(s); return true; }
};

#if WIN32 // windows stream handles do not support non blocking!
#if WIN32 // windows stream handles do not support non blocking!
template<>
struct non_blocking<boost::asio::windows::stream_handle> {
typedef boost::asio::windows::stream_handle C;
bool operator()( C& ) { return false; }
bool operator()( C&, bool ) { return false; }
};
#endif
#endif
} // end of namespace detail

/***
Expand All @@ -78,7 +77,8 @@ namespace asio {
public:
default_io_service_scope();
~default_io_service_scope();
static void set_num_threads(uint16_t num_threads);
static void set_num_threads(uint16_t num_threads);
static uint16_t get_num_threads();
boost::asio::io_service* io;
private:
std::vector<boost::thread*> asio_threads;
Expand Down
12 changes: 0 additions & 12 deletions include/fc/container/deque.hpp

This file was deleted.

5 changes: 5 additions & 0 deletions include/fc/crypto/sha1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <fc/fwd.hpp>
#include <fc/string.hpp>

#include <functional>

namespace fc{

class sha1
Expand Down Expand Up @@ -82,3 +84,6 @@ namespace std
}
};
}

#include <fc/reflect/reflect.hpp>
FC_REFLECT_TYPENAME( fc::sha1 )
5 changes: 3 additions & 2 deletions include/fc/thread/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#endif

namespace fc {
class abstract_thread;
struct void_t{};
class priority;
class thread;
Expand Down Expand Up @@ -146,7 +145,9 @@ namespace fc {
public:
typedef fc::shared_ptr< promise<void> > ptr;
promise( const char* desc FC_TASK_NAME_DEFAULT_ARG):promise_base(desc){}
//promise( const void_t& ){ set_value(); }
promise( bool fulfilled, const char* desc FC_TASK_NAME_DEFAULT_ARG ){
if( fulfilled ) set_value();
}

void wait(const microseconds& timeout = microseconds::maximum() ){
this->_wait( timeout );
Expand Down
106 changes: 106 additions & 0 deletions include/fc/thread/parallel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2018 The BitShares Blockchain, and contributors.
*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <fc/thread/task.hpp>
#include <fc/thread/thread.hpp>
#include <fc/asio.hpp>

#include <boost/atomic/atomic.hpp>

namespace fc {

namespace detail {
class pool_impl;

class worker_pool {
public:
worker_pool();
~worker_pool();
void post( task_base* task );
private:
pool_impl* my;
};

worker_pool& get_worker_pool();
}

class serial_valve {
private:
class ticket_guard {
public:
explicit ticket_guard( boost::atomic<future<void>*>& latch );
~ticket_guard();
void wait_for_my_turn();
private:
promise<void>* my_promise;
future<void>* ticket;
};

friend class ticket_guard;
boost::atomic<future<void>*> latch;

public:
serial_valve();
~serial_valve();

/** Executes f1() then f2().
* For any two calls do_serial(f1,f2) and do_serial(f1',f2') where
* do_serial(f1,f2) is invoked before do_serial(f1',f2'), it is
* guaranteed that f2' will be executed after f2 has completed. Failure
* of either function counts as completion of both.
* If f1 throws then f2 will not be invoked.
*
* @param f1 a functor to invoke
* @param f2 a functor to invoke
* @return the return value of f2()
*/
template<typename Functor1,typename Functor2>
auto do_serial( const Functor1& f1, const Functor2& f2 ) -> decltype(f2())
{
ticket_guard guard( latch );
f1();
guard.wait_for_my_turn();
return f2();
}
};

/**
* Calls function <code>f</code> in a separate thread and returns a future
* that can be used to wait on the result.
*
* @param f the operation to perform
*/
template<typename Functor>
auto do_parallel( Functor&& f, const char* desc FC_TASK_NAME_DEFAULT_ARG ) -> fc::future<decltype(f())> {
typedef decltype(f()) Result;
typedef typename fc::deduce<Functor>::type FunctorType;
fc::task<Result,sizeof(FunctorType)>* tsk =
new fc::task<Result,sizeof(FunctorType)>( fc::forward<Functor>(f), desc );
fc::future<Result> r(fc::shared_ptr< fc::promise<Result> >(tsk,true) );
detail::get_worker_pool().post( tsk );
return r;
}
}
2 changes: 2 additions & 0 deletions include/fc/thread/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace fc {
};
void* get_task_specific_data(unsigned slot);
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
class idle_guard;
}

class task_base : virtual public promise_base {
Expand Down Expand Up @@ -53,6 +54,7 @@ namespace fc {
// thread/thread_private
friend class thread;
friend class thread_d;
friend class detail::idle_guard;
fwd<spin_lock,8> _spinlock;

// avoid rtti info for every possible functor...
Expand Down
28 changes: 24 additions & 4 deletions include/fc/thread/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,37 @@ namespace fc {

namespace detail
{
class worker_pool;
void* get_thread_specific_data(unsigned slot);
void set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
unsigned get_next_unused_task_storage_slot();
void* get_task_specific_data(unsigned slot);
void set_task_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
}

/** Instances of this class can be used to get notifications when a thread is
* (or is no longer) idle.
*/
class thread_idle_notifier {
public:
virtual ~thread_idle_notifier() {}

/** This method is called when the thread is idle. If it returns a
* task_base it will be queued and executed immediately.
* @return a task to execute, or nullptr
*/
virtual task_base* idle() = 0;
/** This method is called when the thread is no longer idle, e. g. after
* it has woken up due to a timer or signal.
*/
virtual void busy() = 0;
};

class thread {
public:
thread( const std::string& name = "" );
thread( thread&& m );
thread& operator=(thread&& t );
thread( const std::string& name = "", thread_idle_notifier* notifier = 0 );
thread( thread&& m ) = delete;
thread& operator=(thread&& t ) = delete;

/**
* Returns the current thread.
Expand Down Expand Up @@ -130,11 +149,12 @@ namespace fc {
return wait_any_until(fc::move(proms), fc::time_point::now()+timeout_us );
}
private:
thread( class thread_d* );
thread( class thread_d* ); // parameter is ignored, will create a new thread_d
friend class promise_base;
friend class task_base;
friend class thread_d;
friend class mutex;
friend class detail::worker_pool;
friend void* detail::get_thread_specific_data(unsigned slot);
friend void detail::set_thread_specific_data(unsigned slot, void* new_value, void(*cleanup)(void*));
friend unsigned detail::get_next_unused_task_storage_slot();
Expand Down
21 changes: 10 additions & 11 deletions src/asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ namespace fc {
}
else
{
//elog( "${message} ", ("message", boost::system::system_error(ec).what()));
p->set_exception( fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "${message} ", ("message", boost::system::system_error(ec).what())) ) ) );
}
Expand All @@ -83,8 +82,6 @@ namespace fc {
}
p->set_value( eps );
} else {
//elog( "%s", boost::system::system_error(ec).what() );
//p->set_exception( fc::copy_exception( boost::system::system_error(ec) ) );
p->set_exception(
fc::exception_ptr( new fc::exception(
FC_LOG_MESSAGE( error, "process exited with: ${message} ",
Expand All @@ -104,10 +101,12 @@ namespace fc {
* @param num_threads the number of threads
*/
void default_io_service_scope::set_num_threads(uint16_t num_threads) {
FC_ASSERT(fc::asio::default_io_service_scope::num_io_threads == 0);
fc::asio::default_io_service_scope::num_io_threads = num_threads;
FC_ASSERT(num_io_threads == 0);
num_io_threads = num_threads;
}

uint16_t default_io_service_scope::get_num_threads() { return num_io_threads; }

/***
* Default constructor
*/
Expand All @@ -116,18 +115,18 @@ namespace fc {
io = new boost::asio::io_service();
the_work = new boost::asio::io_service::work(*io);

if (this->num_io_threads == 0)
if( num_io_threads == 0 )
{
// the default was not set by the configuration. Determine a good
// number of threads. Minimum of 8, maximum of hardware_concurrency
this->num_io_threads = std::max( boost::thread::hardware_concurrency(), 8u );
num_io_threads = std::max( boost::thread::hardware_concurrency(), 8U );
}

for( uint16_t i = 0; i < this->num_io_threads; ++i )
for( uint16_t i = 0; i < num_io_threads; ++i )
{
asio_threads.push_back( new boost::thread( [=]()
asio_threads.push_back( new boost::thread( [i,this]()
{
fc::thread::current().set_name("asio");
fc::thread::current().set_name( "fc::asio worker #" + fc::to_string(i) );

BOOST_SCOPE_EXIT(void)
{
Expand Down Expand Up @@ -194,7 +193,7 @@ namespace fc {
promise<std::vector<boost::asio::ip::tcp::endpoint> >::ptr p( new promise<std::vector<boost::asio::ip::tcp::endpoint> >("tcp::resolve completion") );
res.async_resolve( boost::asio::ip::tcp::resolver::query(hostname,port),
boost::bind( detail::resolve_handler<boost::asio::ip::tcp::endpoint,resolver_iterator>, p, _1, _2 ) );
return p->wait();;
return p->wait();
}
FC_RETHROW_EXCEPTIONS(warn, "")
}
Expand Down
Loading

0 comments on commit 460e7cc

Please sign in to comment.