C++标准程序库使用 future 来模拟一次性事件:若线程需等待某个特定的一次性事件发生,则会以恰当的方式取得一个 future,它代表目标事件;接着,该线程就能一边执行其他任务,一边在 future 上等待;

C++标准库有两种future,一种是独享future,即std::future,和共享futurestd::shared_future。它们的设计模式参考了unique_ptrshared_ptr。对于同一件事, 仅仅允许关联唯一一个std::future 实例,但可以关联多个 std::shared_future实例。

对于std::shared_future, 一个 std::shared_future对象可能派生出多个副本,这些副本都指向同一个异步结果

future

std::future提供了一种访问异步操作结果的机制。名字很贴切作用。我们需要在未来的时候获取一个结果。

future对象有两个主要成员函数:

  • wait 等待结果,与mutex相似
  • get 获取结果,返回值就是线程函数的返回值。如果函数抛出异常,异常也会在这里抛出。只能调用一次

一个future对象可以由asyncpromisepackaged_task产生

async

C++中future可以通过async函数创建。async函数的原型是:

1
2
3
4
5
6
7
template <class _Fty, class... _ArgTypes>
future<_Fty,_ArgTypes...> async(
_Fty&& _Fnarg, _ArgTypes&&... _Args)

template <class _Fty, class... _ArgTypes>
future<_Fty, _ArgTypes> async(
launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args)

第一个参数指明了线程的创建方式:

  • std::launch::deferred是延迟调用,表示线程入口函数调用被延迟到std::future对象调用wait()或者get()函数 调用才执行。如果wait()get()没有调用,则不会创建新线程,也不执行函数;
  • std::launch::async表示强制这个异步任务在 新线程上执行,在调用std::async()函数的时候就开始创建线程。
  • std::launch::deferred|std::launch::async:
    这里的“|”表示或者。如果没有给出launch参数,默认采用该种方式。
    操作系统会自行评估选择 async or defer,如果系统资源紧张,则采用defer,就不会创建新线程。避免创建线程过长,导致崩溃。

如果想要async实现异步,建议显式填写第一个参数为std::launch::async

其余两个参数就和thread里的构造参数一致。

非常简单的案例:

1
2
3
4
5
6
7
8
9
10
11
12
int main() {
auto fut = async([]() {

this_thread::sleep_for(chrono::milliseconds(1000));
return this_thread::get_id();

});

auto x = fut.get();
cout << x;
return 0;
}

packaged_task

std::packaged_task 的作用是:把一个“可调用对象”(函数/λ/仿函数)包装成一个任务,并且能拿到与之关联的 std::future,以后在别的线程/某个时刻执行任务时,future 就能拿到结果或异常。

非常简单的应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {

std::packaged_task task([](int x) {
this_thread::sleep_for(chrono::milliseconds(500));
return x * 2;
});

auto fut = task.get_future();

std::thread t(std::move(task), 100);

t.join();

std::cout << fut.get();

return 0;
}

std::packaged_task成员函数有:

  • get_future 获取future对象,只能调用一次
  • operator(...)执行被包装的函数

根据packaged_task包装任务和条件变量,我们可以设计一个非常简单的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
namespace SimplePool {
//非常简单的线程池
condition_variable cv{};
queue <packaged_task<int(int)>> tasks{};
bool stop{ false };
mutex mtx{};
mutex print_mtx{};

template<class ... _Ax>
void print(_Ax&&... _args) {
lock_guard lock(print_mtx);
printf(std::forward<std::decay_t<_Ax>>(_args)...);
//fflush(stdout);
}

void worker() {
for (int i = 0;;++i) {
packaged_task<int(int)> myTask;
{//互斥锁持有区域
unique_lock lock(mtx);

cv.wait(lock, [&]() {return !tasks.empty()||stop; });

if (stop) {
break;
}
myTask = std::move(tasks.front());
tasks.pop();
print("[%d] run %d\n", (int)GetCurrentThreadId(), i);

}
myTask(i);
}
print("[%d] Stop\n", (int)GetCurrentThreadId());
}

template<class _Fx,class... _Ax>
future<int> addTask(_Fx&& _Func,_Ax&&... _Args) {
auto pack_func = bind(
std::forward<std::decay_t<_Fx>>(_Func),
std::forward<std::decay_t<_Ax>>(_Args)...);

packaged_task<int(int)> task([pack_func](int x)->int {
pack_func();
return x * 2;
});

//加入到线程池
future<int> tmp_fut = task.get_future();
{
lock_guard lock(mtx);
tasks.push(std::move(task));
}
cv.notify_one();

return tmp_fut;
}
void stop_work() {
{
lock_guard lock(mtx);
stop = true;
}
cv.notify_all();
}

void run() {
vector<thread> pool;

for (int i = 0; i != 10; ++i) {
pool.push_back(thread(worker));
}

for (int i = 0; i != 100; ++i) {
addTask([](int de) {
print("[%d] my task %d\n", (int)GetCurrentThreadId(), de);
}, i);
}
this_thread::sleep_for(chrono::seconds(1));

stop_work();

for (auto& th : pool) {
th.join();
}
}
}

promise

std::promise 用来在一个线程里产生结果,在另一个线程里通过 std::future 拿到结果/异常。可以把它理解成:promise 负责“交作业”,future 负责“收作业”。

packaged_task一样,我们可以通过成员函数get_future获取到future对象,进而获取到结果。

非常简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
int main() {
std::promise<int> pro;
auto fut = pro.get_future();

std::jthread t1([](std::promise<int> pro) {
this_thread::sleep_for(std::chrono::milliseconds(1000));
pro.set_value(100);
}, std::move(pro));

std::cout << fut.get();
return 0;
}

同时,我们也可以传递异常,使用set_exception;我们也可以选择等到线程退出时,在来把值传递出去,我们可以使用set_value_at_thread_exit,异常版本是:set_exception_at_thread_exit

需要注意的是,set_value_at_thread_exit要求在线程退出时,promise对象还存在,否则就会报错。如果条件允许,建议在线程函数返回之前使用set_value

shared_future

std::shared_future 就是“可以被多次 get()、被多个线程同时等待/读取”的 future。它适合“一个结果要广播给很多消费者”的场景。

future的关键区别

  • future.get()只能一次
  • shared_future.get()可以多次,而且可在多个线程调用

获取方式:

1
2
std::future<int> f = /* ... */;
std::shared_future<int> sf = f.share(); // f 变成无效,结果由 sf 共享

future有成员share直接获取到shared_future对象,注意,这时候future是无效了的

1
2
3
4
5
6
7
8
9
10
11
12
13
int main() {
auto sf = async(std::launch::async, [](int x) {
this_thread::sleep_for(chrono::seconds(1));
return 100 + x;
}, 10).share();

vector<jthread> pool;
for (int i = 0; i != 10; ++i) {
pool.emplace_back([sf, i]() {
printf("[%d]get %d\n", i, sf.get());
});
}
}

barrier和latch

std::barrier是C++20的线程卡,它的一次性兄弟类是:std::latch。 假定有一组线程在协同处理某些数据,各线程相互独立,分别处理数据,因此操作过程不必同步。但是,只有在全部线程都完成各自的处理后,才可以操作下一项数据或开始后续处理,std::barrier 针对的就是这种场景。为了同步一组线程 (synchronization group,下称“同步组”),我们创建线程卡,并指定参与同步的线程数目。线程在完成自身的处理后,就运行到线程卡处,通过在线程卡对象上调用 arrive_and_wait()等待同步组的其他线程。只要组内最后一个线程也运行至此,所有线程即被释放,线程卡会自我重置。

latch不同的是,barrier会自我重置,而latch只能够使用一次。

线程闩的意义在于关闸拦截:一旦它进入了就绪状态,就始终保持不变。而线程卡 则不同,线程卡会释放等待的线程并且自我重置,因此它们可重复使用。