C++标准程序库使用 future 来模拟一次性事件:若线程需等待某个特定的一次性事件发生,则会以恰当的方式取得一个 future,它代表目标事件;接着,该线程就能一边执行其他任务,一边在 future 上等待;
C++标准库有两种future,一种是独享future,即std::future,和共享futurestd::shared_future。它们的设计模式参考了unique_ptr和shared_ptr。对于同一件事, 仅仅允许关联唯一一个std::future 实例,但可以关联多个 std::shared_future实例。
对于std::shared_future, 一个 std::shared_future对象可能派生出多个副本,这些副本都指向同一个异步结果
future
std::future提供了一种访问异步操作结果的机制。名字很贴切作用。我们需要在未来的时候获取一个结果。
future对象有两个主要成员函数:
wait 等待结果,与mutex相似
get 获取结果,返回值就是线程函数的返回值。如果函数抛出异常,异常也会在这里抛出。只能调用一次
一个future对象可以由async,promise,packaged_task产生
async
C++中future可以通过async函数创建。async函数的原型是:
1 2 3 4 5 6 7
| template <class _Fty, class... _ArgTypes> future<_Fty,_ArgTypes...> async( _Fty&& _Fnarg, _ArgTypes&&... _Args)
template <class _Fty, class... _ArgTypes> future<_Fty, _ArgTypes> async( launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args)
|
第一个参数指明了线程的创建方式:
std::launch::deferred是延迟调用,表示线程入口函数调用被延迟到std::future对象调用wait()或者get()函数 调用才执行。如果wait()和get()没有调用,则不会创建新线程,也不执行函数;
std::launch::async表示强制这个异步任务在 新线程上执行,在调用std::async()函数的时候就开始创建线程。
std::launch::deferred|std::launch::async:
这里的“|”表示或者。如果没有给出launch参数,默认采用该种方式。
操作系统会自行评估选择 async or defer,如果系统资源紧张,则采用defer,就不会创建新线程。避免创建线程过长,导致崩溃。
如果想要async实现异步,建议显式填写第一个参数为std::launch::async。
其余两个参数就和thread里的构造参数一致。
非常简单的案例:
1 2 3 4 5 6 7 8 9 10 11 12
| int main() { auto fut = async([]() { this_thread::sleep_for(chrono::milliseconds(1000)); return this_thread::get_id();
});
auto x = fut.get(); cout << x; return 0; }
|
packaged_task
std::packaged_task 的作用是:把一个“可调用对象”(函数/λ/仿函数)包装成一个任务,并且能拿到与之关联的 std::future,以后在别的线程/某个时刻执行任务时,future 就能拿到结果或异常。
非常简单的应用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| int main() {
std::packaged_task task([](int x) { this_thread::sleep_for(chrono::milliseconds(500)); return x * 2; });
auto fut = task.get_future();
std::thread t(std::move(task), 100);
t.join();
std::cout << fut.get();
return 0; }
|
std::packaged_task成员函数有:
get_future 获取future对象,只能调用一次
operator(...)执行被包装的函数
根据packaged_task包装任务和条件变量,我们可以设计一个非常简单的线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| namespace SimplePool {
condition_variable cv{}; queue <packaged_task<int(int)>> tasks{}; bool stop{ false }; mutex mtx{}; mutex print_mtx{};
template<class ... _Ax> void print(_Ax&&... _args) { lock_guard lock(print_mtx); printf(std::forward<std::decay_t<_Ax>>(_args)...); }
void worker() { for (int i = 0;;++i) { packaged_task<int(int)> myTask; { unique_lock lock(mtx);
cv.wait(lock, [&]() {return !tasks.empty()||stop; });
if (stop) { break; } myTask = std::move(tasks.front()); tasks.pop(); print("[%d] run %d\n", (int)GetCurrentThreadId(), i); } myTask(i); } print("[%d] Stop\n", (int)GetCurrentThreadId()); }
template<class _Fx,class... _Ax> future<int> addTask(_Fx&& _Func,_Ax&&... _Args) { auto pack_func = bind( std::forward<std::decay_t<_Fx>>(_Func), std::forward<std::decay_t<_Ax>>(_Args)...);
packaged_task<int(int)> task([pack_func](int x)->int { pack_func(); return x * 2; });
future<int> tmp_fut = task.get_future(); { lock_guard lock(mtx); tasks.push(std::move(task)); } cv.notify_one();
return tmp_fut; } void stop_work() { { lock_guard lock(mtx); stop = true; } cv.notify_all(); }
void run() { vector<thread> pool;
for (int i = 0; i != 10; ++i) { pool.push_back(thread(worker)); }
for (int i = 0; i != 100; ++i) { addTask([](int de) { print("[%d] my task %d\n", (int)GetCurrentThreadId(), de); }, i); } this_thread::sleep_for(chrono::seconds(1)); stop_work();
for (auto& th : pool) { th.join(); } } }
|
promise
std::promise 用来在一个线程里产生结果,在另一个线程里通过 std::future 拿到结果/异常。可以把它理解成:promise 负责“交作业”,future 负责“收作业”。
与packaged_task一样,我们可以通过成员函数get_future获取到future对象,进而获取到结果。
非常简单的示例:
1 2 3 4 5 6 7 8 9 10 11 12
| int main() { std::promise<int> pro; auto fut = pro.get_future();
std::jthread t1([](std::promise<int> pro) { this_thread::sleep_for(std::chrono::milliseconds(1000)); pro.set_value(100); }, std::move(pro));
std::cout << fut.get(); return 0; }
|
同时,我们也可以传递异常,使用set_exception;我们也可以选择等到线程退出时,在来把值传递出去,我们可以使用set_value_at_thread_exit,异常版本是:set_exception_at_thread_exit
需要注意的是,set_value_at_thread_exit要求在线程退出时,promise对象还存在,否则就会报错。如果条件允许,建议在线程函数返回之前使用set_value
shared_future
std::shared_future 就是“可以被多次 get()、被多个线程同时等待/读取”的 future。它适合“一个结果要广播给很多消费者”的场景。
与future的关键区别
future.get()只能一次
shared_future.get()可以多次,而且可在多个线程调用
获取方式:
1 2
| std::future<int> f = ; std::shared_future<int> sf = f.share();
|
future有成员share直接获取到shared_future对象,注意,这时候future是无效了的
1 2 3 4 5 6 7 8 9 10 11 12 13
| int main() { auto sf = async(std::launch::async, [](int x) { this_thread::sleep_for(chrono::seconds(1)); return 100 + x; }, 10).share();
vector<jthread> pool; for (int i = 0; i != 10; ++i) { pool.emplace_back([sf, i]() { printf("[%d]get %d\n", i, sf.get()); }); } }
|
barrier和latch
std::barrier是C++20的线程卡,它的一次性兄弟类是:std::latch。 假定有一组线程在协同处理某些数据,各线程相互独立,分别处理数据,因此操作过程不必同步。但是,只有在全部线程都完成各自的处理后,才可以操作下一项数据或开始后续处理,std::barrier 针对的就是这种场景。为了同步一组线程 (synchronization group,下称“同步组”),我们创建线程卡,并指定参与同步的线程数目。线程在完成自身的处理后,就运行到线程卡处,通过在线程卡对象上调用 arrive_and_wait()等待同步组的其他线程。只要组内最后一个线程也运行至此,所有线程即被释放,线程卡会自我重置。
与latch不同的是,barrier会自我重置,而latch只能够使用一次。
线程闩的意义在于关闸拦截:一旦它进入了就绪状态,就始终保持不变。而线程卡 则不同,线程卡会释放等待的线程并且自我重置,因此它们可重复使用。