回到顶部 暗色模式
recent
C++协程(4):理解对称转移

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 28 23:45
Tags: C&C++
C++协程(3):理解promise

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 27 16:19
Tags: C&C++
C++协程(2):理解co_await

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 25 00:00
Tags: C&C++
C++协程(1):协程理论

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 23 23:59
Tags: C&C++
分布式数据系统:共识算法

数据密集型应用系统设计:分布式事务与共识

2022 Nov 13 16:18
Tags: Arch
μblog

C++协程(4):理解对称转移

        协程标准提供了一种有趣的方式来编写异步代码,就好像你还在写同步代码那样。你只需要在合适的点调用 $co_-await$,编译器就会负责暂停协程,在暂停点之间保存状态,并在操作完成后恢复协程执行。
        然而,协程标准,就像它最初定义的,有着很不友好的限制,一不小心就很容易导致栈溢出。并且如果想要避免这种溢出,就需要你在的 $task$<$T$> 类型中进行额外的同步开销。
        还好,2018年对协程设计进行了调整,增加了一种叫做“对称转移” ( $symmetric$ $transfer$ ) 的功能,让你可以在不消耗任何额外栈空间的前提下暂停一个协程的同时恢复另一个协程的执行。此功能的加入解决了协程标准的一个关键限制,并且允许更简单有效的,不需要任何安全方面的措施来防止栈溢出的方式来实现一个异步协程。
        在这篇文章我会试着解释栈溢出问题,以及这个关键的“对称转移”能力是怎么解决这个问题的。

首先是一些关于任务协程怎么工作的背景

        考虑如下协程:

task foo() {
  co_return;
}

task bar() {
  co_await foo();
}

        假设我们有一个简单的 $task$ 类型,当其他协程等待它时会懒执行。这个特别的 $task$ 类型没有返回值。
        让我们分析下当 $bar\left(\right)$ 执行 $co_-await$ $foo\left(\right)$ 时发生了什么。

  • $bar\left(\right)$ 协程调用 $foo\left(\right)$ 函数。注意从主调的角度看协程只是一个普通函数。
  • $foo\left(\right)$ 调用执行一些步骤:
    • 分配协程帧内存 ( 一般在堆上 )。
    • 把参数拷贝到协程帧 ( 如果没有参数,不执行这一步 )。
    • 在协程帧构造 $promise$ 对象。
    • 调用 $promise.get_-return_-object\left(\right)$ 来获取 $foo\left(\right)$ 的返回值。这一步生成将被返回的 $task$ 对象,使用指向刚创建的协程帧的 $std$::$coroutine_-handle$ 初始化。
    • 在最初暂停点 ( 即左大括号处 ) 暂停协程。
    • 把 $task$ 对象返回给 $bar\left(\right)$。
  • 接着 $bar\left(\right)$ 协程对 $foo\left(\right)$ 返回的 $task$ 执行 $co_-await$ 表达式。
    • $bar\left(\right)$ 协程暂停,调用 $task$ 的 $await_-suspend\left(\right)$ 方法,传入指向 $bar\left(\right)$ 协程帧的 $std$::$coroutine_-handle$ 。
    • $await_-suspend\left(\right)$ 函数保存 $bar\left(\right)$ 的 $std$::$coroutine_-handle$ 到 $foo\left(\right)$ 的 $promise$ 对象中,调用 $foo\left(\right)$ 的 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 方法,恢复 $foo\left(\right)$ 协程的执行。
  • $foo\left(\right)$ 协程执行同步完成。
  • $foo\left(\right)$ 协程在最终暂停点 ( 即右大括号处 ) 暂停,通过启动前存储在 $promise$ 对象的 $std$::$coroutine_-handle$ 恢复协程 $bar\left(\right)$ 的执行。
  • $bar\left(\right)$ 协程恢复,继续执行,直到到达 $co_-await$ 表达式语句的结尾,这时它调用 $foo\left(\right)$ 返回的临时变量 $task$ 的析构函数。
  • $task$ 的析构函数再调用 $foo\left(\right)$ 协程句柄的 $.destroy\left(\right)$ 方法,销毁协程帧,包括 $promise$ 对象和拷贝的参数。

        好,这么看来一个简单的调用包含了许多步骤。
        为了更深的理解这些步骤,让我们看看一种简单的使用协程标准设计 ( 不支持对称转移 ) 的 $task$ 类实现。

task 的大体实现

        类的实现大体是这样:

class task {
public:
  class promise_type { /* see below */ };

  task(task&& t) noexcept
  : coro_(std::exchange(t.coro_, {}))
  {}

  ~task() {
    if (coro_)
      coro_.destroy();
  }

  class awaiter { /* see below */ };

  awaiter operator co_await() && noexcept;

private:
  explicit task(std::coroutine_handle<promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<promise_type> coro_;
};

        $task$ 独占一个指向协程帧的 $std$::$coroutine_-handle$,在协程调用时创建。$task$ 对象是RAII对象,保证当 $task$ 离开作用域时,$std$::$coroutine_-handle$ 的 $.destroy\left(\right)$ 会被调用。
        接着让我们实现 $promise$ 类型。

实现 task::promise_type

        根据以前的文章,我们知道 $promise$ 类型成员定义了协程帧中的 $Promise$ 对象类型,控制协程行为。
        首先,我们需要实现 $get_-return_-object\left(\right)$ 来构造在协程创建时返回的 $task$ 对象。这个方法只需要使用新创建的协程帧的 $std$::$coroutine_-handle$ 来初始化 $task$。
        我们可以使用 $std$::$coroutine_-handle$::$from_-promise\left(\right)$ 方法来从 $promise$ 对象获取一个句柄。

class task::promise_type {
public:
  task get_return_object() noexcept {
    return task{std::coroutine_handle<promise_type>::from_promise(*this)};
  }
}

        接着,我们想要协程在左大括号处进行初始化暂停,这样稍后可以在挂起 $task$ 的点恢复协程。
        懒启动协程有一些优点:

  1. 可以在开始协程执行前设置 $std$::$coroutine_-handle$。这意味着我们不需要使用线程同步来处理设置和协程运行之间的竞态。
  2. $task$ 析构函数可以无条件地销毁协程帧,不需要担心协程是否还在另一个线程运行,因为协程只会在主动 $await$ 时执行。并且开始后主调协程会被暂停,所以直到协程执行完成,我们才能调用 $task$ 的析构函数。这让编译器可以更好的把协程帧直接分配在主调的调用栈上,参考P0981R0了解更多关于堆分配跳过优化 ( $Heap$ $Allocation$ $eLision$ $Optimisation$, $HALO$ )。
  3. 提升协程代码的异常安全性。如果你不立即 $co_-await$ 返回的 $task$,并且做了一些其他逻辑,抛出了异常,导致栈回退,这时 $task$ 的析构函数会被调用,这样我们也可以安全的销毁协程帧,因为我们知道它还没开始运行。我们不需要处理 $detach$,悬垂引用,析构函数阻塞,进程终止或者未定义行为。这也是我在CppCon 2019 talk on Structured Concurrency中讲到的。

        为了让协程在左大括号处进行初始化暂停,我们实现了一个返回内置 $suspend_-always$ 类型的 $initial_-suspend\left(\right)$ 方法。

std::suspend_always initial_suspend() noexcept {
  return {};
}

        接着,我们需要定义 $return_-void\left(\right)$ 方法,这个方法会在执行到 $co_-return;$ 时,或者协程执行结束时被调用。这个方法并不需要做任何事,只需要声明来让编译器知道这个协程类型可以使用 $co_-return;$ 语句。

void return_void() noexcept {}

        我们也需要增加一个 $unhandled_-exception\left(\right)$ 方法,如果一个异常逃出协程体之外,这个方法会被调用。在我们的场景,可以认为 $task$ 协程体是noexcept的,如果出现异常,直接调用 $std$::$terminate\left(\right)$。

void unhandled_exception() noexcept {
  std::terminate();
}

        最后,当协程执行到右大括号时,我们想让协程在最终暂停点暂停,并恢复等待这个协程完成的另一个协程的执行。
        为了实现这个,我们需要 $promise$ 有一个数据成员来保存接着执行的协程的 $std$::$coroutine_-handle$。我们也需要定义 $final_-suspend\left(\right)$ 方法,返回一个 $awaitble$ 对象,在当前协程在最终暂停点暂停后,恢复下一个协程。
        把恢复延后到当前协程暂停后进行是很重要的,因为下一个协程可能会立即调用 $task$ 的析构函数,后者再调用协程帧的 $.destroy\left(\right)$ 。$.destroy\left(\right)$ 只对暂停协程有效,所以这时可能会导致未定义行为。
        编译器会在右大括号处插入 $co_-await$ $promise.final_-suspend\left(\right)$ 语句。
        十分要注意的是,当 $final_-suspend\left(\right)$ 被调用时,当前协程还没有暂停。在协程暂停前,我们需要等待返回的 $awaitble$ 对象 $await_-suspend\left(\right)$ 方法调用。

struct final_awaiter {
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutin_handle<promise_type> h) noexcept {
    // The coroutine is now suspended at the final-suspend point.
    // Lookup its continuation in the promise and resume it.
    h.promise().continuation.resume();
  }

  void await_resume() noexcept {}
};

final_awaiter final_suspend() noexcept {
  return {};
}

std::coroutine_handle<> continuation;
};

        好了,$promise$ 类型完成了。最后还剩下 $task$::$operator$ $co_-await\left(\right)$。

实现 task::operator co_await()

        你可能记得理解co_await这篇文章讲的,当执行 $co_-await$ 表达式时,编译器会生成一个对 $operator$ $co_-await\left(\right)$ 的调用,如果定义了这个方法,返回对象必须同时定义 $await_-ready\left(\right)$、$await_-suspend\left(\right)$ 和 $await_-resume\left(\right)$ 方法。
        当一个协程 $await$ 一个 $task$ 时,我们希望挂起的协程总是暂停,并且在暂停后,把挂起协程的句柄保存在即将恢复的协程的 $promise$ 中,并在之后对 $task$ 的 $std$::$coroutine_-handle$ 调用 $.resume\left(\right)$ 来开始 $task$ 执行。
        因此代码会相对直接:

class task::awaiter {
public:
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> continuation) noexcept {
    // Store the continuation in the task'promise so that the final_suspend()
    // knows to resume this corotuine when the task completes.
    coro_.promise().continuation = continuation;

    // Then we resume the task's coroutine, which is currently suspended
    // at the initial-suspend-point (ie. at the open curly brace).
    coro_.resume();
  }

  void await_resume() noexcept {}

private:
  explicit awaiter(std::coroutine_handle<task::promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<task::promise_type> coro_;
};

task::awaiter task::operator co_await() && noexcept {
  return awaiter{coro_};
}

        这样就完成了 $task$ 类型功能所需的代码。
        你可以在 $Compiler$ $Explorer$ 查看完整代码:https://godbolt.org/z/-Kw6Nf

栈溢出问题

        然而,当你尝试在协程中编写循环,并且 $co_-await$ 可能在循环体内同步完成的 $task$ 时,会受到一些限制。
        例如:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; i++) {
    co_await completes_synchronously();
  }
}

        使用上文中的 $task$ 实现,$loop_-synchronously\left(\right)$ 函数 ( 可能 ) 会在 $count$ 是 $10$、$10000$,甚至 $100'000$ 时都能正常运行。但是有些值会导致协程崩溃。
        例如,当 $count$ 是 $1'000'000$ 时就会崩溃,可以查看:https://godbolt.org/z/gy5Q8q
        崩溃的原因是栈溢出。
        为了理解栈溢出的原因,我们需要看看代码执行的时候发生了什么,尤其是栈帧上面发生了什么。
        当其他协程 $co_-await$ 了返回的 $task$ 后,$loop_-synchronously\left(\right)$ 开始执行。这将暂停挂起的协程,调用 $task$::$awaiter$::$await_-suspend\left(\right)$,后者调用 $task$ 的 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 方法。
        因此在 $loop_-synchronously\left(\right)$ 启动时,栈看起来就像这样:

           Stack                                                   Heap
+------------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume    | active coroutine -> | loop_synchronously frame |
+------------------------------+                     | +----------------------+ |
| coroutine_handle::resume     |                     | | task::promise        | |
+------------------------------+                     | | - continuation --.   | |
| task::awaiter::await_suspend |                     | +------------------|---+ |
+------------------------------+                     | ...                |     |
| awaiting_coroutine$resume    |                     +--------------------|-----+
+------------------------------+                                          V
|  ....                        |                     +--------------------------+
+------------------------------+                     | awaiting_coroutine frame |
                                                     |                          |
                                                     +--------------------------+

注意:一个协程函数通常被编译器编译成两部分:

  1. “活动函数” ( $ramp$ $function$ ),处理协程帧构造、参数拷贝,$promise$ 构造和获取返回值,以及
  2. “协程体” ( $coroutine$ $body$ ),包含用户编写的逻辑。

我使用 $\$resume$ 后缀表示协程的“协程体”部分。
后面的文章会详细介绍这种分割方式。

        当 $loop_-synchronously\left(\right)$ $await$ $complete_-synchronously\left(\right)$ 返回的 $task$ 时,当前协程会被暂停,调用 $task$::$awaiter$::$await_-suspend\left(\right)$。$await_-suspend\left(\right)$ 方法再调用 $complete_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        这会恢复 $complete_-synchronously\left(\right)$ 协程,后者会同步运行结束,并在最终暂停点暂停,调用 $task$::$promise$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法,后者会调用 $loop_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        接着当 $loop_-synchronously\left(\right)$ 协程恢复之后、$complete_-synchronously\left(\right)$ 返回的临时变量 $task$ 在分号处销毁时,我们再看看当前程序状态,栈 / 堆看起来就像这样:

           Stack                                                   Heap
+-------------------------------+ <-- top of stack
| loop_synchronously$resume     | active coroutine -.
+-------------------------------+                   |
| coroutine_handle::resume      |            .------'
+-------------------------------+            |
| final_awaiter::await_suspend  |            |
+-------------------------------+            |  +--------------------------+ <-.
| completes_synchronously$resume|            |  | completes_synchronously  |   |
+-------------------------------+            |  | frame                    |   |
| coroutine_handle::resume      |            |  +--------------------------+   |
+-------------------------------+            '---.                             |
| task::awaiter::await_suspend  |                V                             |
+-------------------------------+ <-- prev top  +--------------------------+   |
| loop_synchronously$resume     |     of stack  | loop_synchronously frame |   |
+-------------------------------+               | +----------------------+ |   |
| coroutine_handle::resume      |               | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
| task::awaiter::await_suspend  |               | +------------------|---+ |   |
+-------------------------------+               | - task temporary --|---------'
| awaiting_coroutine$resume     |               +--------------------|-----+
+-------------------------------+                                    V
|  ....                         |               +--------------------------+
+-------------------------------+               | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        接着下一个要做的就是调用 $task$ 的析构函数,销毁 $complete_-synchronously\left(\right)$ 的协程帧,并递增 $count$ 变量,继续循环,再创建一个新的 $complete_-synchronously\left(\right)$ 协程并恢复它。
        事实上,这里会做的就是 $loop_-synchronously\left(\right)$ 和 $complete_-synchronously\left(\right)$ 返回递归调用对方,每次调用都会占用一部分栈空间,直到迭代够一定次数后,栈溢出并导致了未定义行为,导致程序立即崩溃。
        协程内编写循环这种看着没有任何递归的行为,却是很简单的导致函数越界的方式。
        那么,在这种原始协程标准设计之下,又该怎么解决这个问题呢?

协程标准解法

        好,那么这种无界递归问题该怎么避免呢?
        在上面的实现中,我们使用返回 $void$ 的 $await_-suspend\left(\right)$。协程标准中还有另外一个返回bool的 $await_-suspend\left(\right)$,返回true代表协程已暂停,控制权移交给 $resume\left(\right)$ 的调用者,返回false则协程立即恢复,不需要消耗任何额外的栈空间。
        所以,为了避免无界地循环递归,我们希望利用返回bool版本的 $await_-suspend\left(\right)$,让 $task$::$awaiter$::$await_-suspend\left(\right)$ 在任务同步结束时返回false,而不是递归地通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 恢复协程。
        实现这种解法需要两部分:

  1. 在 $task$::$awaiter$::$await_-suspend\left(\right)$ 方法,你可以调用 $.resume\left(\right)$ 来启动协程执行。然后在 $.resume\left(\right)$ 返回时,检查协程是否执行完成。如果执行完成,我们可以返回false,表示挂起的协程应当立即恢复。如果没有完成,我们可以返回true,表示将控制权移交给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用者。
  2. 在协程执行完成后调用的 $task$::$promise_-type$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法内,我们需要检查挂起的协程是否已经 ( 或者即将 ) 从 $task$::$awaiter$::$await_-suspend\left(\right)$ 调用中返回true。如果是,调用 $.resume\left(\right)$ 来恢复它。否则我们需要避免恢复协程,并且告诉 $task$::$awaiter$::$await_-suspend\left(\right)$,让它返回false

        然而,有一个更复杂的问题,就是协程可能在当前线程开始执行、暂停,并在 $.resume\left(\right)$ 调用返回前,在其他线程恢复并执行完成。因此,我们需要解决第一部分和第二部分之间可能的并发冲突。
        我们需要使用 $std$::$atomic$ 值同步。
        现在我们可以对代码进行以下修改:

class task::promise_type {
  ...

  std::coroutine_handle<> continuation;
  std::atomic<bool> ready = false;
};

bool task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  promise_type& promise = coro_.promise();
  promise.continuation = continuation;
  coro_.resume();
  return !promise.ready.exchange(true, std::memory_order_acq_rel);
}

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  promise_type& promise = h.promise();
  if (promise.ready.exchange(true, std::memory_order_acq_rel)) {
    // The coroutine did not complete synchronously, resume it here.
    promise.continuation.resume();
  }
}

        可以在 $Compiler$ $Explorer$ 上查看修改后的例子:https://godbolt.org/z/7fm8Za。注意这时当 $count$ $==$ $1'000'000$ 时,程序不再崩溃了。
        这就是 $cppcoro$::$task$<$T$> 里面实现的,规避无界递归问题的方式 ( 在某些平台也是一样 ),而且运行得很好。
        哇哦!问题解决了,吗?发布!可以吗…?

问题

        以上方法在解决递归的同时,也引入了一些问题。
        首先,它需要 $std$::$atomic$ 操作,开销可能很大。主调等待挂起的协程需要进行一次值交换,并且被调执行完成后也需要一次原子交换。如果你的程序是单线程的,那么你会在永远不需要的线程同步原子操作上花费额外的开销。
        其次,它引入了额外的分支。一个在主调,需要判断是否暂停或者立即恢复协程,另一个在被调,需要判断是否恢复下个协程或者暂停。
        注意这个额外分支的开销,甚至原子操作的开销,跟协程程序的业务逻辑相比通常不值一提。然而,协程被称为零成本抽象,并且有许多人使用协程暂停操作来避免等待L1缓存未命中问题 ( 更多细节可以查看 Gor 的 great CppCon talk on nanocoroutines )。
        最后,也可能是最重要的,它在挂起协程的恢复中引入了一些执行时不确定的值。
        假设我有以下代码:

cppcoro::static_thread_pool tp;

task foo();
{
  std::cout << "foo1 " << std::this_thread::get_id() << "\n";
  // Suspend coroutine and reschedule onto thread-poll thread.
  co_await tp.schedule();
  std::cout << "foo2 " << std::this_thread::get_id() << "\n";
}

task bar()
{
  std::cout << "bar1 " << std::this_thread::get_id() << "\n";
  co_await foo();
  std::cout << "bar2 " << std::this_thread::get_id() << "\n";
}

        在原来实现中,我们保证在 $co_-await$ $foo\left(\right)$ 执行完成后的代码会在相同的线程中内联执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 3456

        然而,通过使用原子变量,$foo\left(\right)$ 执行完成可能与 $bar\left(\right)$ 的暂停之间并发冲突,在一些情况下,这意味着 $co_-await$ $foo\left(\right)$ 之后的代码可能在 $bar\left(\right)$ 启动的线程上执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 1234

        对于许多用例来说,这种行为没有区别。然而,对于想要传递运行上下文的算法来说就有问题了。
        例如,$via\left(\right)$ 算法等待一些 $Awaitable$,然后在对应的 $scheduler$ 的运行上下文上返回。这个算法的一个简单版本如下:

template <typename Awaitable, typename Scheduler>
task<await_result_t<Awaitable>> via(Awaitable a, Scheduler s)
{
  auto result = co_await std::move(a);
  co_await s.schedule();
  co_return result;
}

task<T> get_value();
void consume(const T&);

task<void> consumer(static_thread_pool::scheduler s)
{
  T result = co_await via(get_value(), s);
  consume(result);
}

        最初版本的 $consume\left(\right)$ 调用总是保证在线程池 $s$ 上执行。然而,在使用原子变量后,$consume\left(\right)$ 可能在与 $scheduler$ $s$ 相关的线程上执行,也可能在 $consumer\left(\right)$ 开始执行的线程上执行。
        所以我们应该怎么在没有原子操作、额外分支和不确定的恢复上下文的前提下解决栈溢出问题呢?

进入“对称转移”

        Gor Nishanov ( $2018$ ) 的论文P0913R0《增加对称协程控制转移》,提出了这种问题的解法,通过提供一种允许一种不需要消耗任何额外栈空间的前提下,让一个协程暂停时对称恢复另一个协程的能力。
        论文提出了两个关键改变:

  • 允许 $await_-suspend\left(\right)$ 返回 $std$::$coroutine_-handle$<$T$>,表示执行需要对称转移给返回的句柄标识的协程。
  • 增加 $std$::$experimental$::$noop_-coroutine\left(\right)$ 函数,返回一个特别的 $std$::$coroutine_-handle$,可以作为 $await_-suspend\left(\right)$ 的返回值,暂停当前协程,从 $.resume\left(\right)$ 调用返回而不是将控制权转移给其他协程。

        所以“对称转移”是什么意思呢?
        当你调用 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 来恢复协程,$.resume\left(\right)$ 的调用者在协程恢复后仍然是活跃的。接着,协程暂停,在暂停点调用 $await_-suspend\left(\right)$ 返回void( 无条件暂停 ) 或者true ( 有条件暂停 ) 时,$.resume\left(\right)$ 的调用才会返回。
        这可以认为是一种“对称转移”,协程执行和行为就像普通函数调用一样。$.resume\left(\right)$ 的主调可以是任意函数 ( 协程或非协程 )。当协程暂停并且 $await_-suspend\left(\right)$ 返回true或者void时,控制权会返回给 $.resume\left(\right)$ 的调用者。
        每次调用 $.resume\left(\right)$ 恢复协程时,都会创建一个新的协程的栈帧。
        然而,通过“对称转移”,我们可以简单的暂停协程,并恢复另一个协程。两个协程之间不需要显式的主被调关系,一个协程暂停时,它就可以把控制权转移给其他暂停的协程 ( 包括它自己 ),并且不需要在下次暂停或者执行完成时把控制权转移给上一个协程。
        让我们看看在 $awaiter$ 使用对称转移时,编译器会怎么处理 $co_-await$ 表达式:

{
  decltype(auto) value = <expr>;
  decltype(auto) awaitble =
      get_awaitable(promise, static_cast<decltype(value)&&>(value));
  decltype(auto) awaiter =
      get_awaiter(static_cast<decltype(awaitable)&&>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::coroutine_handle<P>;

    // <suspend-coroutine>

    auto h = awaiter.await_suspend(handle_t::from_promise(p));
    h.resume();
    // <return-to-caller-or-resumer>

    // <resume-point>
  }

  return awaiter.await_resume();
}

        让我们放大与其他 $co_-await$ 行为不同的部分:

auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
// <return-to-caller-or-resumer>

        一旦协程状态机处于 $lower$ 状态 ( 另一篇文章的主题 ),<$return$-$to$-$caller$-$or$-$resumer$> 部分一般是 $return;$ 语句,这会导致上次 $.resume\left(\right)$ 的调用返回给主调。
        这意味着我们在原来 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用的函数体内,又产生了一个对同一个签名的函数的调用,即 $std$::$coroutine_-handle$::$resume\left(\right)$ ,跟着一条 $return;$ 语句。
        一些编译器在开启编译优化后,能够在条件满足的时候,把函数结尾的调用 ( 即返回之前 ) 转变为尾调用 ( $tail$-$calls$ )。
        而这种优化正是我们想要的,可以避免栈溢出问题的方式。但是与其期望优化器来决定进行尾调用转移,我们更想要确保这种优化一直有效,即使是未开启优化的情况。
        但是首先,我们先了解下尾调用的含义。

尾调用

        尾调用意味着在当前栈帧弹出前,把当前函数的返回地址修改为被调的返回地址 ( 即被调直接返回给当前函数的主调 )。
        在X86/X64架构,这通常意味着编译器生成的代码会先弹出当前栈帧,然后调用 $jmp$ 指令来跳转到被调函数入口,而不是使用 $call$ 指令,并在 $call$ 返回后才弹出当前栈帧。
        这个优化通常只会在有限情况下发生,然而:
        特别的,它需要:

  • 调用机制支持尾调用,包括主调和被调;
  • 返回类型相同;
  • 没有需要在调用返回前执行的非默认析构函数;
  • 调用不在try/catch块内。

        $co_-await$ 对称转移形式的设计使得它刚好能满足所有需求。我们一个个来看。
        调用机制:当编译器把协程 $lower$ 成状态机时,它实际上分成两部分:活跃部分 ( $ramp$,分配和初始化协程框架 ) 和主体 ( 包含用户编写的协程体的状态机 )。
        协程的函数签名 ( 以及所有用户指定的调用机制 ) 只会影响活跃函数部分,主体部分则受编译器控制,永远不会被用户代码调用,只能通过 $ramp$ 函数和 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用。
        协程体部分的调用机制是用户不可见的,完全由编译器决定,因此它可以选择一种合适的支持尾调用的机制,并被所有协程体使用。
        返回类型相同:源和目的协程的 $.resume\left(\right)$ 方法的返回值都是void,所以这个需求也能满足。
        没有非默认析构函数:当执行尾调用时,我们需要在调用目标函数前释放当前栈帧,这需要所有栈上对象的生命周期都在调用结束之前。
        一般情况下,一旦作用域内有对象的析构函数非默认,这些对象会在协程暂停的时候存活,所以它们会被分配在协程帧上,而非栈帧上。
        局部变量不需要在协程暂停时存活,可能分配在栈帧上,它们的生命周期会在协程下次暂停时结束。
        因此栈分配对象不存在需要在尾调用返回之前调用的非默认析构函数。
        调用不在try/catch块内:这个可能有点 trick,因为每个协程内都有个显式的try/catch块,包裹用户编写的协程体。
        根据规范,协程被定义为:

{
  promise_type promise;
  co_await promise.initial_suspend();
  try { F; }
  catch (...) { promise.unhandled_exception(); }
final_suspend:
  co_await promise.final_suspend();
}

        $F$ 就是用户编写的协程体部分。
        因此所有用户编写的 $co_-await$ 表达式 ( 除了 $initial$/$final_-suspend$ ) 都在try/catch块的上下文中。
        然而,实现上会把 $.resume\left(\right)$ 执行放到try/catch块的外部。
        我希望在另一篇文章讲协程怎么变成状态机的文章中讲解更多这方面的细节 ( 这篇文章已经够长了 )。

注意,然而,当前C++规范对实现这个操作需求的描述不够清晰,并且只是一个非规范注释,暗示这可能是必须的。希望我们将来能够修复这个规范。

        那么我们知道了执行对称转移的协程已经满足了所有尾调用的需求。编译器会确保它永远是尾调用,无论是否开启了优化。
        这意味着通过使用 $await_-suspend\left(\right)$ 返回的 $std$::$coroutine_-handle$,我们可以暂停当前协程,并在不消耗额外栈空间的前提下,把控制权转移给另一个协程。
        这让我们可以编写任意深度的相互递归调用代码,不需要担心栈溢出。
        这就是我们需要修复的 $task$ 的实现。

再看task

        有了“对称转移”能力,我们再来修复 $task$ 类型实现。
        我们要修改两个 $await_-suspend\left(\right)$ 方法的实现:

  • 首先当我们 $await$ $task$ 时,我们执行对称转移,来恢复任务协程。
  • 其次当任务协程完成,它执行一次对称转移,恢复挂起的协程。

        为了指明 $await$ 的方向,我们需要把 $task$::$awaiter$ 方法从:

void task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;

  // Then we resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace).
  coro_.resume();
}

        改成:

std::coroutine_handle<> task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;


  // Then we tail-resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace), by returning
  // its handle from await_suspend().
  return coro_;
}

        同时为了指明返回路径,我们需要把 $task$::$promise_-type$::$final_-awaiter$ 方法从:

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it.
  h.promise().continuation.resume();
}

        改成:

std::coroutine_handle<> task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it symmetrically.
  return h.promise().continuation;
}

        这样我们就有了一个不需要担心栈溢出问题的 $task$ 实现,并且 $await_-suspend$ 是void返回的,没有bool返回带来的不确定恢复上下文问题。

观察栈

        现在让我们看看最初的例子:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; ++i) {
    co_await completes_synchronously();
  }
}

        当其他协程 $co_-await$ $task$ 时, $loop_-synchronously\left(\right)$ 协程首次开始执行。这会触发其他协程的对称转移,并通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用恢复。
        因此当 $loop_-synchronously\left(\right)$ 开始时,栈看起来会像这样:

           Stack                                                Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        现在,当执行 $co_-await$ $completes_-synchronously\left(\right)$ ,它会对称转移到 $completes_-synchronously$ 协程。
        通过:

  • 调用 $task$::$operator$ $co_-await\left(\right)$,后者会在稍后返回 $task$::$awaiter$ 对象
  • 然后暂停并调用 $task$::$awaiter$::$await_-suspend\left(\right)$,后者稍后返回 $completes_-synchronously$ 协程的 $coroutine_-handle$。
  • 然后执行尾调用 / 跳转到 $completes_-synchronously$ 协程,在激活 $completes_-synchronously$ 栈帧前弹出 $loop_-synchronously$ 栈帧。

        在 $completes_-synchronously$ 恢复后,栈看起来会像这样:

              Stack                                          Heap
                                            .-> +--------------------------+ <-.
                                            |   | completes_synchronously  |   |
                                            |   | frame                    |   |
                                            |   | +----------------------+ |   |
                                            |   | | task::promise        | |   |
                                            |   | | - continuation --.   | |   |
                                            |   | +------------------|---+ |   |
                                            `-, +--------------------|-----+   |
                                              |                      V         |
+-------------------------------+ <-- top of  | +--------------------------+   |
| completes_synchronously$resume|     stack   | | loop_synchronously frame |   |
+-------------------------------+ active -----' | +----------------------+ |   |
| coroutine_handle::resume      | coroutine     | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
|     ...                       |               | +------------------|---+ |   |
+-------------------------------+               | task temporary     |     |   |
                                                | - coro_       -----|---------`
                                                +--------------------|-----+
                                                                     V
                                                +--------------------------+
                                                | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        注意这里栈帧数量增长了。
        在 $completes_-synchronously$ 协程完成,执行到右大括号处,它会执行 $co_-await$ $promise.final_-suspend\left(\right)$。
        这会暂停协程,并调用 $final_-awaiter$::$await_-suspend\left(\right)$ ,后者返回下个协程的 $std$::$coroutine_-handle$ ( 即指向 $loop_-synchronously$ 协程的句柄 )。这会触发一次对称转移 / 尾调用来恢复 $loop_-synchronously$ 协程。
        $loop_-synchronously$ 恢复后的栈看起来就像这样:

           Stack                                                   Heap
                                                   +--------------------------+ <-.
                                                   | completes_synchronously  |   |
                                                   | frame                    |   |
                                                   | +----------------------+ |   |
                                                   | | task::promise        | |   |
                                                   | | - continuation --.   | |   |
                                                   | +------------------|---+ |   |
                                                   +--------------------|-----+   |
                                                                        V         |
+----------------------------+  <-- top of stack   +--------------------------+   |
| loop_synchronously$resume  | active coroutine -> | loop_synchronously frame |   |
+----------------------------+                     | +----------------------+ |   |
| coroutine_handle::resume() |                     | | task::promise        | |   |
+----------------------------+                     | | - continuation --.   | |   |
|     ...                    |                     | +------------------|---+ |   |
+----------------------------+                     | task temporary     |     |   |
                                                   | - coro_       -----|---------`
                                                   +--------------------|-----+
                                                                        V
                                                   +--------------------------+
                                                   | awaiting_coroutine frame |
                                                   |                          |

        $loop_-synchronously$ 协程被恢复后,在执行到达分号后,要做的第一件事就是调用 $completes_-synchronously$ 调用返回的临时 $task$ 的析构函数。这会销毁协程帧,释放内存,这时情况会是这样:

           Stack                                                   Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        我们现在返回到 $loop_-synchronously$ 协程的执行,并且栈帧和协程帧数量与开始执行时一样,并且之后每次循环也是一样。
        因此我们可以执行许多次迭代,只消耗常数级的存储空间。
        完整的对称转移版本的 $task$ 类型可以看以下的 $Compiler$ $Explorer$ 链接:https://godbolt.org/z/9baieF

通用形式 await_suspend 的对称转移

        既然我们已经见识过了对称转移形式的 $awaitable$ $concept$ 的能力和重要性,我想向你展示一下通用形式,理论上可以替换voidbool返回形式的 $await_-suspend\left(\right)$。
        但首先,我们需要看一下P0913R0提案添加的新的协程设计:$std$::$noop_-coroutine\left(\right)$。

循环终止

        通过对称转移形式的协程,每次协程暂停,它都会对称恢复另一个协程。只要你有其他协程可以恢复,它就非常有用。但是有时我们不想要执行其他协程,只需要暂停并把控制权返回给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。
        voidbool返回形式的 $await_-suspend\left(\right)$ 都允许协程暂停并从 $std$::$coroutine_-handle$::$resmue\left(\right)$ 调用中返回,那么我们怎么让对称转移形式的协程返回呢?
        答案是使用内置的特殊 $std$::$coroutine_-handle$,称为无操作协程句柄 ( $noop$ $coroutine$ $handle$ ),通过函数 $std$::$noop_-coroutine\left(\right)$ 生成。
        无操作协程句柄之所以叫这个名字,是因为它的 $.resume\left(\right)$ 实现是立即返回,即恢复协程后没有操作。一般它的实现包含一条简单的 $ret$ 指令。
        如果 $await_-suspend\left(\right)$ 方法返回 $std$::$noop_-coroutine\left(\right)$ 句柄,那么协程不会把控制权转移给下一个协程,而是转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。

其他风格的 await_suspend() 表示方式

        有了这个信息,我们再看其他风格的 $await_-suspend\left(\right)$ 如何使用对称转移。
        我们有void返回格式的:

void my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
}

        可以改成bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return true;
}

        也可以再改成对称转移风格:

std::noop_coroutine_handle my_awaiter::await_suspend(
    std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return std::noop_coroutine();
}

        我们有bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return true to transfer execution to caller of
    // coroutine_handle::resume().
    return true;
  }

  // Operation completed asynchronously.
  // Return false to immediately resume the current coroutine.
  return false;
}

        可以改成对称转移格式的:

std::coroutine_handle<> my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return std::noop_coroutine() to transfer execution to caller of
    // coroutine_handle::resume().
    return std::noop_coroutine();
  }

  // Operation completed asynchronously.
  // Return current coroutine's handle to immediately resume
  // the current coroutine.
  return h;
}

为什么有三种风格?

        那么为什么在有对称转移风格的前提下,我们还继续使用voidbool返回风格的 $await_-suspend\left(\right)$ 呢?
        一部分历史原因,一部分实用性原因,一部分性能原因。
        $await_-suspend\left(\right)$ 返回 $std$::$noop_-coroutine_-handle$ 可以完全替代void返回版本,因为这两种对于编译器来说,都表示协程会无条件地把控制权转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。
        在我看来,它还能留下来的原因,一部分是它在对称转移被提出之前就已经在使用了,另一部分是因为void返回对于无条件暂停的情况可以少写一点代码。
        bool返回版本,在某些情况下,可以优化得比对称转移形式的更好。
        假设我们有一个bool返回的 $await_-suspend\left(\right)$,定义在另一个编译单元。这时编译器可以在挂起协程生成代码,暂停当前协程并在 $await_-suspend\left(\right)$ 调用返回之后,通过执行下一块代码的方式来有条件地恢复它。如果 $await_-suspend\left(\right)$ 返回false,那么就可以明确知道需要执行哪段代码。
        即使有了对称转移风格,我们还是需要表示相同的结果:返回主调 / 恢复者,或者恢复当前协程。相比返回true或者false,我们需要返回 $std$::$noop_-coroutine\left(\right)$ 或者当前协程的句柄。我们可以把这些句柄都统一成 $std$::$coroutine_-handle$<$void$> 返回。
        然而,因为 $await_-suspend\left(\right)$ 定义在其他编译单元,编译器看不到协程返回的句柄指向什么,所以当协程恢复后,它需要一些更重的间接调用,并且相比bool返回的单个分支而言,可能会有更多的恢复协程分支。
        有可能在将来的某一天,对称转移版本可以获得同等性能。例如,我们可以编写内联的 $await_-suspend\left(\right)$,但是调用一个bool返回的非内联方法,并有条件地返回合适的句柄。
        例如:

struct my_awaiter {
  bool await_ready();

  // Compiler should in-theory be able to optimise this to the same
  // as the bool-returning version, but currently don't do this optimisation.
  std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
    if (try_start(h)) {
      return std::noop_coroutine();
    } else {
      return h;
    }
  }

  void await_resume();

private:
  // This method is defined out-of-line in a seperate translation unit.
  bool try_start(std::coroutine_handle<> h);
}

        然而,现在的编译器 ( 比如Clang 10 ) 还不能把这种情况优化成和bool返回版本一样高效的代码。话虽如此,除非你编写了一个非常紧凑的循环,否则可能不会注意到它们的差异。
        到目前为止,通用的规则是:

  • 如果你需要无条件地返回给 $.resume\left(\right)$ 的主调,使用void返回风格。
  • 如果你需要有条件地返回给 $.resume\left(\right)$ 的主调,或者恢复当前协程,使用bool返回风格。
  • 如果你需要恢复其他协程,使用对称转移风格。

补充

        C++ 20新加的对称转移能力,使得协程递归恢复对方变得简单,不需要担心栈溢出。这个能力是编写高效、安全的异步协程类型的关键,就像 $task$ 那样。
        这篇关于对称转移的文章比预期的要长,十分感谢你坚持读完了它!希望能帮到你。
        在下篇文章,我会讲解编译器怎么把协程函数转换成状态机。

致谢

        也是不翻了~

C++协程(3):理解promise

        这篇文章是C++协程标准系列的第三篇文章。
        之前的文章可以在这里查看:

        在这篇文章我会讲解你写的代码是怎么被编译器编译成协程代码的,并且你可以通过定义自己的 $Promise$ 类型来自定义协程行为。

协程concepts

        协程标准添加了三个新的关键字:$co_-await$,$co_-yield$ 和 $co_-return$ 。无论你在函数体里使用哪个,编译器都会把这个函数编译成协程,这个函数也不再是普通函数。
        编译器使用一些相当机械的转换来把你写的代码变成状态机,从而可以在函数内的特定点暂停,并在之后恢复执行。
        在之前的文章我描述了协程标准引入的两个接口中的第一个接口:$Awaitable$ 接口。而第二个接口, $Promise$ 接口则对于这种代码转换来说十分重要。
        $Promise$ 接口规定了自定义它所在协程行为的方法。库开发者可以用它定义协程被调用时的行为,协程返回的行为 ( 包括普通方式返回或者通过未处理异常返回 ),协程内使用 $co_-await$ 或者 $co_-yield$ 表达式的行为。

Promise对象

        $Promise$ 对象通过实现协程执行过程中特定点的调用方法的形式来定义和控制对应协程的行为。

在继续之前,我希望你能忘掉之前所有关于 $promise$ 是什么的记忆。在一些用例中,协程的 $promise$ 对象与 $std$::$future$ 的 $std$::$promise$ 的功能很相似,但在其他用例中,并不能拿来相比。可能把协程的 $promise$ 对象想象成一个用于控制协程行为、跟踪协程状态的“协程状态控制器”会更合适。

        $promise$ 对象实例会跟随着每个协程函数调用时创建的协程帧一起被构造。
        编程器会在协程执行的关键点生成对 $promise$ 对象的特定方法的调用。
        在接下来的例子中,我们假设某个特定协程调用创建的 $promise$ 对象叫做 $promise$ 。
        当你编写一个协程函数体 <$body$-$statement$> ,函数体包含了某个协程关键字 ( $co_-return$,$co_-await$,$co_-yield$ ),那么协程体将转变为 ( 大致地 ) 如下形式:

{
  co_await promise.initial_suspend();
  try
  {
    <body-statement>
  }
  catch (...)
  {
    promise.unhandled_exception();
  }
FinalSuspend:
  co_await promise.final_suspend();
}

        一个协程函数被调用后,在执行之前会有一系列的准备步骤,这些步骤与常规的函数会有些不同。
        以下是一个步骤总结 ( 我会在接下来详细介绍每个步骤 ):

  1. 通过 $operator$ $new$ ( 可选的 ) 分配协程帧。
  2. 将所有函数参数拷贝到协程帧。
  3. 调用 $promise$ 对象的构造函数,$promise$ 类型记为 $P$ 。
  4. 调用 $promise.get_-return_-object\left(\right)$ 方法获取协程首次暂停时需要返回给主调的返回值,并保存为一个局部变量。
  5. $co_-await$ 调用 $promise.initial_-suspend\left(\right)$ 方法获取结果。
  6. 当 $co_-await$ $promise.initial_-suspend\left(\right)$ 表达式恢复 ( 可能立即恢复或者异步恢复 ) 后,协程开始执行你编写的协程体语句。

        当协程执行到 $co_-return$ 语句时,会进行一些额外的步骤:

  1. 调用 $promise.return_-void\left(\right)$ 或者 $promise.return_-value$(<$expr$>)。
  2. 以与创建顺序相反的顺序自动销毁所有具有自动生命周期的变量。
  3. $co_-await$ 调用 $promise.final_-suspend\left(\right)$ 获取结果。

        如果执行因为一个未处理异常停止,那么会发生:

  1. catch块中捕获异常,调用 $promise.unhandled_-exception\left(\right)$ 。
  2. $co_-await$ 调用 $promise.final_-suspend\left(\right)$ 获取结果。

        一旦执行到协程体之外,协程帧就会被销毁。通过以下步骤销毁协程帧:

  1. 调用 $promise$ 对象的析构函数。
  2. 调用拷贝的函数参数的析构函数。
  3. 调用 $operator$ $delete$ 释放协程帧的内存空间 ( 可选的 )。
  4. 返回控制权给主调/恢复者。

        当执行首次到达 $co_-await$ 表达式的 <$return$-$to$-$caller$-$resumer$> 点,或者协程没有到达任何 <$return$-$to$-$caller$-$or$-$resumer$> 点就完成时,协程要么被暂停,要么被销毁,然后之前 $promise.get_-return_-object\left(\right)$ 调用返回的结果就会被返回给协程的主调。

分配协程帧

        首先,编译器生成一个对协程帧的 $operator$ $new$ 调用来分配内存。
        如果 $promise$ 类型 $P$,定义了一个特殊的 $operator$ $new$ ,那么就会调用它定义的,否则就调用全局的。
        这里有几个重要点需要注意:
        传给 $operator$ $new$ 的大小不是 $sizeof\left(P\right)$ ,而是整个编译过程中编译器计算出来的协程帧大小,包括了入参的数量和大小、$promise$ 对象的大小、局部变量的数量和大小,以及一些其他编译器相关的用来管理协程状态的空间。
        如果满足以下条件,编译器可以省掉对 $operator$ $new$ 的调用:

  • 协程帧的生命周期严格内嵌于主调。
  • 编译器可以在调用点知道协程帧需要的内存大小。

        在这种情况下,编译器可以在主调的调用栈 ( 栈帧部分或者协程帧部分 ) 上分配协程帧的空间。
        协程标准没有定义什么情况下需要跳过分配,所以你需要在协程帧分配可能抛出 $std$::$bad_-alloc$ 异常的前提下编写代码。这意味着你不应该把一个协程函数声明成noexcept,除非你确定协程分配协程帧内存失败后会调用 $std$::$terminate\left(\right)$ 。
        然而,有一个后备方案可以处理协程帧分配失败。在不允许异常的环境下,例如集成场景或者高性能场景这种不能容忍异常开销的场景,这种方式十分必要。
        如果 $promise$ 类型提供了一个静态的 $P$::$get_-return_-object_-on_-allocation_-failure\left(\right)$ 成员函数,编译器会生成一个对 $operator$ $new$($size_-t$, $nothrow_-t$) 的重载调用。如果调用返回nullptr,那么协程会立即调用 $P$::$get_-return_-object_-on_-allocation_-failure\left(\right)$ 并把结果返回给主调,而不是抛出异常。

自定义协程帧内存分配

        你的 $promise$ 类型可以重载 $operator$ $new$ ,这样如果编译器需要分配协程帧内存,就会使用你定义的 $operator$ $new$ 而不是全局的 $operator$ $new$ 。
        例如:

struct my_promise_type
{
  void* operator new(std::size_t size)
  {
    void* ptr = my_custom_allocate(size);
    if (!ptr) throw std::bad_alloc{};
    return ptr;
  }

  void operator delete(void* ptr, std::size_t size)
  {
    my_custom_free(ptr, size);
  }

  ...
}

        我知道你想问什么:“但是自定义 $allocator$ 呢?”。
        你也可以提供 $P$::$operator$ $new\left(\right)$ 的重载,接收额外的参数,如果能找到合适的重载,将使用协程函数参数的左值引用调用。这种方式可以用来 $hook$ $operator$ $new$ ,使其调用某个作为参数传递给协程函数的 $allocator$ 的 $allocate\left(\right)$ 方法。
        你需要做一些额外工作来在分配好的内存中拷贝一份 $allocator$ 的副本,这样才能在相应的 $operator$ $delete$ 调用中引用它,因为 $allocator$ 不会作为参数传递给的 $operator$ $delete$ 。这是因为入参会被存储在协程帧,所以他们有可能会在 $operator$ $delete$ 调用之前就被销毁了。
        例如,你可以实现 $operator$ $new$ 来给协程帧分配一些额外内存,并利用这个空间来存储 $allocator$,用于释放协程帧内存。
        例如:

template <typename ALLOCATOR>
struct my_promise_type
{
  template <typename... ARGS>
  void* operator new(std::size_t sz, std::allocator_arg_t, ALLOCATOR& allocator, ARGS&... args)
  {
    // Round up sz to next multiple of ALLOCATOR alignment
    std::size_t allocatorOffset =
      (sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);
    
    // CAll onto allocator to allocate space for coroutine frame.
    void* ptr = allocator.allocate(allocatorOffset + sizeof(ALLOCATOR));


    // Take a copy of the allocator (assuming noexcept copy constructor here)
    new (((char*)ptr) + allocatorOffset) ALLOCATOR(allocator);

    return ptr;
  }

  void operator delete(void* ptr, std::size_t sz)
  {
    std::size_t allocatorOffset =
      (sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);

    ALLOCATOR& allocator = *reinterpret_cast<ALLOCATOR*>(
      ((char*)ptr) + allocatorOffset);

    // Move allocator to local variable first so it isn't freeing its
    // own memory from underneath itself.
    // Assuming allocator move-constructor is noexcept here.
    ALLOCATOR allocatorCopy = std::move(allocator);

    // But don't forget to destruct allocator object in coroutine frame
    allocator.~ALLOCATOR();

    // Finally, free the memory using the allocator.
    allocatorFactory.deallocate(ptr, allocatorOffset + sizeof(ALLOCATOR));
  }
}

        为了 $hook$ 成功,让自定义的 $my_-promise_-type$ 被协程使用,需要让 $std$::$allocator_-arg$ 作为第一个入参,这需要通过 $coroutine_-traits$ 类 ( 详细见后续的 $coroutine_-traits$ 章节 ) 来指定。
        例如:

namespace std::experimental
{
  template <typename ALLOCATOR, typename... ARGS>
  struct coroutine_traits<my_return_type, std::allocator_arg_t, ALLOCATOR, ARGS...>
  {
    using promise_type = my_promise_type<ALLOCATOR>;
  };
}

        注意,即使你自定义了协程的内存分配策略,编译器还是可以跳过你的内存分配过程。

拷贝参数到协程帧

        协程需要从原始主调中把所有传给协程的参数拷贝到协程帧,这样才能保证协程被暂停后,它们依然有效。
        如果参数是值传递的,那么这些参数将会通过移动构造函数拷贝到协程帧。
        如果参数以引用传递 ( 左值或者右值 ),那么只有引用会被拷贝到协程帧,而不是它们指向的值。
        注意如果类型的析构函数是默认的,并且在到达 <$return$-$to$-$caller$-$or$-$resumer$> 点后不再引用,编译器可以跳过析构该参数副本。
        在以引用方式传递参数给协程的过程中,存在几个陷阱,也因此你不能在协程生命周期中依赖引用。许多常见的用于普通函数的技术,例如完美转发和通用引用,用到协程上可能导致代码出现未定义行为。如果你想了解更多,Toby Allsopp 写了一篇关于这个主题的好文章
        如果某个参数的拷贝/移动构造函数抛出了异常,那么已经构造好的参数的析构函数将被调用,协程帧会被释放,异常会传播给主调。

构造 promise 对象

        一旦所有参数被拷贝到协程帧,协程会构造 $promise$ 对象。
        参数在 $promise$ 对象之前构造的原因是让 $promise$ 对象可以在构造函数中访问先前拷贝的参数。
        首先,编译器检查 $promise$ 构造函数是否存在接收每个拷贝的参数左值引用的重载函数,如果找到了这种函数,编译器会生成对这个构造函数的调用,如果没有找到,编译器会使用 $promise$ 类型的默认构造函数。
        注意 $promise$ 构造函数可以“看到”入参是一个对协程标准相对较新的改动,在 Jacksonville 2018 会议的N4723 中被采纳,提案为P0914R1。因此对于一些较老版本的Clang或者MSVC来说可能还不支持。
        如果 $promise$ 构造函数抛出了一个异常,那么入参的拷贝会析构,协程帧会在异常传播回给主调之前被释放。

获取返回对象

        协程要对 $promise$ 对象做的第一件事是调用 $promise.get_-return_-object\left(\right)$ 获取返回对象。
        返回对象是协程函数初始暂停或者运行完成移交控制权后要返回给主调的值。
        你可以认为控制流是这样的 ( 大致的 ):

// Pretend there's a compiler-generated structure called 'coroutine_frame'
// thta holds all of the state needed for the coroutines. It's constructor
// takes a copy of parameters and default-constructs a promise object.
struct coroutine_frame { ... };

T some_corouine(P param)
{
  auto* f = new coroutine_frame(std::forward<P>(param));

  auto returnObject = f->promise.get_return_object();

  // Start execution of the coroutine body by resuming it.
  // This call will return when the coroutine gets to the first.
  // suspend-point or when the coroutine runs to completion.
  coroutine_handle<decltype(f->promise)>::from_promise(f->promise).resume();

  // Then the return object is returned to the caller.
  return returnObject;
}

        注意我们需要在协程体之前获取返回对象,因为协程帧 ( 或者说 $promise$ 对象 ) 可能在 $coroutine_-handle$::$resume\left(\right)$ 调用返回前就被销毁,可能在当前线程销毁,也可能在其他线程,所以在协程体开始执行后调用 $get_-return_-object\left(\right)$ 是不安全的。

初始暂停点

        协程帧初始化,获取返回对象之后,要做的下一件事是执行 $co_-await$ $promise.initial_-suspend\left(\right);$ 。
        这让 $promise$ 类型的开发者可以控制协程在执行编写好的协程体代码之前是否需要等待,或者立即执行协程体。
        如果协程在初始暂停点暂停,那么它可以在之后你选择的某个时间点对 $coroutine_-handle$ 调用 $resume\left(\right)$ 来恢复,或者调用 $destroy\left(\right)$ 销毁。
        $co_-await$ $promise.initial_-suspend\left(\right)$ 表达式的结果会被丢弃,所以实现 $awaiter$ 的 $await_-resume\left(\right)$ 函数实现应该返回void
        注意这个语句是在try/catch块保护之外的 ( 如果你忘记了,可以往上翻翻再看下代码 )。这意味着 $co_-await$ $promise.initial_-suspend\left(\right)$ 抛出的异常等同于到达它的 <$return$-$to$-$caller$-$or$-$resumer$> 点,会在协程销毁调用栈和返回对象后,再抛回给主调。
        要明白如果你的返回对象是RAII,即会在协程帧被析构的时候销毁,那么你需要保证 $co_-await$ $promise.initial_-suspend\left(\right)$ 是noexcept的,这样才能避免 double-free。

注意这里有个提案,希望调整语义,以便 $co_-await$ $promise.initial_-suspend\left(\right)$ 表达的全部或者部分位于try/catch块内,所以这里的明确语义可能会在最终确定前发生变化。

        对于许多协程类型来说,$initial_-suspend\left(\right)$ 方法要么返回 $std$::$experimental$::$suspend_-always$ ( 如果操作懒开始 ),要么返回 $std$::$experimental$::$suspend_-never$ ( 如果操作立即开始 )。而它们都是noexcept的 $awaitable$ ,所以这通常不是问题。

返回给主调

        当协程函数达到首个 <$return$-$to$-$caller$-$or$-$resumer$> 点 ( 或者没有到达,但是协程执行完成 ) 时,通过 $get_-return_-object\left(\right)$ 获取的返回对象会返回给协程的主调。
        注意返回对象的类型并不需要与协程函数的返回类型相同。在必要的时候,返回对象可以是一个能够隐式转换为协程返回类型的类型。

注意Clang的协程实现 ( 从$5.0$ 开始 ) 会在返回对象从协程调用返回后才会转换,而MSVC自从 2017 Update 3 开始会在 $get_-return_-object\left(\right)$ 调用之后立即转换。虽然协程标准没有显式给出预期行为,我相信MSVC有计划把它们的实现改成更像Clang的实现,因为这种实现允许一些有趣的用例

使用 co_return 从协程返回

        当协程到达 $co_-return$ 语句,它会被翻译成一个 $promise.return_-void\left(\right)$ 调用或者 $promise.return_-value$(<$expr$>) 以及一个 $goto$ $FinalSuspend$ 调用。
        翻译规则如下:

  • $co_-return$;
    • -> $promise.return_-void\left(\right)$
  • $co_-return$ <$expr$>
    • -> 如果 <$expr$> 类型为void,<$expr$>; $promise.return_-void\left(\right)$;
    • -> 如果 <$expr$> 类型不为void,$promise.return_-value$(<$expr$>);

        随后的 $goto$ $FinalSuspend;$ ,会在 $co_-await$ $promise.final_-suspend\left(\right)$ 之前触发所有自动生命周期的局部变量按照与构造顺序相反的顺序析构。
        注意如果协程函数体代码没有以 $co_-return$ 语句结束,会等同于以 $co_-return;$ 结束。在这个例子中,如果 $promise$ 类型没有 $return_-void\left(\right)$ 方法,那么行为会是未定义的。
        如果 <$expr$> 或者 $promise.return_-void\left(\right)$ 调用或者 $promise.return_-value\left(\right)$ 抛出了一个异常,那么异常仍然会传给 $promise.unhandled_-exception\left(\right)$ ( 见以下 )。

处理传播到协程体以外的异常

        如果一个异常传播到协程体之外,那么异常会被catch块捕获,并调用 $promise.unhandled_-exception\left(\right)$ 方法。
        这个方法典型实现是调用 $std$::$current_-exception\left(\right)$ 来捕获异常的副本,在后续抛出到其他上下文。
        其他可选实现是立即通过 $throw;$ 语句重新抛出异常,例如 $folly$::$Optional$。然而,这么做会 ( 或者说很可能,见以下 ) 导致协程帧被立即销毁,并将异常传播回主调 / 恢复者。这可能会导致某些 $abstractions$ 出现问题,因为它们假设 / 要求对 $coroutine_-handle$::$resume\left(\right)$ 的调用是noexcept,所以你应该只在能够完全控制谁 / 什么调用 $resume\left(\right)$ 时使用这种方式。
        注意现在协程标准在调用 $unhandled_-exception\left(\right)$ 时重新抛出异常 ( 或者在try-catch块之外抛出异常 ) 的预期行为描述并不是很清晰
        我现在对标准的解释是如果控制权离开了协程体,可以通过 $co_-await$ $promise.initial_-suspend\left(\right)$ 、$promise.unhandled_-exception\left(\right)$ 或者 $co_-await$ $promise.final_-suspend\left(\right)$ 把异常传播出去,也可以通过 $co_-await$ $promise.final_-suspend\left(\right)$ 调用同步结束协程执行。无论哪种方式,都会在返回给主调 / 恢复者前自动销毁协程帧。然而,这种解释也有问题。
        我希望在未来版本中能够细化并阐述清楚这种情况。不管如何,在那之前我不会从 $initial_-suspend\left(\right)$ 、$final_-suspend\left(\right)$ 或者 $unhandled_-exception\left(\right)$ 中抛出异常。敬请关注!

最终暂停点

        一旦协程体执行离开了用户定义的部分后,结果会通过 $return_-void\left(\right)$ 、$return_-value\left(\right)$ 或者 $unhandled_-exception\left(\right)$ 调用捕获,所有局部变量被析构,在返还控制权给主调 / 恢复者之前,协程可以执行一些额外的逻辑。
        这时候协程执行的是 $co_-await$ $promise.final_-suspend\left(\right);$ 语句。
        它允许协程执行一些逻辑,比如生成结果,发出完成信号或者恢复一个任务。它也允许协程选择性地在协程完成、协程帧被销毁前立即暂停。
        注意 $resume\left(\right)$ 一个在 $final_-suspend$ 点暂停的协程是未定义行为。对于这种协程,你只能调用 $destroy\left(\right)$。
        根据 Gor Nishanov 的观点,这种限制的理由是它能减少一些协程的暂停状态,以及一些潜在的分支,让编译器可以做出更好的优化。
        注意协程也可以不在 $final_-suspend$ 点暂停,只是建议你设计一个会在这个点暂停的协程。因为这样可以强制你在协程外调用 $.destroy\left(\right)$ ( 通常通过RAII对象的析构函数调用 ),并且可以方便编译器确定协程帧的生命周期是否内嵌于主调,从更让编译器更有可能优化掉协程帧的内存分配。

编译器怎么选择 promise 类型

        让我们看看编译器怎么决定协程使用的 $promise$ 类型。
        协程 $promise$ 对象的类型通过 $std$::$experimental$::$coroutine_-traits$ 类决定。
        如果你有一个协程函数签名如下:

task<float> foo(std::string x, bool flag);

        那么编译器会通过返回类型和参数类型作为 $coroutine_-traits$ 的模板参数来推导协程的 $promise$ 类型。

typename coroutine_traits<task<float>, std::string, bool>::promise_type;

        如果函数是非静态成员函数,class类型会作为第二个模板参数传递给 $coroutine_-traits$。注意如果你的方法是右值引用重载的,那么第二个模板参数也会是右值引用。
        例如,如果你有以下方法:

task<void> my_class::method1(int x) const;
task<foo> my_class::method2() &&;

        编译器会使用以下 $promise$ 类型:

// method1 promise type
typename coroutine_traits<task<void>, const my_class&, int>::promise_type;

// method2 promise type
typename coroutine_traits<task<foo>, my_class&&>::promise_type;

        $coroutine_-traits$ 模板会默认查找返回类型是否存在内嵌 $promise$ 类型,并使用其作为 $promise$ 类型,例如这样 ( 不过通过一些SFINAE魔法,如果 $RET$::$promise_-type$ 没有定义,那么 $promise$ 类型也是未定义的 ):

namespace std::experimental
{
  template <typename RET, typename... ARGS>
  struct coroutine_traits<RET, ARGS...>
  {
    using promise_type = typename RET::promise_type;
  };
}

        所以对于你可以控制的协程返回类型,你可以在里面定义一个内嵌的 $promise$ 类型,让编译器直接使用作为协程的 $promise$ 对象。
        例如:

template <typename T>
struct task
{
  using promise_type = task_promise<T>;
  ...
};

        然而,对于你无法控制的协程返回类型,你可以在不修改类型的前提下,通过指定 $coroutine_-traits$ 来控制 $promise$ 类型。
        例如,给一个返回 $std$::$optional$<$T$> 的协程定义 $promise$ 类型:

namespace std::experimental
{
  template <typename T, typename... ARGS>
  struct coroutine_traits<std::optional<T>, ARGS...>
  {
    using promise_type = optional_promise<T>;
  };
}

识别特定协程调用栈

        当你调用一个协程函数时,协程帧会被创建。为了恢复协程或者销毁协程帧,你需要一些方法识别或者引用对应的协程帧。
        协程标准通过 $coroutine_-handle$ 类型提供了这个机制。
        类型接口 ( 大致的 ) 如下:

namespace std::experimental
{
  template <typename Promise = void>
  struct coroutine_handle;

  // Type-erased coroutine handle. Can refer to any kind of coroutine.
  // Doesn't allow access to the promise object.
  template <>
  struct coroutine_handle<void>
  {
    // Constructs to the null handle.
    constexpr coroutine_handle();

    // Convert to/from a void* for passing into C-style interop functions.
    constexpr void* address() const noexcept;
    static constexpr coroutine_handle from_address(void* addr);

    // Query if the handle is non-null.
    constexpr explicit operator bool() const noexcept;

    // Query if the coroutine is suspended at the final_suspend point.
    // Undefined behavior if coroutine is not currently suspend.
    bool done() const;

    // Resume/Destroy the suspended coroutinie
    void resume();
    void destroy();
  };

  // Coroutine handle for coroutines with a known promise type.
  // Template argument must exactly match coroutine's promise type.
  template <typename Promise>
  struct coroutine_handle : coroutine_handle<>
  {
    using coroutine_handle<>::coroutine_handle;

    static constexpr coroutine_handle from_address(void* addr);

    // Access to the coroutine's promise object.
    Promise& promise() const;

    // You can reconstruct the coroutine handle from the promise object.
    static coroutine_handle from_promise(Promise& promise);
  };
}

        你可以通过两种方式获取一个协程的 $coroutine_-handle$ :

  1. $coroutine_-handle$ 会在 $co_-await$ 表达式中被传递给 $await_-suspend\left(\right)$ 。
  2. 如果你有一个协程 $promise$ 对象的引用,你可以通过 $coroutine_-handle$<$Promise$>::$from_-promise\left(\right)$ 重新构造它的 $coroutine_-handle$ 。

        挂起协程的 $coroutine_-handle$ 会在 $co_-await$ 表达式中,协程到达 <$suspend$-$point$> 被暂停后,传给 $awaiter$ 的 $await_-suspend\left(\right)$ 方法。你可以认为这个 $coroutine_-handle$ 代表 连续传递风格 ( $continuation-passing$ $style$ ) 调用协程的连续传递。
        注意 $coroutine_-handle$ 不是RAII对象。你必须手动调用 $.destroy\left(\right)$ 来销毁协程帧,释放资源。可以把它视为一个用于管理内存的 $void*$ 值。这么设计是出于性能原因:RAII会带来额外的开销,比如需要引用计数管理。
        你应该尝试使用更高级的支持协程RAII语义的类型,例如cppcoro提供的 ( 不要脸地打广告 ),或者编写一个你自己的更高级的类型,封装你协程类型的协程帧生命周期。

自定义 co_await 行为

        $promise$ 类型可以自定义出现在协程体内的每个 $co_-await$ 表达式行为。
        通过简单地定义 $promise$ 类型的 $await_-transform\left(\right)$ 方法,编译器会把协程体里的每个 $co_-await$ <$expr$> 转换成 $co_-await$ $promise.await_-transform$(<$expr$>)。
        这里有一些重要且有用的用法:
        它能挂起一些非 $awaitable$ 类型。
        例如,一个返回 $std$::$optional$<$T$> 类型的协程可能会重载 $promise$ 类型的 $await_-transform\left(\right)$ ,使用 $std$::$optional$<$U$> 并返回一个 $awaitable$ 类型,该类型会返回类型 $U$ 的值,或者在nullopt时暂停协程。

template <typename T>
class optional_promise
{
  ...

  template <typename U>
  auto await_transform(std::optional<U>& value)
  {
    class awaiter
    {
      std::optional<U>& value;
    public:
      explicit awaiter(std::optional<U>& x) noexcept : value(x) {}
      bool await_ready() noexcept { return value.has_value(); }
      void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
      U& await_resume() noexcept { return *value; }
    };
    return awaiter{ value };
  }
}

        它可以让你通过删除 $await_-transform$ 方法来禁止挂起一个特定类型。
        例如,一个 $std$::$generator$<$T$> 返回类型的 $promise$ 类型可能会声明一个已删除的 $await_-transform\left(\right)$ 且接收所有类型的模板成员函数。这基本上禁止了所有在协程内的 $co_-await$ 调用。

template <typename T>
class generator_promise
{
  ...

  // Disable any use of co_await within this type of coroutine.
  template <typename U>
  std::experimental::suspend_never await_transform(U&&) = delete;
};

        它可以适配和改变一般 $awaitable$ 值的行为。
        例如,你可以定义一个协程类型,通过把 $awaitable$ 包装在 $resume_-on\left(\right)$ 操作中 ( 参考 $cppcoro$::$resume_-on\left(\right)$ ),保证协程始终在关联的 $executor$ 上的 $co_-await$ 表达式中恢复。

template <typename T, typename Executor>
class executor_task_promise
{
  Executor executor;

public:

  template <typename Awaitable>
  auto await_transform(Awaitable&& awaitable)
  {
    using cppcoro::resume_on;
    return resume_on(this->executor, std::forward<Awaitable>(awaitable));
  }
};

        作为 $await_-transform\left(\right)$ 介绍的最后一句话,要注意的是,如果 $promise$ 类型定义了 $await_-transform\left(\right)$ 成员,编译器就会把所有 $co_-await$ 替换成 $promise.await_-transform\left(\right)$ 调用。这意味着如果你想要只给某些类型定制 $co_-await$ 行为,你也需要重载默认的只转发参数的 $await_-transform\left(\right)$ 实现。

自定义 co_yield 行为

        最后一个你可以通过 $promise$ 类型自定义行为的是 $co_-yield$ 关键字。
        如果 $co_-yield$ 关键字在协程内出现,编译器会把表达式 $co_-yield$ <$expr$> 翻译成 $co_-await$ $promise.yield_-value$(<$expr$>) 。因此 $co_-yield$ 的行为可以通过定义一个或多个 $promise$ 类型的 $yield_-value\left(\right)$ 方法定制。
        注意,不像 $await_-transform$ ,如果 $promise$ 类型没有定义 $yield_-value\left(\right)$ 方法,$co_-yield$ 没有默认行为。所以如果需要禁止 $co_-await$ 表达式,$promise$ 类型需要显式删除 $await_-transform\left(\right)$ 方法,但是 $co_-yield$ 则不用。
        一种典型的 $promise$ 类型的 $yield_-value\left(\right)$ 方法实现是 $generator$<$T$> 类型:

template <typename T>
class generator_promise
{
  T* valuePtr;
public:
  ...

  std::experimental::suspend_always yield_value(T& value) noexcept
  {
    // Stash the address of the yielded value and then return an awaitable
    // that will cause the coroutine to suspend at the co_yield expression.
    // Execution will then return from the call to coroutine_handle<>::resume()
    // inside either generator<T>::begin() or generator<T>::iterator::operator++().
    valuePtr = std::addressof(value);
    return {};
  }
};

总结

        在这篇文章中,我依次介绍了编译器将函数编译成协程时的每个转换。
        希望这篇能够帮你理解怎么通过定义自己的 $promise$ 类型的方式来定制不同类型的协程行为。协程机制中有许多可以变动的部分,所以有许多种自定义行为的方式。
        然而,编译器有一个更重要的转换我还没讲——把协程体转换成状态机。不过要是讲这块的话,这篇文章就太长了,所以我把它放到了下一篇文章。请保持关注!

C++协程(2):理解co_await

        在之前的文章中,我介绍了函数和协程在高级表现的区别,但还没有讲到到C++协程标准的语法和语义。
        C++协程标准提供的一个关键能力是暂停协程,并在之后恢复。这个机制是通过 $co_-await$ 提供的。
        在揭开协程的神秘面纱前,我们需要理解 $co_-await$ 的工作方式,了解它是如何暂停和恢复协程的。在这篇文章中,我会解释 $co_-await$ 的机制,并介绍AwaitableAwaiter的概念。
        在这之前,作为背景,我想先简单回顾下协程标准。

协程标准带来了什么?

  • 三个关键字 $co_-await$ ,$co_-yield$ 和 $co_-return$
  • 一些 $std$::$experimental$ 命名空间的新类型:
    • $coroutine_-handle$<$P$>
    • $coroutine_-traits$<$Ts$…>
    • $suspend_-always$
    • $suspend_-never$
  • 一种允许库开发者与协程交互、客制化行为的机制
  • 一个让异步代码更容易编写的语言能力

        C++协程标准可以认为在语言层面提供了一个低级汇编语言协程。这些功能很难以一种安全的方式直接使用,主要是提供给库开发者来实现高级抽象,从而应用开发者可以安全使用。
        之后的语言标准 ( 希望是C++20 ) 计划加入这些低级功能,同时让标准库提供一些包装了这些功能的高级类型,让应用开发者以更方便安全的方式使用。

编译器 <-> 库交互

        有趣的是,协程标准并没有明确定义协程语义,包括协程如何把值返回给主调,$co_-return$ 如何处理返回值,如何处理传播到协程之外的异常,以及协程应该在哪个线程恢复。
        相反的,它指定了一种可以让库代码客制化协程行为的机制,通过实现特定的接口类型。编译器使用库代码生成对应的调用。这种方式很像range模式,后者可以让库开发者通过定义 $begin\left(\right)$ / $end\left(\right)$ 和 $iterator$ 类型,来客制化代码。
        协程的这种没有给机制定义特殊语义的方式,让它变成了一个强大的工具,允许库开发者实现各种类型的协程,以各种方式,出于各种目的。
        例如,你可以实现一个异步生成单个值的协程,或者一个 $Lazy$ 生成值序列的协程,或者一个遇到 $nullopt$ 就提前退出的,简化 $optional$<$T$> 值的协程。
        协程标准定义了两个接口:$Promise$ 和 $Awaitable$ 。
        $Promise$ 接口指定了协程客制化行为的方法。库开发者可以通过该接口,实现当协程被调用时的行为,协程返回时的行为 ( 包括正常返回和抛出未处理异常 ),协程内使用 $co_-await$ 或者 $co_-yield$ 的行为。
        $Awaitable$ 接口指定控制 $co_-await$ 语义的方法。当一个值是可以 $co_-await$ 的,代码就会被翻译成一系列的对 $awaitable$ 对象方法的调用。这个对象可以决定是否暂停当前协程,在暂停前执行一些逻辑用于后续的恢复,以及在恢复后生成 $co_-await$ 表达式的值。
        我会在之后的文章再介绍 $Promise$ ,现在先来看看 $Awaitable$ 接口。

Awaiter和 Awatiable:解释co_await

        $co_-await$ 运算符是一个新的一元运算符,接收一个值,比如:$co_-await$ $someValue$ 。
        $co_-await$ 运算符只能在协程上下文中使用。这有一点重言了,因为根据定义,包含 $co_-await$ 操作符的函数都应该被编译成协程。
        支持 $co_-await$ 运算符的类型被叫做 $Awaitable$ 类型。
        注意,$co_-await$ 运算符能否对一个类型使用,取决于 $co_-await$ 出现的协程上下文。协程使用的 $promise$ 类型可以通过 $await_-transform$ 方法 ( 之后介绍 ),决定 $co_-await$ 的语义。
        为了更精确,我喜欢使用术语一般 Awaitable ( $Normally$ $Awaitable$ ) 来描述一个协程上下文的 $promise$ 类型没有 $await_-transform$ 方法且支持 $co_-await$ 的协程。使用术语上下文 Awaitable来描述一个协程上下文的 $promise$ 类型存在 $await_-transform$ 方法的协程,这种类型仅在某些特别的上下文中才支持 $co_-await$ 运算符。
        一个Awaiter类型实现了 $co_-await$ 调用会时使用到的三个方法:$await_-ready$ ,$await_-suspend$ 和 $await_-resume$ 。
        注意我不要脸地从C# asnyc关键字中“借了” $Awaiter$ 这个术语。C#中这个术语指代可以通过 $GetAwaiter\left(\right)$ 方法返回类似于C++ $Awaiter$ 对象的类型,而C#的 $Awaiter$ 也与C++有着诡异的相似,具体可以看这篇文章
        注意一个类型既可以是 $Awaitable$ 类型,也可以是 $Awaiter$ 类型。
        当编译器看到 $co_-awaiter$<$expr$> 表达式时,根据类型的不同,可能会有多种行为。

获取 Awaiter

        编译器要做的第一件事就是生成从挂起对象上获取 $Awaiter$ 对象的代码。$N4680$ 在 $5.3.8\left(3\right)$ 小节中列出了一系列的获取 $awaiter$ 的步骤。
        假设挂起协程的 $promise$ 类型是 $P$ ,并且 $promise$ 在当前协程中是一个左值引用。
        如果 $promise$ 类型 $P$ ,存在 $await_-transform$ 方法,<$expr$> 会被传入 $promise$.$await_-transform$(<$expr$>) 调用来获取 $Awaitable$ 值,记为 $awaitable$ 。相反,如果 $promise$ 类型没有 $await_-transform$ 成员,我们会直接使用 <$expr$> 来作为 $Awaitable$ 对象,同样记为 $awaitable$ 。
        然后,如果 $Awaitable$ 对象 $awaitable$ 重载了 $operator$ $co_-awaiter\left(\right)$ ,那么这个函数就会被调用,来获取 $Awaiter$ 对象。否则,$awaitable$ 会被直接作为 $awaiter$ 对象使用。
        如果我们把这些规则使用函数 $get_-awaitable\left(\right)$ 和 $get_-awaiter\left(\right)$ 来编码,看起来就是这样:

template <typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
  if constexpr (has_any_await_transform_member_v<P>)
    return promise.await_transform(static_cast<T&&>(expr));
  else
    return static_cast<T&&>(expr);
}

template <typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
  if constexpr (has_member_operator_co_await_v<Awaitable>)
    return static_cast<Awaitable&&>(awaitable).operator co_await();
  else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
    return operator co_await(static_cast<Awaitable&&>(awaitable));
  else
    return static_cast<Awaitable&&>(awaitable);
}

挂起 Awaiter

        假设我们把逻辑封装成上面那种获取 $Awaiter$ 对象的函数,那么 $co_-await$<$expr$> 可以简单翻译成:

{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>)(awaitable);
  if (!awaiter.await_ready())
  {
    using handle_t = std::experimental::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(p)));
    
    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(p));
      <return-to-caller-or-resumer>
    }
    else
    {
      static_assert(
          std::is_same_v<await_suspend_result_t, bool>,
          "await_suspend() must reutrn 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(p)))
      {
        <return-to-caller-or-resumer>
      }
    }

    <resume-point>
  }

  return awaiter.await_resume();
}

        void版本的 $await_-suspend\left(\right)$ 无条件地在调用返回后交还控制权,而bool版本的则允许 $awaiter$ 对象有条件地在返回后恢复协程执行。
        bool版本的 $await_-suspend\left(\right)$ 在 $awaiter$ 发起一个需要异步完成的操作时十分有用。在这种情况下,当任务异步完成后,$await_-suspend$ 可以返回false,表示协程应该立即恢复并继续执行。
        在 <$suspend$-$coroutine$> ( 暂停协程 ) 处,编译器会生成一些代码来保存协程状态,并准备好被恢复。这些状态包括存储恢复点的位置,以及将一些寄存器值保存到当前协程帧内存中。
        当前协程在 <$suspend$-$coroutine\left(\right)$> 操作完成后,就被认为已暂停。协程是在 $await_-suspend$ 调用内部被暂停的,一旦被暂停,就可以在之后被恢复或者销毁。
        一旦 $await_-suspend\left(\right)$ 操作完成,就要在之后对这个协程进行恢复 ( 或者销毁 )。注意 $await_-suspend\left(\right)$ 返回false相当于在当前线程中立即恢复协程。
        $await_-ready\left(\right)$ 方法可以让你避免 <$suspend$-$coroutine$> 带来的开销,因为它可以返回操作是否可以在不需要暂停的前提下同步完成。
        在 <$return$-$to$-$caller$-$or$-$resumer$> 点,控制权会返还给主调或者恢复者,并弹出协程调用栈帧,保留协程栈帧。
        当 ( 或者 ) 执行到 <$resume$-$point$> ( 恢复点 ),暂停的协程将被恢复。例如,在通过 $await_-resume\left(\right)$ 方法获取结果之前。
        $await_-resume\left(\right)$ 方法的返回值会作为 $co_-await$ 表达式的结果。$await_-resume\left(\right)$ 方法也可以抛出一个异常,并将其传播到 $co_-await$ 表达式之外。
        注意如果一个异常传播到 $await_-suspend\left(\right)$ 调用之外,协程将在没有 $await_-resume\left(\right)$ 调用的情况下被自动恢复,并将异常传播到 $co_-await$ 表达式之外。

协程句柄

        你可能注意到了,$coroutine_-handle$<$P$> 类型会被作为参数,在 $co_-await$ 表达式中传给 $await_-suspend\left(\right)$ 调用。
        这个类型代表一个无主的协程帧句柄,可以用来恢复协程执行,或者销毁协程帧。它可以用来获取协程的 $promise$ 对象。
        $coroutine_-handle$ 类型有着如下 (省略了的) 接口:

namespace std::experimental
{
  template <typename Promise>
  struct coroutine_handle;

  template <>
  struct coroutine_handle<void>
  {
    bool done() const;

    void resume();
    void destroy();

    void* address() const;
    static coroutine_handle from_address(void* address);
  };

  template <typename Promise>
  struct coroutine_handle : coroutine_handle<void>
  {
    Promise& promise() const;

    static coroutine_handle from_promise(Promise& promise);

    static coroutine_handle from_address(void* address);
  };
}

        实现一个 $Awaitable$ 类型的时候,与 $coroutine_-handle$ 相关的关键方法是 $.resume\left(\right)$,它会在在操作完成并且想要恢复挂起中的协程执行的时候被调用。对 $coroutine_-handle$ 调用 $.resume\left(\right)$ 会在 <$resume$-$point$> 重新激活一个暂停的协程,在到达下一个 <$return$-$to$-$caller$-$or$-$resumer$> 点处返回。
        $.destroy\left(\right)$ 方法摧毁协程帧,调用作用域内的变量析构器,释放协程帧使用的内存空间。一般情况下,你不需要主动 ( 事实上真的需要避免 ) 调用 $.destroy\left(\right)$,除非你是一个正在实现协程 $promise$ 类型的库开发者。通常,协程帧会被某些协程返回的RAII类型所持有。所以再调用 $.destroy\left(\right)$ 可能导致一个双重析构的bug。
        $.promise\left(\right)$ 方法返回一个协程 $promise$ 对象的引用。然后,就像 $.destroy\left(\right)$,一般只会在需要编写 $promise$ 类型时有用。你应该把 $promise$ 对象视为协程的一个内部细节实现。对于大部分一般 $Awaitable$ 类型,你应该使用 $coroutine_-handle$<$void$> 而不是 $coroutine_-handle$<$Promise$> 作为 $await_-suspend\left(\right)$ 方法的参数。
        $coroutine_-handle$<$P$>::$from_-promise$($P$& $promise$) 函数允许从协程 $promise$ 对象引用中重新构造协程。注意你必须保证类型 $P$ 与某个正在使用的协程帧匹配,如果 $P$ 的类型继承了 $promise$ 类型,并用于构造 $coroutine_-handle$<$Base$> ,会导致一些未定义行为。
        $.address\left(\right)$ / $from_-address\left(\right)$ 函数允许把一个协程句柄转换成 $void*$ 指针,或者从一个 $void*$ 指针转换协程句柄。

不需要同步的异步代码

        $co_-await$ 的一个十分有用的设计是协程可以在被暂停后,把控制权移交给主调 / 恢复者前执行代码。
        这可以让 $Awaiter$ 对象在被暂停后发起一个异步操作,把被暂停协程的 $coroutine_-handle$ 传给该操作,并在不需要任何额外同步的前提下安全地恢复操作 ( 可能在其他线程上 )。
        例如,在 $await_-suspend\left(\right)$ 调用中,发起一个异步读操作,当协程被暂停,意味着我们可以在读操作完成后,恢复协程的执行。这一步不需要任何线程同步操作,不需要协调发起线程和执行线程的关系。

Time     Thread 1                           Thread 2
  |      --------                           --------
  |      ....                               Call OS - Wait for I/O event
  |      Call await_ready()                    |
  |      <supend-point>                        |
  |      Call await_suspend(handle)            |
  |        Store handle in operation           |
  V        Start AsyncFileRead ---+            V
                                  +----->   <AsyncFileRead Completion Event>
                                            Load coroutine_handle from operation
                                            Call handle.resume()
                                              <resume-point>
                                              Call to await_resume()
                                              execution continues....
           Call to AsyncFileRead returns
         Call to await_suspend() returns
         <return-to-caller/resumer>

        当使用这种方便的方式时,你需要小心的一点是,当把协程句柄发给其他线程时,其他线程可能在 $await_-suspend\left(\right)$ 调用返回前就尝试恢复协程执行,导致在 $await_-suspend\left(\right)$ 未完成时,协程继续执行了。
        当协程恢复后,做的第一件事是调用 $await_-resume\left(\right)$ 获取结果,然后通常立马析构 $Awaiter$ 对象 ( 例如,$await_-suspend\left(\right)$ 调用的this指针 )。然后协程可能执行直到完成,在 $await_-suspend\left(\right)$ 返回前销毁协程和 $promise$ 对象。
        所以在 $await_-suspend\left(\right)$ 方法,一旦协程有可能被另一个线程并行地恢复,你就需要避免访问this或者协程的 $.promise\left(\right)$ 对象,因为两者都可能被摧毁。总的来说,当暂停操作从发起到完成期间,只有局部变量是安全的。

与有栈协程相比

        我想换个话题,快速地与现有的其他有栈协程比较下无栈协程在暂停后执行逻辑的能力,比如Win32 fibers或者boost::context
        对于许多有栈协程来说,暂停和恢复操作被合并成了“上下文切换”操作。通过“上下文切换”操作,当前协程在暂停后没有机会去执行逻辑,只是单纯地将控制权移交给其他协程。
        这意味着,如果我们想要基于有栈协程实现一个类似于异步文件读取操作,我们需要在暂停协程之前发起操作。因此有可能操作会在线程被暂停之前就在其他线程上完成了,并等待恢复。这种潜在的其他线程完成操作和协程暂停之间的竞态,需要一些线程同步机制来选择并决定胜者。
        一种可选的方式是使用内嵌上下文 ( $tranpoline$ $context$ ),它可以通过暂停发起操作的上下文来表示发起某个操作。然而,这可能需要额外的架构,和另外的上下文切换来保证正常使用,并且成本可能大于它试图避免的同步操作的成本。

减少内存分配

        异步操作经常需要给每个操作分配状态,用于跟踪操作执行阶段。这些状态需要在操作执行过程中保留,直到操作完成才会被释放。
        例如,调用异步Win32 I/O函数需要你分配并传递一个 $OVERLAPPED$ 结构指针。主调需要保证指针在操作完成前都是有效的。
        典型的基于回调的API要求这些状态分配在堆上,并保证它们有着合适的生命周期。如果你正在执行很多操作,你可能需要给每个操作都分配和释放状态。如果这导致性能问题,你可能需要通过内存池来分配这些状态。
        然而,当使用协程时,利用局部变量在协程帧内的特性,我们可以避免堆分配存储,因为协程帧内的变量在协程被暂停时也是存活的。
        通过把 $co_-await$ 表达式中每个操作状态放到 $Awaiter$ 对象中,我们可以有效的从协程帧中“借用”内存。一旦操作完成,协程被恢复,$Awaiter$ 对象被摧毁,释放协程帧中其他局部变量使用的内存。
        最终,协程帧还是分配在堆上。然而,只需要一次堆分配,协程帧就可以被用来执行许多异步操作。
        你想一想,协程帧其实就是arena内存分配器的一种高级表现。编译器计算出所有局部变量需要的arena大小,然后零成本地分配所有局部变量!试着用传统的分配器来战胜它;)

例子:实现一个单线程同步原语

        现在我们已经讲了许多 $co_-await$ 运算符的机制,我想实现一个基本的 $awaitable$ 同步原语:一个异步手动重置 $event$,来把这些知识运用到实践上。
        这个 $event$ 的基本需求是对许多并行执行的协程来说是 $Awaitble$ 的,挂起操作会等暂停挂起协程,直到一个线程调用 $.set\left(\right)$ 方法,此时恢复所有挂起的协程。如果一个线程已经调用了 $.set\left(\right)$,协程应该在不暂停的前提下继续执行。
        理想情况下,我想要让方法noexcept,这需要避免堆分配,以及无锁实现。
        2017/11/23 编辑:增加 async_manual_reset_event 的用例
        用例看起来像是这样:

T value;
async_manual_reset_event event;

// A single call to produce a value
void producer()
{
  value = some_long_running_computation();

  // Publish the value by setting the event.
  event.set();
}


// Supports multiple concurrent consumers
task<> consumer()
{
  // Wait until the event is signalled by call to event.set()
  // in the producer() function
  co_await event;

  // Now it's safe to consume 'value'
  // This is guaranteed to 'happen after' assignment to 'value'
  std::cout << value << std::endl;
}

        让我们先想想这个 $event$ 可能的状态:$not$ $set$ 和 $set$ 。
        当处于 $not$ $set$ 状态,存在一个等待 $set$ 状态挂起协程的列表 ( 可能为空 )。
        当设置 $set$ 状态,就不应该存在挂起的协程,因为正在 $co_-await$ 这个 $event$ 的协程可以不暂停地继续执行。
        这个状态可以通过一个 $std$::$atomic$<$void*$> 表示:

  • 保存一个特殊指针,指向 $set$ 状态。我们使用 $event$ 的this指针来表示,因为它不可能跟列表中其他对象的地址相同。
  • 否则,$event$ 处于 $not$ $set$ 状态,指针值是一个单独的挂起协程的链表头。

        我们可以通过把状态存储在协程帧的 $awaiter$ 对象的方式,避免在堆上对节点进行额外分配。
        让我们实现一个类似如下的类接口:

class async_manual_reset_event
{
public:
  async_manual_reset_event(bool initiallySet = false) noexcept;

  // No copying/moving
  async_manual_reset_event(const asnyc_manual_reset_event&) = delete;
  async_manual_reset_event(async_manual_reset_event&&) = delete;
  async_manual_reset_event& operator=(const asnyc_manual_reset_event&) = delete;
  async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

  bool is_set() const noexcept;

  struct awaiter;
  awaiter operator co_await() const noexcept;

  void set() noexcept;
  void reset() noexcept;

private:

  friend struct awaiter;

  // - 'this' => set state
  // - otherwise => not set, head of linked list of awaiter*.
  mutable std::atomic<void*> m_state;
};

        这里我们给出了一个相当简单直接的接口。主要要注意的点是 $operator$ $co_-await\left(\right)$ 返回一个未定义的 $awaiter$ 类型。
        让我们先实现 $awaiter$ 。

实现 Awaiter

        首先,需要知道正在等待哪个 $async_-manual_-reset_-event$ ,所以需要一个 $event$ 引用和一个初始化它的构造器。
        它同样需要作为 $awaiter$ 链表的节点,所以也需要保存下一个 $awaiter$ 对象的指针。
        它还需要保存正在执行 $co_-await$ 的挂起协程的 $coroutine_-handle$ ,用来在 $event$ 被 $set$ 之后恢复协程。我们不关心协程的 $promise$ 类型,所以只需要使用 $coroutine_-handle$<> ( $coroutine_-handle$<$void$> 的缩写 )。
        最后,它需要实现 $Awaiter$ 接口,所以需要三个特殊方法 $await_-ready$ ,$await_-suspend$ 和 $await_-resume$ 。我们不需要 $co_-await$ 返回值,所以 $await_-resume$ 可以返回 $void$ 。
        一旦我们把所有东西放到一起,基本的 $awaiter$ 接口看起来就像这样:

struct async_manual_reset_event::awaiter
{
  awaiter(const async_manual_reset_event& event) noexcept
  : m_event(event)
  {}

  bool await_ready() const noexcept;
  bool await_suspend(std::experimental::coroutine_handle<> awaitinigCoroutine) noexcept;
  void await_resume() noexcept {};

private:
  friend class async_manual_reset_event;

  const async_manual_reset_event& m_event;
  std::experimental::coroutine_handle<> m_awaitingCoroutine;
  awaiter* m_next;
};

        当 $co_-await$ 一个 $event$ 时,我们不想让挂起协程在 $event$ 被 $set$ 的时候暂停。所以我们让 $await_-ready\left(\right)$ 在 $event$ 已经 $set$ 的时候返回true

bool async_manual_reset_event::awaiter::await_ready() const noexcept
{
  return m_event.is_set();
}

        接下来,让我们看看 $await_-suspend\left(\right)$ 方法,许多 $awaitable$ 类型的有趣行为都发生在这里。
        首先它需要在 $m_-awaitingCoroutine$ 成员中保存协程句柄,用于后续调用 $.resume\left(\right)$ 。
        保存句柄之后,需要把 $awaiter$ 自动地加入链表中。成功入队之后,如果 $event$ 还没有被设置成 $set$ 状态,返回true表示我们不希望立即恢复协程,否则返回false,表示协程应该立马恢复。

bool async_manual_reset_event::awaiter:await_suspend(
  std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
  // Special m_state value that indicates the event is in the 'set' state.
  const void* const setState = &m_event;

  // Remember the handle of the awaiting coroutine.
  m_awaitingCoroutine = awaitingCoroutine;

  // Try to atomically push this awaiter onto the front of the list.
  void* oldValue = m_event.m_state.load(std::memory_order_acquire);
  do
  {
    // Resume immediately if already in 'set' state.
    if (oldValue == setState) return false;

    // Upload linked list to point at current head.
    m_next = static_cast<awaiter*>(oldValue);

    // Finally, try to swap the old list head, inserting this awaiter
    // as the new list head.
  } while (!m_event.m_state.compare_exchange_weak(
            oldValue,
            this,
            std::memory_order_release,
            std::memory_order_acquire));
  
  // Successfully enqueued. Remain suspend.
  return true;
}

        注意我们使用 $acquire$ 内存序读取旧状态,这样我们就可以看到所有发生在 $set\left(\right)$ 调用之前的写操作了。
        我们需要用 $release$ 内存序调用 $compare$-$exchange$ ,这样之后的 $set\left(\right)$ 就可以看到我们写入的 $m_-awaitingCoroutine$ 和更早写入的协程状态。

完成 event 类的剩余实现

        我们已经定义好了 $awaiter$ 类型,再看回来 $async_-manual_-reset_-event$ 的方法。
        首先是构造函数,它需要使用空的链表 ( 比如nullptr) 初始化为 $not$ $set$ 状态,或者初始化为 $set$ 状态 ( 比如this )。

async_manual_reset_event::async_manual_reset_event(
  bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr)
{}

        接下来,$is_-set\left(\right)$ 方法更直接,如果持有this指针,那么就是 $set$ 状态。

bool async_manual_reset_event::is_set() const noexcept
{
  return m_state.load(std::memory_order_acquire) == this;
}

        接着是 $reset\left(\right)$ 方法,如果当前是 $set$ 状态,我们需要重置为空链表的 $not$ $set$ 状态,否则不做任何事:

void async_manual_reset_event::reset() noexcept
{
  void* oldValue = this;
  m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}

        通过 $set\left(\right)$ 方法,我们可以用特殊的this指针与当前值交换,从而设置状态为 $set$。如果存在挂起的协程,我们需要在返回前依次恢复。

void async_manual_reset_event::set() noexcept
{
  // Needs to be 'release' so that subsequent 'co_await' has
  // visibility of our prior writes.
  // Needs to be 'acquire' so that we have visibility of prior
  // writes by awaiting coroutines.
  void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
  if (oldValue != this)
  {
    // Wasn't already in 'set' state.
    // Treat old value as head of a linked-list of waiters
    // which we have now acquired and need to resume.
    auto* waiters = static_cast<awaiter*>(oldValue);
    while (waiters != nullptr)
    {
      // Read m_next before resuming the coroutines as resuming
      // the coroutine will likely destroy the awaiter object.
      auto* next = waiters->m_next;
      waiters->m_awaitingCoroutine.resume();
      waiters = next;
    }
  }
}

        最后我们需要实现 $operator$ $co_-await\left(\right)$ 方法,只需要构造一个 $awaiter$ 对象。

async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{
  return awaiter{ *this };
}

        这样我们就完成了一个无锁,无额外内存分配,noexcept实现的 $awaitable$ 的异步手动-重置 $event$ 。
        如果你想要试试这段代码,看看它在MSVCClang下的编译结果,可以看一下godbolt上的源码
        你也可以在cppcoro库 上看到这段实现,以及更多的其他的有用的 $awaitable$ 类型,例如 $async_-mutex$ 和 $async_-auto_-reset_-event$ 。

写在结尾

        这篇文章介绍了 $operator$ $co_-await$ 是怎么通过 $Awaitable$ 和 $Awaiter$ 这两个concept实现的。
        同样也讲了怎么实现一个 $awaitable$ 的异步线程同步原语,使用了 $awaiter$ 对象在协程帧上分配的优点,避免了额外的堆分配。
        我希望这篇文章可以帮助你理解新的 $co_-await$ 运算符。
        在下一篇文章中,我将介绍 $Promise$ concept,以及一个协程类型的开发者可以怎样设计协程的行为。

致谢

        这段就不翻了吧~

C++协程(1):协程理论

        这是C++ Coroutines标准 ( C++ Coroutines TS ) 系列的第一篇文章,协程作为新技术被引入C++20标准。
        在这个系列里我会介绍C++协程的底层机制,以及如何使用它们实现一些更高级的抽象,就像 cppcoro 库一样。
        在这篇文章,我会介绍函数和协程的区别,以及相应的一些他们所支持的行为的理论。文章的目标是介绍一些基础概念,帮助你理解C++协程。

协程是函数,函数是协程

        协程是一种允许暂停 ( $suspend$ ) 和恢复 ( $resume$ ) 的函数的统称。
        在解释这个含义之前,我们先复习一下一个“普通”C++函数的行为是什么。

“普通”函数

        一个普通函数具有两种行为:调用 ( $call$ ) 和返回 ( $return$ , 注意这里返回涵盖了抛出异常 )。
        调用创建一个调用栈 ( $activation$ $frame$ ),暂停主调函数,执行被调函数的第一条命令。
        返回会把返回值传回给主调,销毁调用栈,然后在主调函数的调用点处,恢复主调函数的执行。
        让我们再分析一下这些语义……

调用栈

        什么是“调用栈”?
        你可以认为调用栈是一块存储着当前函数调用的内存。这些状态包括了入参和局部变量。
        对于“普通”函数,调用栈也包含返回地址——函数返回时将跳转到的指令,和主调函数的调用栈地址。你可以认为这些信息是用来保持函数继续调用的,比如,它们描述了函数调用结束后,该继续执行哪个函数。
        对于“普通”函数,所有的调用栈都有着严格嵌套的生命周期。严格嵌套可以更有效的分配和释放每个函数调用的内存。这种数据结构也被称为“栈”。
        调用栈分配在栈上,也被称为“栈帧”。
        栈是一种十分常见的数据结构,大部分CPU架构都有个专门的寄存器来保存栈顶地址 ( 例如,X64rsp寄存器 )。
        分配一个调用栈,只需要让寄存器加上栈帧大小的值。同样的,释放一个调用栈,也只需要让寄存器减去栈帧大小。

调用

        当一个函数调用另一个函数,主调必须暂停自己。
        “暂停”一般指保存当前CPU寄存器中的值,并在之后恢复的时候重新设置这些值。保存寄存器这一步,可以是主调执行,也可以是被调执行,取决于不同的调用方式,但你可以认为它们都是调用的其中一部分。
        调用也会把入参在新的调用栈里保存一份,让被调可以访问。
        最后,主调在新调用栈里面写入恢复点的地址,并把执行权让出给被调函数。
        在X86/X64架构,最后一步有单独对应的 $call$ 指令,会把当前指令的下一条指令地址写入栈,递增栈寄存器,然后跳转到指令操作地址。

返回

        当一个函数通过 $return$ 返回,函数会先把返回值保存到主调可以访问的地方,可以在主调调用栈,也可以在当前调用栈 ( 对于跨越两个调用栈的参数和返回值,这种概念可能没有明确的区别 )。
        然后函数通过以下步骤销毁调用栈:

  • 在返回点销毁所有局部变量
  • 销毁所有入参对象
  • 释放调用占内存

        最后,通过以下步骤恢复主调执行:

  • 栈寄存器指向当前调用存储的主调调用栈地址,恢复所有当前函数可能修改的CPU寄存器
  • 跳转到之前“调用”操作存储的恢复点

        注意,与“调用”操作一样,一些“返回”操作中主调和被调的细分职责可能不一样。

协程

        协程是一种在函数的调用返回操作的基础上细分出暂停恢复销毁三种额外操作的操作行为统称。
        暂停 ( $Suspend$ ) 操作让协程在当前函数的执行点处暂停,在不销毁调用栈的前提下,将控制权交还给主调函数的操作。协程执行过程中的所有对象在暂停之后依然存活。
        注意,就像函数的返回操作一样,协程需要在预先定义好的暂停点处主动暂停。
        恢复操作将在先前的暂停点处恢复协程的执行,这将会重新激活协程的调用栈。
        销毁会在不恢复协程的前提下,销毁协程的调用栈,暂停点作用域内的所有对象也会被一并销毁。

协程调用栈

        因为协程可以在不销毁调用栈的前提下被暂停,我们也就无法保证调用栈之间是严格嵌套的了。这意味着,调用栈不再能使用栈结构分配,取而代之的,是更多的堆存储。
        C++协程标准中有些规定,允许在主调的调用栈上分配协程调用栈,只要编译器能保证协程的生命周期与主调严格嵌套。如果编译器足够智能,这种方式可以在一定程度上减少堆分配。
        协程调用栈有一部分需要在协程暂停时保留,而另一个部分只会在协程执行时被用到。例如,不在协程暂停点上的变量,这些变量可以在栈上存储。
        你可以认为协程调用栈由两部分组成:“协程帧”和“栈帧”。
        “协程帧”指代调用栈中需要在协程暂停期间保留的部分,“栈帧”则指只需要在协程执行期间存在的部分,负责在协程暂停时将控制权移交给主调或者协程的恢复者 ( $resumer$ ) 并释放。

暂停

        暂停允许一个协程在函数执行的中间点暂停执行,并将控制权移交给主调 / 协程的恢复者。
        C++协程标准定义协程内有几个指定暂停点,它们会使用 $co_-await$ 或者 $co_-yield$ 关键字标识。
        当一个协程执行到暂停点时,会通过以下几步来暂停:

  • 确保寄存器值已写入协程帧
  • 将暂停点写入协程帧,用于后续恢复操作恢复执行,或者销毁操作销毁暂停点前的变量

        一旦协程准备好被恢复,就可以认为“已暂停”。
        协程在把控制权交还给主调 / 恢复者之前,有机会执行一些额外的逻辑,返回当前协程帧的句柄,用于后续恢复或者销毁。
        在暂停后允许执行额外逻辑的能力,让协程可以在不同步的前提下恢复。否则因为暂停和恢复的竞态,这种操作可能需要同步执行。我将在后续的文章中详细讨论。
        协程可以选择立即恢复 / 继续协程执行,或者可以选择移交控制权给主调 / 恢复者。
        如果控制权被移交给主调 / 恢复者,协程调用栈的栈帧部分将被释放。

恢复

        恢复可以对一个处于“暂停”状态的协程使用。
        当一个函数恢复一个协程时,它需要快速地跳到对应函数执行的中间点。恢复者通过调用 $void$ $resume()$ 来在暂停返回的协程帧上,找到对应的指令。
        就像普通函数调用,$resume()$ 会分配一个新的调用栈,并在移交控制权之前保存主调调用栈的返回地址。
        然而,并非将移交控制权到被调函数的开始,而是从协程帧中读取最后的暂停点,并跳转到那里。
        当协程下一次暂停或者完成调用,$resume()$ 调用将会返回,恢复主调执行。

销毁

        销毁操作可以在不恢复协程执行的前提下销毁协程帧。
        这个操作只能对已暂停的协程使用。
        销毁操作就像恢复操作那样,重新激活协程调用栈,包括分配一个新的栈帧,保存主调返回地址。
        然而,并非将控制权移交到上个暂停点,而是移交到一个可选的代码路径上,调用协程暂停点之前作用域内所有局部变量的析构器,然后释放协程帧的内存。
        类似于恢复操作,销毁需要对协程暂停时返回的句柄调用 $void$ $destroy()$ 函数。

协程调用

        协程调用几乎跟普通函数调用一样,事实上,对于主调来说,它们之间毫无区别。
        然而,不像函数调用必须执行完才会返回,协程调用可以在到达暂停点处返回,并且主调可以在后续恢复。
        对协程执行调用,主调会分配一个新的栈帧,把入参、返回地址写入栈帧,移交控制权。这些步骤和普通函数调用一样。
        协程要做的第一件事是在堆上分配一个协程帧,并把入参从栈帧拷贝到协程帧,保证它们的生命周期和协程一样。

协程返回

        协程返回与普通函数有一点不同。
        当一个协程执行 $return$ 语句 ( 标准里是 $co_-return$ 语句 ) 时,他会把返回值存储到某个地方 ( 可以被协程修改的地方 ),接着销毁作用域内的局部变量 ( 不包括入参 )。
        然后协程有机会在移交控制权之前,执行一些额外的逻辑。
        额外的逻辑可能会返回值,或者恢复另一个等待返回值的协程。这些逻辑时完全客制化的。
        协程然后执行暂停 ( 协程帧继续存活 ) 或者销毁 ( 协程帧销毁 ) 操作。
        控制权在暂停 / 销毁操作后被移交给主调 / 恢复者,最后弹出栈帧。
        需要注意的是,返回操作的返回值与调用操作的返回值不一样,因为返回可能是在初始调用之后很长时间后才执行的。

例子

        为了以图像形式表达这些概念,我会通过一个简单的示例来演示协程暂停,并在之后恢复过程中的情况。
        假设函数 ( 或者协程 ) $f()$ 调用一个协程 $x(int\ \ a)$ 。
        调用前大概是这样的:

STACK                     REGISTERS               HEAP

                          +------+
+---------------+ <------ | rsp  |
|  f()          |         +------+
+---------------+
| ...           |
|               |

        然后调用 $x(42)$ ,创建一个栈帧,就像普通函数那样。

STACK                     REGISTERS               HEAP
+----------------+ <-+
|  x()           |   |
| a  = 42        |   |
| ret= f()+0x123 |   |    +------+
+----------------+   +--- | rsp  |
|  f()           |        +------+
+----------------+
| ...            |
|                |

        然后,一旦协程 $x()$ 在堆上分配协程帧,并且将入参拷贝到协程帧,我们就得到了下面的图。注意编译器一般使用一个单独的寄存器存储协程帧地址 ( 例如MSVC使用rbp寄存器 )。

STACK                     REGISTERS               HEAP
+----------------+ <-+
|  x()           |   |
| a  = 42        |   |                   +-->  +-----------+
| ret= f()+0x123 |   |    +------+       |     |  x()      |
+----------------+   +--- | rsp  |       |     | a =  42   |
|  f()           |        +------+       |     +-----------+
+----------------+        | rbp  | ------+
| ...            |        +------+
|                |

        如果协程 $x()$ 之后调用另一个普通函数 $g()$ ,就会像这样:

STACK                     REGISTERS               HEAP
+----------------+ <-+
|  g()           |   |
| ret= x()+0x45  |   |
+----------------+   |
|  x()           |   |
| coroframe      | --|-------------------+
| a  = 42        |   |                   +-->  +-----------+
| ret= f()+0x123 |   |    +------+             |  x()      |
+----------------+   +--- | rsp  |             | a =  42   |
|  f()           |        +------+             +-----------+
+----------------+        | rbp  |
| ...            |        +------+
|                |

        当 $g()$ 返回,他会销毁自己的调用栈,然后恢复 $x()$ 的调用栈。假设 $g()$ 的返回值存储在协程帧的局部变量 $b$ 中。

STACK                     REGISTERS               HEAP
+----------------+ <-+
|  x()           |   |
| a  = 42        |   |                   +-->  +-----------+
| ret= f()+0x123 |   |    +------+       |     |  x()      |
+----------------+   +--- | rsp  |       |     | a =  42   |
|  f()           |        +------+       |     | b = 789   |
+----------------+        | rbp  | ------+     +-----------+
| ...            |        +------+
|                |

        如果 $x()$ 执行到暂停点,并暂停执行,他会把控制权返回给 $f()$ 。
        这导致 $x()$ 的栈帧会弹出,但是协程帧会保留。当协程被暂停,返回值返回给主调。返回值通常是协程句柄,指向堆上已暂停的协程帧,可以用于后续的恢复。当 $x()$ 暂停后,协程帧中会存储恢复点 ( 记为 $RP$ )。

STACK                     REGISTERS               HEAP
                                        +----> +-----------+
                          +------+      |      |  x()      |
+----------------+ <----- | rsp  |      |      | a =  42   |
|  f()           |        +------+      |      | b = 789   |
| handle     ----|---+    | rbp  |      |      | RP=x()+99 |
| ...            |   |    +------+      |      +-----------+
|                |   |                  |
|                |   +------------------+

        这个句柄现在可以作为一个普通值在函数间传递。在一些点之后,可能是另一个调用栈,或者另一个线程,在某个时机,例如异步I/O完成的时候,另一个调用 $h()$ 恢复了协程。
        恢复协程的函数调用 $void$ $resume(handle)$ ,恢复了协程执行。对于主调来说,就好像调用了一个 $void$ 返回的函数。
        这会创建一个新的栈帧,记录 $resume()$ 调用的返回,激活协程栈,重新设置寄存器,在上一个暂停点处恢复 $x()$ 的执行。

STACK                     REGISTERS               HEAP
+----------------+ <-+
|  x()           |   |                   +-->  +-----------+
| ret= h()+0x87  |   |    +------+       |     |  x()      |
+----------------+   +--- | rsp  |       |     | a =  42   |
|  h()           |        +------+       |     | b = 789   |
| handle         |        | rbp  | ------+     +-----------+
+----------------+        +------+
| ...            |
|                |

总结

        这篇文章中,我把协程描述为一种不仅具有调用和返回操作,还具有暂停、恢复和销毁这三种额外操作的函数。
        我希望这对你理解协程及其控制流有帮助。
        下一篇文章我将介绍C++协程标准语言扩展的机制,以及编译器是怎么把你写的代码翻译成协程的。