diff --git a/content/posts/cpp-concurrency.md b/content/posts/cpp-concurrency.md index 41bccb2..efea3c1 100644 --- a/content/posts/cpp-concurrency.md +++ b/content/posts/cpp-concurrency.md @@ -277,3 +277,150 @@ Better to extract a new private member function which does not lock the mutex * Waiting for one-off events with futures * Waiting with a time limit * Using the synchronization of operations to simplify code + +## condition_variables +为什么要条件变量:等event发生,要么不停地检查flag浪费资源;要么std::this_thread::sleep_for() 但是不好确定到底睡多久。 +```cpp +std::mutex mut; +std::queue data_queue; +std::condition_variable data_cond; +void data_preparation_thread() { + while (more_data_to_prepare()) { + data_chunk const data = prepare_data(); + { + std::lock_guard lk(mut); + data_queue.push(data); + } + data_cond.notify_one(); + } +} +void data_processing_thread() { + while (true) { + std::unique_lock lk(mut); + data_cond.wait(lk, [] { return !data_queue.empty(); }); + data_chunk data = data_queue.front(); + data_queue.pop(); + lk.unlock(); + process(data); + if (is_last_chunk(data)) + break; + } +} +``` +上面用unique_lock 搭配conditon_variable 是因为the waiting thread must unlock the mutex while it’s waiting and lock it again afterward, and std::lock_guard doesn’t provide that flexibility + +## futures +std::future<> 和 std::shared_future<> + +### Returning values from background tasks +use **std::async** to start an asynchronous task,returns a std::future object。 call get() on the future to block until returning the value. +```cpp +std::future the_answer = std::async(find_the_answer_to_ltuae); +do_other_stuff(); +std::cout << "The answer is " << the_answer.get() << std::endl; +``` + +```cpp +struct X { + void foo(int, std::string const &); + std::string bar(std::string const &); +}; +X x; +// Calls p->foo(42,"hello") where p is &x +auto f1 = std::async(&X::foo, &x, 42, "hello"); +// Calls tmpx.bar("goodbye") where tmpx is a copy of x +auto f2 = std::async(&X::bar, x, "goodbye"); +struct Y { + double operator()(double); +}; +``` + +an additional parameter to std::async before the function to call: std::launch, 要么是std::launch::deferred 等到wait() 或者get()被call的时候才调用;要么是std::launch::async 开一个新线程。或者std::launch::deferred | std::launch::async 代表由实现自己选择(默认)。 + +### Associating a task with a future +wrapping the task in an instance of the **std::packaged_task<>** class template or by writing code to explicitly set the values using the **std::promise<>** class template. + +当std::packaged_task<> 对象被invoke的时候,它call对应的function or callable object,让future ready. Can be used as a building block for thread pools + +```cpp +std::mutex m; +std::deque> tasks; +bool gui_shutdown_message_received(); +void get_and_process_gui_message(); +void gui_thread() { + while (!gui_shutdown_message_received()) { + get_and_process_gui_message(); + std::packaged_task task; + { + std::lock_guard lk(m); + if (tasks.empty()) + continue; + task = std::move(tasks.front()); + tasks.pop_front(); + } + task(); + } +} +std::thread gui_bg_thread(gui_thread); +template std::future post_task_for_gui_thread(Func f) { + std::packaged_task task(f); + std::future res = task.get_future(); + std::lock_guard lk(m); + tasks.push_back(std::move(task)); + return res; +} +``` +std::packaged_task wraps a function or other callable object + +### promises +tasks that can’t be expressed as a simple function call or those tasks where the result may come from more than one place, 第三种创建future的方式:std::promise + +std::promise/std::future pair:通过std::promise的get_future()成员函数获得std::future object。std::promise 通过set_value()设置value让 std::future object ready,future object就可以读到。 + +如果destroy the std::promise without setting a value: exception + +```cpp +void process_connections(connection_set &connections) { + while (!done(connections)) { + for (connection_iterator connection = connections.begin(), + end = connections.end(); + connection != end; ++connection) { + if (connection->has_incoming_data()) { + data_packet data = connection->incoming(); + std::promise &p = connection->get_promise(data.id); + p.set_value(data.payload); + } + if (connection->has_outgoing_data()) { + outgoing_packet data = connection->top_of_outgoing_queue(); + connection->send(data.payload); + data.promise.set_value(true); + } + } + } +} +``` + +### Saving an exception for the future +future.get() 会返回异常而不是value 当发生异常的时候 +```cpp +extern std::promise some_promise; +try { + some_promise.set_value(calculate_value()); +} catch (...) { + some_promise.set_exception(std::current_exception()); +} +// 如果知道type of the exception,最好用这个而不是try catch +some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo "))); +``` + +### Waiting from multiple threads +用std::shared_future, multiple threads can wait for the same event +```cpp +// implicit transfer of ownership +std::shared_future sf(some_promise.get_future()); +// share() 方法将future变为shared_future +auto sf = p.get_future().share(); +``` + + +## Waiting with a time limit