引言

C++20 增加了一个非常重要的语言特性——协程(coroutine),co 表示 co-operative multi-tasking,协作式多任务, 就是说一个 co-routine 可以占据任意长的时间,但是一旦不需要 CPU,就要主动让出。

C++20 中引入了三个关键字用于跟 coroutine 交互,分别是 co_returnco_yieldco_await.

co_return

现在我们要用 coroutine 实现类似 std::future 的接口

Lazy make_lazy(std::function<int()> func) {
  co_return func;
}

这段代码可以这样解读:

  • 我们定义了一个名为 make_lazycoroutine 工厂(coroutine factory)

  • 该工厂会给用户返回一个类型为 Lazy 的 handle

  • 每个 Lazy 类对象关联的 coroutine 接受一个 func 参数,并执行 {} 中的工作

为什么要设计成这样?

首先,协程依然是一种函数,因此复用函数的定义方式是完全合理的。然而,转移控制流的能力也意味着必须要提供一种恢复控制流的机制,这就要求有一个用户可以访问的 handle. 思考一下如何实现 Python 中类似的语法:

def fibonacci():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a+b

i = 0
for n in fibonacci():
    print(n)
    i += 1
    if i >= 10:
        break

类比到 C++ 中,就是:

Iterable fibonacci() {
  int64_t a = 1, b = 1;
  while (true) {
    co_yield a;
    int c = a+b;
    a = b, b = c;
  }
}

void foo() {
  int i = 0;
  for (auto n : fibonacci()) {
    std::cout << n << '\n';
    if (++i >= 10)
      break;
  }
}

既然 fibonacci 的返回值能再 range-based for 中使用,就必须支持 beginend

某种程度上,可以这样类比到 std::jthread

class Lazy {
 private:
  std::function<int()> _func;

  std::jthread _t;

  mutable bool _done{false};
  mutable int _value{};
  mutable std::condition_variable _do_eval;
  mutable std::mutex _m;

 public:
  explicit Lazy(std::function<int()> func)
      : _func{std::move(func)}, t{thread_fn, std::ref(_func)} {}

  int &get() const {
    if (!_done) {
        _do_eval.notify_one();
        std::lock_guard guard{_m};
    }
    return _value;
  }

 private:
  static void thread_fn(std::function<int()> &op) {
    _do_eval.wait();
  }
};

为了连接 lazy 的用户代码和 runtime,我们需要基于 std::coroutine_handle 模板实现一个用户接口, 具体地,C++ 要求这个类型必须有关联类型 promise_type

class Lazy {
 public:
  struct promise_type;
  using handle_type = std::coroutine_handle<promise_type>;

  struct promise_type {
    std::function<int()> func;

    //=== ignore these currently ===//
    std::suspend_always initial_suspend() { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
    Lazy get_return_object() { return Lazy{handle_type::from_promise(*this)}; }
    void unhandled_exception() { std::abort(); }
    //=== ignore these currently ===//

    void return_value(std::function<int()> func) { this->func = func; }
  };

 public:
  Lazy(handle_type h) : _coro{h} {}

 public:
  int &get() const {
    if (!_eval) {
      _value = _coro.promise().func();
      _eval = true;
    }
    return _value;
  }

 private:
  handle_type _coro;
  mutable bool _eval{false};
  mutable int _value;
};

如下代码会输出:

int main() {
  auto delayed = lazy([] {
    std::cout << __PRETTY_FUNCTION__ << '\n';
    return 42;
  });
  std::cout << __PRETTY_FUNCTION__ << '\n';
  std::cout << std::format("got {}", delayed.get()) << '\n';
}
int main()
main()::<lambda()>
got 42

这里需要关注的是 Lazy::promise_type::return_value,编译器会将 co_return 处理为调用这个函数。

从这个例子可以看到 coroutine 的架构:

+-----------+    +-------------------+    +----------------+    +--------------+    +---------+
| User Code | -- | Coroutine Factory | -- | User Interface | -- | promise_type | -- | runtime |
+-o---------+    +---o---------------+    +---o------------+    +--------------+    +---------+
  |                  |                        |
 function `main`    function `lazy`          class `Lazy`

co_yield

co_return 类似,co_yield 也是通过回调函数实现的,需要提供 yield_value 成员函数。

struct Range {
  struct promise_type;
  using handle_type = std::coroutine_handle<promise_type>;

  struct promise_type {
    int i{0};

    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    Range get_return_object() {
      return Range{handle_type::from_promise(*this)};
    }
    void unhandled_exception() { std::abort(); }

    std::suspend_always yield_value(int i) {
      this->i = i;
      return {};
    }
  };

  handle_type coro;

  Range(handle_type h) : coro{h} {}

  bool has_more() const { return !coro.done(); }
  void next() { coro.resume(); }
  int get() const {
    return coro.promise().i;
  }
};

Range range(int end) {
  for (int begin = 0; begin != end; begin++) {
    co_yield begin;
  }
}

int main() {
  for (auto rng = range(10); rng.has_more(); rng.next()) {
    std::cout << std::format("got {}", rng.get()) << '\n';
  }
}

co_await

co_await 相对而言复杂一点,涉及到了新的概念:awaitable.

下面代码模拟了 SystemVerilog 中 #N 的调度:

#include <coroutine>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

struct Scheduler {
  struct Task {
    std::coroutine_handle<> coro;
    std::function<void()> cb;
  };

  std::multimap<uint64_t, Task> _queue;

  auto delay(uint64_t time_step, std::function<void()> task) {
    struct Awaitable {
      uint64_t time_step;
      decltype(_queue) &q;
      std::function<void()> task;

      bool await_ready() const noexcept { return false; }
      void await_suspend(std::coroutine_handle<> h) { q.emplace(time_step, Task{h, task}); }
      void await_resume() const noexcept {}
    };
    return Awaitable{time_step, _queue, std::move(task)};
  }

  void run() {
    for (uint64_t t = 0; !_queue.empty(); t++) {
      auto [first, last] = _queue.equal_range(t);
      if (first == _queue.end()) continue;
      for (auto it = first; it != last; ++it) {
        auto &task = it->second;
        if (task.cb) task.cb();
        task.coro();
      }
      _queue.erase(first, last);
    }
  }
};

struct Coroutine {
  struct promise_type {
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    Coroutine get_return_object() { return {}; }
    void unhandled_exception() { std::abort(); }
  };
};

template <uint64_t N>
Coroutine f(Scheduler &sched) {
  co_await sched.delay(N, [] { std::cout << N << '\n'; });
}

用如下的代码测试,输出为 1、3 和 5,符合预期:

int main() {
  Scheduler sched;
  // #5;
  f<5>(sched);
  // #1;
  f<1>(sched);
  // #3;
  f<3>(sched); sched.run();
}

在上面的示例中,Coroutine 基本上起到的是占位的作用,唯一需要注意的就是 initial_suspend 的返回类型为 suspend_never 而不是 suspend_always,否则执行流不会进入到 Scheduler::delay 中,这是因为 f 基本上被处理成了这样(详情请参见 dcl.fct.def.coroutine):

template <uint64_t N>
Coroutine f(Scheduler &sched) {
  ALLOCATE_FRAME_FOR_COROUTINE();

  Coroutine::promise_type prom;        (1)
  auto ret = prom.get_return_object(); (2)

  //=== 由于是 suspend_never,这一段没什么用 ===//
  auto s1 = prom.initial_suspend();
  if (!s1.await_ready()) {
    s1.await_suspend();
    // Return `ret` to caller.
    TRANSFER_CONTROL();
  }
  s1.await_resume();
  //===========================================//

  auto awaitable = sched.delay(N, [] { std::cout << N << '\n'; });
  // 同上 ...

  auto s2 = prom.final_suspend();
  // 同上 ...
}

也就是说,在这个场景中,Awaitable::await_resume 是没有被调用的,coroutine 的恢复是通过直接调用 std::coroutine_handleoperator() 实现的。当然,await_resume 还是有作用的,它提供了 co_await 的返回值。

Member FunctionDescription

Default constructor

A promise must be default constructible.

initial_suspend()

Determines if the coroutine suspends before it runs.

get_return_object()

Returns the coroutine object (resumable object).

final_suspend() noexcept

Determines if the coroutine suspends before it ends.

unhandled_exception()

Called when an exception happens.

return_value(val)

Is invoked by co_return <expr>(只在有这种形式时必须)

return_void()

Is invoked by co_return(同上)

yield_value(val)

Is invoked by co_yield val(同上)

FunctionDescription

await_ready

Indicates if the result is ready. When it returns false, await_suspend is called.

await_suspend

Schedule the coroutine for resumption or destruction.

await_resume

Provides the result for the co_await exp expression.

基于 coroutine 的 AST 迭代器

下面以 GCC 14.2 的实现为准,Clang 可能会不同:

偏移量字节数内容

0

8

Frame .actor 的函数指针

8

8

Frame .destroy 的函数指针

16

1

promise_type 对象

24

8

coroutine_handle 对象

32

2

34

1

35

1

36

1

initial_suspend 的返回对象,suspend_never

37

1

final_suspend 的返回对象,suspend_always

Clang 中的实现

Parser

代码位于 ParseExpr.cpp 中的 Parser::ParseCastExpression

case tok::kw_co_await: {  // unary-expression: 'co_await' cast-expression
  if (NotPrimaryExpression)
    *NotPrimaryExpression = true;
  SourceLocation CoawaitLoc = ConsumeToken();
  Res = ParseCastExpression(AnyCastExpr);
  if (!Res.isInvalid())
    Res = Actions.ActOnCoawaitExpr(getCurScope(), CoawaitLoc, Res.get());
  return Res;
}

其中 Actions 是一个类型为 Sema 的成员变量,函数 ActOnCoawaitExpr 的定义如下:

ExprResult Sema::ActOnCoawaitExpr(Scope *S, SourceLocation Loc, Expr *E) {
  if (!checkSuspensionContext(*this, Loc, "co_await")) (1)
    return ExprError();

  if (!ActOnCoroutineBodyStart(S, Loc, "co_await")) {
    CorrectDelayedTyposInExpr(E);
    return ExprError();
  }

  if (E->hasPlaceholderType()) {
    ExprResult R = CheckPlaceholderExpr(E);
    if (R.isInvalid()) return ExprError();
    E = R.get();
  }

  ExprResult Lookup = BuildOperatorCoawaitLookupExpr(S, Loc);
  if (Lookup.isInvalid())
    return ExprError();
  return BuildUnresolvedCoawaitExpr(Loc, E,
                                   cast<UnresolvedLookupExpr>(Lookup.get()));
}
1函数 checkSuspensionContext 检查标准中的规定:“[expr.await]: An await-expression shall appear only in a potentially evaluated expression within the compound-statement of a function-body outside of an exception handler [...] A context within a function where an await-expression can appear is called a suspension context of the function.”