引言
C++20 增加了一个非常重要的语言特性——协程(coroutine),co 表示 co-operative multi-tasking,协作式多任务, 就是说一个 co-routine 可以占据任意长的时间,但是一旦不需要 CPU,就要主动让出。
C++20 中引入了三个关键字用于跟 coroutine 交互,分别是 co_return
、co_yield
和 co_await
.
co_return
现在我们要用 coroutine 实现类似 std::future 的接口
Lazy make_lazy(std::function<int()> func) {
co_return func;
}
这段代码可以这样解读:
我们定义了一个名为
make_lazy
的 coroutine 工厂(coroutine factory)该工厂会给用户返回一个类型为
Lazy
的 handle每个
Lazy
类对象关联的 coroutine 接受一个func
参数,并执行{}
中的工作
某种程度上,可以这样类比到 std::jthread
:
class Lazy {
private:
std::function<int()> _func;
std::jthread _t;
mutable bool _done{false};
mutable int _value{};
mutable std::condition_variable _do_eval;
mutable std::mutex _m;
public:
explicit Lazy(std::function<int()> func)
: _func{std::move(func)}, t{thread_fn, std::ref(_func)} {}
int &get() const {
if (!_done) {
_do_eval.notify_one();
std::lock_guard guard{_m};
}
return _value;
}
private:
static void thread_fn(std::function<int()> &op) {
_do_eval.wait();
}
};
为了连接 lazy
的用户代码和 runtime,我们需要基于 std::coroutine_handle
模板实现一个用户接口,
具体地,C++ 要求这个类型必须有关联类型 promise_type
:
class Lazy {
public:
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
std::function<int()> func;
//=== ignore these currently ===//
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
Lazy get_return_object() { return Lazy{handle_type::from_promise(*this)}; }
void unhandled_exception() { std::abort(); }
//=== ignore these currently ===//
void return_value(std::function<int()> func) { this->func = func; }
};
public:
Lazy(handle_type h) : _coro{h} {}
public:
int &get() const {
if (!_eval) {
_value = _coro.promise().func();
_eval = true;
}
return _value;
}
private:
handle_type _coro;
mutable bool _eval{false};
mutable int _value;
};
如下代码会输出:
int main() {
auto delayed = lazy([] {
std::cout << __PRETTY_FUNCTION__ << '\n';
return 42;
});
std::cout << __PRETTY_FUNCTION__ << '\n';
std::cout << std::format("got {}", delayed.get()) << '\n';
}
int main() main()::<lambda()> got 42
这里需要关注的是 Lazy::promise_type::return_value
,编译器会将 co_return
处理为调用这个函数。
从这个例子可以看到 coroutine 的架构:
+-----------+ +-------------------+ +----------------+ +--------------+ +---------+ | User Code | -- | Coroutine Factory | -- | User Interface | -- | promise_type | -- | runtime | +-o---------+ +---o---------------+ +---o------------+ +--------------+ +---------+ | | | function `main` function `lazy` class `Lazy`
co_yield
跟 co_return
类似,co_yield
也是通过回调函数实现的,需要提供 yield_value
成员函数。
struct Range {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
int i{0};
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
Range get_return_object() {
return Range{handle_type::from_promise(*this)};
}
void unhandled_exception() { std::abort(); }
std::suspend_always yield_value(int i) {
this->i = i;
return {};
}
};
handle_type coro;
Range(handle_type h) : coro{h} {}
bool has_more() const { return !coro.done(); }
void next() { coro.resume(); }
int get() const {
return coro.promise().i;
}
};
Range range(int end) {
for (int begin = 0; begin != end; begin++) {
co_yield begin;
}
}
int main() {
for (auto rng = range(10); rng.has_more(); rng.next()) {
std::cout << std::format("got {}", rng.get()) << '\n';
}
}
co_await
co_await
相对而言复杂一点,涉及到了新的概念:awaitable.
下面代码模拟了 SystemVerilog 中 #N
的调度:
#include <coroutine>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
struct Scheduler {
struct Task {
std::coroutine_handle<> coro;
std::function<void()> cb;
};
std::multimap<uint64_t, Task> _queue;
auto delay(uint64_t time_step, std::function<void()> task) {
struct Awaitable {
uint64_t time_step;
decltype(_queue) &q;
std::function<void()> task;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> h) { q.emplace(time_step, Task{h, task}); }
void await_resume() const noexcept {}
};
return Awaitable{time_step, _queue, std::move(task)};
}
void run() {
for (uint64_t t = 0; !_queue.empty(); t++) {
auto [first, last] = _queue.equal_range(t);
if (first == _queue.end()) continue;
for (auto it = first; it != last; ++it) {
auto &task = it->second;
if (task.cb) task.cb();
task.coro();
}
_queue.erase(first, last);
}
}
};
struct Coroutine {
struct promise_type {
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
Coroutine get_return_object() { return {}; }
void unhandled_exception() { std::abort(); }
};
};
template <uint64_t N>
Coroutine f(Scheduler &sched) {
co_await sched.delay(N, [] { std::cout << N << '\n'; });
}
用如下的代码测试,输出为 1、3 和 5,符合预期:
int main() {
Scheduler sched;
// #5;
f<5>(sched);
// #1;
f<1>(sched);
// #3;
f<3>(sched); sched.run();
}
在上面的示例中,Coroutine
基本上起到的是占位的作用,唯一需要注意的就是 initial_suspend
的返回类型为 suspend_never
而不是 suspend_always
,否则执行流不会进入到 Scheduler::delay
中,这是因为 f
基本上被处理成了这样(详情请参见 dcl.fct.def.coroutine):
template <uint64_t N>
Coroutine f(Scheduler &sched) {
ALLOCATE_FRAME_FOR_COROUTINE();
Coroutine::promise_type prom; (1)
auto ret = prom.get_return_object(); (2)
//=== 由于是 suspend_never,这一段没什么用 ===//
auto s1 = prom.initial_suspend();
if (!s1.await_ready()) {
s1.await_suspend();
// Return `ret` to caller.
TRANSFER_CONTROL();
}
s1.await_resume();
//===========================================//
auto awaitable = sched.delay(N, [] { std::cout << N << '\n'; });
// 同上 ...
auto s2 = prom.final_suspend();
// 同上 ...
}
也就是说,在这个场景中,Awaitable::await_resume
是没有被调用的,coroutine 的恢复是通过直接调用
std::coroutine_handle
的 operator()
实现的。当然,await_resume
还是有作用的,它提供了
co_await
的返回值。
Member Function | Description |
---|---|
Default constructor | A promise must be default constructible. |
initial_suspend() | Determines if the coroutine suspends before it runs. |
get_return_object() | Returns the coroutine object (resumable object). |
final_suspend() noexcept | Determines if the coroutine suspends before it ends. |
unhandled_exception() | Called when an exception happens. |
return_value(val) | Is invoked by |
return_void() | Is invoked by |
yield_value(val) | Is invoked by |
Function | Description |
---|---|
await_ready | Indicates if the result is ready. When it returns false, await_suspend is called. |
await_suspend | Schedule the coroutine for resumption or destruction. |
await_resume | Provides the result for the co_await exp expression. |
基于 coroutine 的 AST 迭代器
下面以 GCC 14.2 的实现为准,Clang 可能会不同:
偏移量 | 字节数 | 内容 |
---|---|---|
0 | 8 | Frame .actor 的函数指针 |
8 | 8 | Frame .destroy 的函数指针 |
16 | 1 | promise_type 对象 |
24 | 8 | coroutine_handle 对象 |
32 | 2 | |
34 | 1 | |
35 | 1 | |
36 | 1 |
|
37 | 1 |
|
Clang 中的实现
Parser
代码位于 ParseExpr.cpp 中的 Parser::ParseCastExpression
:
case tok::kw_co_await: { // unary-expression: 'co_await' cast-expression
if (NotPrimaryExpression)
*NotPrimaryExpression = true;
SourceLocation CoawaitLoc = ConsumeToken();
Res = ParseCastExpression(AnyCastExpr);
if (!Res.isInvalid())
Res = Actions.ActOnCoawaitExpr(getCurScope(), CoawaitLoc, Res.get());
return Res;
}
其中 Actions
是一个类型为 Sema
的成员变量,函数 ActOnCoawaitExpr
的定义如下:
ExprResult Sema::ActOnCoawaitExpr(Scope *S, SourceLocation Loc, Expr *E) {
if (!checkSuspensionContext(*this, Loc, "co_await")) (1)
return ExprError();
if (!ActOnCoroutineBodyStart(S, Loc, "co_await")) {
CorrectDelayedTyposInExpr(E);
return ExprError();
}
if (E->hasPlaceholderType()) {
ExprResult R = CheckPlaceholderExpr(E);
if (R.isInvalid()) return ExprError();
E = R.get();
}
ExprResult Lookup = BuildOperatorCoawaitLookupExpr(S, Loc);
if (Lookup.isInvalid())
return ExprError();
return BuildUnresolvedCoawaitExpr(Loc, E,
cast<UnresolvedLookupExpr>(Lookup.get()));
}
1 | 函数 checkSuspensionContext 检查标准中的规定:“[expr.await]: An await-expression shall appear only in
a potentially evaluated expression within the compound-statement of a
function-body outside of an exception handler [...] A context within a function
where an await-expression can appear is called a suspension context of the
function.” |