Cpp Concurrency
+ + +#include <iostream>
+#include <thread>
+void hello() { std::cout << "Hello Concurrent World\n"; }
+int main() {
+ std::cout << "first main" << std::endl;
+ std::thread t(hello);
+ std::cout << "from main thread!" << std::endl;
+ // t.join();
+ t.detach();
+}
+
等待:join
+不等:detach 如后台监控线程
+精细化控制: condition variables and futures
+join之前发生exception怎么办呢:try catch. 更好的方式:RAII
+class thread_guard {
+ std::thread &t;
+
+public:
+ explicit thread_guard(std::thread &t_) : t(std::move(t_)) {
+ if (!t.joinable()) {
+ throw std::logic_error(“No thread”);
+ }
+ }
+ ~thread_guard() {
+ t.join();
+ }
+ thread_guard(thread_guard const &) = delete;
+ thread_guard &operator=(thread_guard const &) = delete;
+};
+
+// usage
+std::thread t(my_func);
+thread_guard g(t);
+
向thread对象传参的时候要注意隐式类型转换:
+char buffer[1024];
+// dangling pointer alert!
+std::thread t(f,3,buffer);
+// Ok
+std::thread t(f,3,std::string(buffer));
+
thread对象的function的参数要引用的情况:
+// widget_data 需要引用
+void update_data_for_widget(widget_id w, widget_data &data);
+void oops_again(widget_id w) {
+ widget_data data;
+ // 这里先拷贝w 和 data,然后传rvalues 给需要non-const reference的function
+ // compile error!
+ std::thread t(update_data_for_widget, w, data);
+ // 解决办法:
+ std::thread t(update_data_for_widget, w, std::ref(data));
+ display_status();
+ t.join();
+ process_widget_data(data);
+}
+
std::thread像std::bind一样
+class X {
+public:
+ void do_lengthy_work();
+};
+X my_x;
+// 第三个参数开始就是成员函数的参数
+std::thread t(&X::do_lengthy_work, &my_x);
+
可以把unique_ptr move进参数。thread对象本身也是可以move不可以copy的
+std::thread t1(some_function);
+std::thread t2 = std::thread(some_other_function);
+// terminate! you must explicitly wait for a thread to complete or detach it before destruction,
+t1 = std::move(t2);
+
用std::vector管理std::thread 可以在runtime创建动态数量的thread
+thread id : std::thread::get_id() 或者 std::this_thread::get_id()
+Sharing Data with mutex
+If all shared data is read-only, there’s no problem。
+std::mutex -> std::lock_guard -> std::scoped_lock
+Don’t pass pointers and references to protected data outside the scope of the lock,
+避免死锁的方法:
+-
+
- lock the two mutexes in a fixed order +
- std::lock 同时锁多个mutex 而不会死锁 +
void swap(X &lhs, X &rhs) {
+ if (&lhs == &rhs)
+ return;
+ std::lock(lhs.m, rhs.m);
+ // 注意 adopt_lock
+ std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
+ std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
+ swap(lhs.some_detail, rhs.some_detail);
+}
+
用 scoped_lock 简化:
+void swap(X &lhs, X &rhs) {
+ if (&lhs == &rhs)
+ return;
+ std::scoped_lock guard(lhs.m, rhs.m);
+ swap(lhs.some_detail, rhs.some_detail);
+}
+
-
+
- don’t wait for another thread if there’s a chance it’s waiting for you +
- AVOID NESTED LOCKS 已经有一个锁的时候就不要再acquire lock +
- AVOID CALLING USER-SUPPLIED CODE WHILE HOLDING A LOCK +
- USE A LOCK HIERARCHY: 从上层锁到下层 +
比lock_guard更灵活的 std::unique_lock
+using std::unique_lock and std::defer_lock B, rather than std::lock_guard and std::adopt_lock. 更灵活但更耗空间,更慢。最好还是用scoped_lock。
+void swap(X &lhs, X &rhs) {
+ if (&lhs == &rhs)
+ return;
+ std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
+ std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
+ std::lock(lock_a, lock_b);
+ swap(lhs.some_detail, rhs.some_detail);
+}
+
Transferring mutex ownership between scopes
+因为std::unique_lock不拥有mutex,mutex的ownership可以在intance之间move
+Locking at an appropriate granularity
+void get_and_process_data() {
+ std::unique_lock<std::mutex> my_lock(the_mutex);
+ some_class data_to_process = get_next_data_chunk();
+ my_lock.unlock();
+ result_type result = process(data_to_process);
+ my_lock.lock();
+ write_result(data_to_process, result);
+}
+
appropriate granularity不仅指data的数量,还指锁的时长
+class Y {
+private:
+ int some_detail;
+ mutable std::mutex m;
+ int get_detail() const {
+ std::lock_guard<std::mutex> lock_a(m);
+ return some_detail;
+ }
+
+public:
+ Y(int sd) : some_detail(sd) {}
+ friend bool operator==(Y const &lhs, Y const &rhs) {
+ if (&lhs == &rhs)
+ return true;
+ int const lhs_value = lhs.get_detail();
+ int const rhs_value = rhs.get_detail();
+ return lhs_value == rhs_value;
+ }
+};
+
if you don’t hold the required locks for the entire duration of an operation, you’re exposing yourself to race conditions.上面相等只能说明lhs在某一时间跟rhs在另外一个时间是相等。
+除mutex 别的保护shared data的方法
+One common case is where the shared data needs protection only from concurrent access while it’s being initialized, but after that no explicit synchronization is required(比如data创建之后就是read only的了)
+Protecting shared data during initialization
+-
+
-
+
单线程的时候可以Lazy initialization:看一下没创建的话再去创建。
+
+ -
+
naive加锁的多线程方法unnecessary serialization,性能太差
+
+ -
+
double checked locking 会race:
+
+
void undefined_behaviour_with_double_checked_locking() {
+ if (!resource_ptr) { // read
+ std::lock_guard<std::mutex> lk(resource_mutex);
+ if (!resource_ptr) {
+ resource_ptr.reset(new some_resource); // write
+ }
+ }
+ resource_ptr->do_something();
+}
+
read 和 write 没有同步,会race。 +even if a thread sees the pointer written by another thread, it might not see the newly created instance of some_resource, resulting in the call to do_something() e operating on incorrect values.
+-
+
- std::once_flag 和 std::call_once +
std::shared_ptr<some_resource> resource_ptr;
+std::once_flag resource_flag;
+void init_resource() { resource_ptr.reset(new some_resource); }
+void foo() {
+ // the pointer will have been initialized by some thread (synchronizly) by the time std::call_once returns
+ // synchronization data 存在 once_flag 里
+ std::call_once(resource_flag, init_resource);
+ resource_ptr->do_something();
+}
+
除了通过function的方式,还可以被用在lazy initialization of class +members
+class X {
+private:
+ connection_info connection_details;
+ connection_handle connection;
+ std::once_flag connection_init_flag;
+ void open_connection() {
+ connection = connection_manager.open(connection_details);
+ }
+
+public:
+ X(connection_info const &connection_details_)
+ : connection_details(connection_details_) {}
+ void send_data(data_packet const &data) {
+ std::call_once(connection_init_flag, &X::open_connection, this);
+ connection.send_data(data);
+ }
+ data_packet receive_data() {
+ std::call_once(connection_init_flag, &X::open_connection, this);
+ return connection.receive_data();
+ }
+};
+
上面的例子中 send_data() 和 receive_data() 两者中更先被call的完成init.
+像mutex一样,once_flag也不能被copy、move
+-
+
- local static variables的初始化occur the first time control passes through its declaration。多线程环境下这就会race,c++11之后的compiler就没问题了。所以可以通过这种方式替代call_once() where a single global instance is required +
Protecting rarely updated data structures
+比如DNS信息:读写锁:std::shared_mutex 和 std::shared_timed_mutex.
+通过std::shared_lockstd::shared_mutex 获得shared access; +通过std::lock_guardstd::shared_mutex 或者std::unique_lockstd::shared_mutex 获得 exclusive access
+(mutable 关键词:在const 函数中也可以修改mutable变量,所以适合用在mutex上,因为mutex要lock、unlock)
+Recursive locking
+重复锁同一个mutex是UB.
+std::recursive_mutex:可重复锁,(但要记得锁了几次最后还得解锁几次,通过RAII就行)
+Most of the time, if you think you want a recursive mutex, you probably need to change your design instead.
+common use: 每一个public成员函数都lock了,一个public成员函数会call 另一个. +But such usage is not recommended because it can lead to sloppy thinking and bad design. The class invariants are typically broken.
+Better to extract a new private member function which does not lock the mutex
+Synchronizing concurrent operations
+上面讲的是保护data, 这里讲同步action:condition variables 和 futures。还有latches and barriers
+-
+
- Waiting for an event +
- Waiting for one-off events with futures +
- Waiting with a time limit +
- Using the synchronization of operations to simplify code +
condition_variables
+为什么要条件变量:等event发生,要么不停地检查flag浪费资源;要么std::this_thread::sleep_for() 但是不好确定到底睡多久。
+std::mutex mut;
+std::queue<data_chunk> data_queue;
+std::condition_variable data_cond;
+void data_preparation_thread() {
+ while (more_data_to_prepare()) {
+ data_chunk const data = prepare_data();
+ {
+ std::lock_guard<std::mutex> lk(mut);
+ data_queue.push(data);
+ }
+ data_cond.notify_one();
+ }
+}
+void data_processing_thread() {
+ while (true) {
+ std::unique_lock<std::mutex> lk(mut);
+ data_cond.wait(lk, [] { return !data_queue.empty(); });
+ data_chunk data = data_queue.front();
+ data_queue.pop();
+ lk.unlock();
+ process(data);
+ if (is_last_chunk(data))
+ break;
+ }
+}
+
上面用unique_lock 搭配conditon_variable 是因为the waiting thread must unlock the mutex while it’s waiting and lock it again afterward, and std::lock_guard doesn’t provide that flexibility
+futures
+std::future<> 和 std::shared_future<>
+Returning values from background tasks
+use std::async to start an asynchronous task,returns a std::future object。 call get() on the future to block until returning the value.
+std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
+do_other_stuff();
+std::cout << "The answer is " << the_answer.get() << std::endl;
+
struct X {
+ void foo(int, std::string const &);
+ std::string bar(std::string const &);
+};
+X x;
+// Calls p->foo(42,"hello") where p is &x
+auto f1 = std::async(&X::foo, &x, 42, "hello");
+// Calls tmpx.bar("goodbye") where tmpx is a copy of x
+auto f2 = std::async(&X::bar, x, "goodbye");
+struct Y {
+ double operator()(double);
+};
+
an additional parameter to std::async before the function to call: std::launch, 要么是std::launch::deferred 等到wait() 或者get()被call的时候才调用;要么是std::launch::async 开一个新线程。或者std::launch::deferred | std::launch::async 代表由实现自己选择(默认)。
+Associating a task with a future
+wrapping the task in an instance of the std::packaged_task<> class template or by writing code to explicitly set the values using the std::promise<> class template.
+当std::packaged_task<> 对象被invoke的时候,它call对应的function or callable object,让future ready. Can be used as a building block for thread pools
+std::mutex m;
+std::deque<std::packaged_task<void()>> tasks;
+bool gui_shutdown_message_received();
+void get_and_process_gui_message();
+void gui_thread() {
+ while (!gui_shutdown_message_received()) {
+ get_and_process_gui_message();
+ std::packaged_task<void()> task;
+ {
+ std::lock_guard<std::mutex> lk(m);
+ if (tasks.empty())
+ continue;
+ task = std::move(tasks.front());
+ tasks.pop_front();
+ }
+ task();
+ }
+}
+std::thread gui_bg_thread(gui_thread);
+template <typename Func> std::future<void> post_task_for_gui_thread(Func f) {
+ std::packaged_task<void()> task(f);
+ std::future<void> res = task.get_future();
+ std::lock_guard<std::mutex> lk(m);
+ tasks.push_back(std::move(task));
+ return res;
+}
+
std::packaged_task wraps a function or other callable object
+promises
+tasks that can’t be expressed as a simple function call or those tasks where the result may come from more than one place, 第三种创建future的方式:std::promise
+std::promise/std::future pair:通过std::promise的get_future()成员函数获得std::future object。std::promise 通过set_value()设置value让 std::future object ready,future object就可以读到。
+如果destroy the std::promise without setting a value: exception
+void process_connections(connection_set &connections) {
+ while (!done(connections)) {
+ for (connection_iterator connection = connections.begin(),
+ end = connections.end();
+ connection != end; ++connection) {
+ if (connection->has_incoming_data()) {
+ data_packet data = connection->incoming();
+ std::promise<payload_type> &p = connection->get_promise(data.id);
+ p.set_value(data.payload);
+ }
+ if (connection->has_outgoing_data()) {
+ outgoing_packet data = connection->top_of_outgoing_queue();
+ connection->send(data.payload);
+ data.promise.set_value(true);
+ }
+ }
+ }
+}
+
Saving an exception for the future
+future.get() 会返回异常而不是value 当发生异常的时候
+extern std::promise<double> some_promise;
+try {
+ some_promise.set_value(calculate_value());
+} catch (...) {
+ some_promise.set_exception(std::current_exception());
+}
+// 如果知道type of the exception,最好用这个而不是try catch
+some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));
+
Waiting from multiple threads
+用std::shared_future, multiple threads can wait for the same event
+// implicit transfer of ownership
+std::shared_future<std::string> sf(some_promise.get_future());
+// share() 方法将future变为shared_future
+auto sf = p.get_future().share();
+
Waiting with a time limit
+memory order and atomic
+memory order是线程之间进行顺序绑定用的。
+release sequence 指一个release 可以和后面的很多个acquire 同步
+如果acquire操作看见release fence后面的store的结果,那fence就和acquire同步;如果acquire fence后面的load看见release操作的结果,那release和fence同步。
+Release fence可以防止fence前的内存操作重排到fence后的任意store之后,即阻止loadstore重排和storestore重排。
+acquire fence可以防止fence后的内存操作重排到fence前的任意load之前,即阻止loadload重排和loadstore重排
+std::atomic_thread_fence(memory_order_acq_rel)和std::atomic_thread_fence(memory_order_seq_cst)都是full fence。
+基于atomic_thread_fence(外加一个任意序的原子变量操作)的同步和基于原子操作的同步很类似,比如最常用的,都可以形成release acquire语义,但是从上面的描述可以看出,fence的效果要比基于原子变量的效果更强,在weak memory order平台的开销也更大。
+以release为例,对于基于原子变量的release opration,仅仅是阻止前面的内存操作重排到该release opration之后,而release fence则是阻止重排到fence之后的任意store operation之后。
+因为fence的同步效果和原子操作上的同步效果比较相似,可以互相组合,自然的,使用fence的同步会有三种情况, fence - atomic同步,fence - fence同步和atomic-fence -同步。
+If a non-atomic operation is sequenced before an atomic operation, and that atomic operation happens before an operation in another thread, the non-atomic operation also happens before that operation in the other thread. 参考代码:5.3.6的 listing5.13 +所以mutex的实现的基础就是这个:lock()是acquire,unlock是release,所以unlock之前的操作在unlock之前,也就在下一个thread的lock之前。
+原子变量实现自旋锁:https://blog.51cto.com/quantfabric/2581173 相比mutex,是busy waiting而不是sleep waiting,不会使线程阻塞;少了用户态内核态的切换。
+无锁编程性能并不一定更好,比如cache受影响。
+原子变量的底层实现可能会使用mutex。
+