回到顶部 暗色模式

C++协程(4):理解对称转移

        协程标准提供了一种有趣的方式来编写异步代码,就好像你还在写同步代码那样。你只需要在合适的点调用 $co_-await$,编译器就会负责暂停协程,在暂停点之间保存状态,并在操作完成后恢复协程执行。
        然而,协程标准,就像它最初定义的,有着很不友好的限制,一不小心就很容易导致栈溢出。并且如果想要避免这种溢出,就需要你在的 $task$<$T$> 类型中进行额外的同步开销。
        还好,2018年对协程设计进行了调整,增加了一种叫做“对称转移” ( $symmetric$ $transfer$ ) 的功能,让你可以在不消耗任何额外栈空间的前提下暂停一个协程的同时恢复另一个协程的执行。此功能的加入解决了协程标准的一个关键限制,并且允许更简单有效的,不需要任何安全方面的措施来防止栈溢出的方式来实现一个异步协程。
        在这篇文章我会试着解释栈溢出问题,以及这个关键的“对称转移”能力是怎么解决这个问题的。

首先是一些关于任务协程怎么工作的背景

        考虑如下协程:

task foo() {
  co_return;
}

task bar() {
  co_await foo();
}

        假设我们有一个简单的 $task$ 类型,当其他协程等待它时会懒执行。这个特别的 $task$ 类型没有返回值。
        让我们分析下当 $bar\left(\right)$ 执行 $co_-await$ $foo\left(\right)$ 时发生了什么。

        好,这么看来一个简单的调用包含了许多步骤。
        为了更深的理解这些步骤,让我们看看一种简单的使用协程标准设计 ( 不支持对称转移 ) 的 $task$ 类实现。

task 的大体实现

        类的实现大体是这样:

class task {
public:
  class promise_type { /* see below */ };

  task(task&& t) noexcept
  : coro_(std::exchange(t.coro_, {}))
  {}

  ~task() {
    if (coro_)
      coro_.destroy();
  }

  class awaiter { /* see below */ };

  awaiter operator co_await() && noexcept;

private:
  explicit task(std::coroutine_handle<promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<promise_type> coro_;
};

        $task$ 独占一个指向协程帧的 $std$::$coroutine_-handle$,在协程调用时创建。$task$ 对象是RAII对象,保证当 $task$ 离开作用域时,$std$::$coroutine_-handle$ 的 $.destroy\left(\right)$ 会被调用。
        接着让我们实现 $promise$ 类型。

实现 task::promise_type

        根据以前的文章,我们知道 $promise$ 类型成员定义了协程帧中的 $Promise$ 对象类型,控制协程行为。
        首先,我们需要实现 $get_-return_-object\left(\right)$ 来构造在协程创建时返回的 $task$ 对象。这个方法只需要使用新创建的协程帧的 $std$::$coroutine_-handle$ 来初始化 $task$。
        我们可以使用 $std$::$coroutine_-handle$::$from_-promise\left(\right)$ 方法来从 $promise$ 对象获取一个句柄。

class task::promise_type {
public:
  task get_return_object() noexcept {
    return task{std::coroutine_handle<promise_type>::from_promise(*this)};
  }
}

        接着,我们想要协程在左大括号处进行初始化暂停,这样稍后可以在挂起 $task$ 的点恢复协程。
        懒启动协程有一些优点:

  1. 可以在开始协程执行前设置 $std$::$coroutine_-handle$。这意味着我们不需要使用线程同步来处理设置和协程运行之间的竞态。
  2. $task$ 析构函数可以无条件地销毁协程帧,不需要担心协程是否还在另一个线程运行,因为协程只会在主动 $await$ 时执行。并且开始后主调协程会被暂停,所以直到协程执行完成,我们才能调用 $task$ 的析构函数。这让编译器可以更好的把协程帧直接分配在主调的调用栈上,参考P0981R0了解更多关于堆分配跳过优化 ( $Heap$ $Allocation$ $eLision$ $Optimisation$, $HALO$ )。
  3. 提升协程代码的异常安全性。如果你不立即 $co_-await$ 返回的 $task$,并且做了一些其他逻辑,抛出了异常,导致栈回退,这时 $task$ 的析构函数会被调用,这样我们也可以安全的销毁协程帧,因为我们知道它还没开始运行。我们不需要处理 $detach$,悬垂引用,析构函数阻塞,进程终止或者未定义行为。这也是我在CppCon 2019 talk on Structured Concurrency中讲到的。

        为了让协程在左大括号处进行初始化暂停,我们实现了一个返回内置 $suspend_-always$ 类型的 $initial_-suspend\left(\right)$ 方法。

std::suspend_always initial_suspend() noexcept {
  return {};
}

        接着,我们需要定义 $return_-void\left(\right)$ 方法,这个方法会在执行到 $co_-return;$ 时,或者协程执行结束时被调用。这个方法并不需要做任何事,只需要声明来让编译器知道这个协程类型可以使用 $co_-return;$ 语句。

void return_void() noexcept {}

        我们也需要增加一个 $unhandled_-exception\left(\right)$ 方法,如果一个异常逃出协程体之外,这个方法会被调用。在我们的场景,可以认为 $task$ 协程体是noexcept的,如果出现异常,直接调用 $std$::$terminate\left(\right)$。

void unhandled_exception() noexcept {
  std::terminate();
}

        最后,当协程执行到右大括号时,我们想让协程在最终暂停点暂停,并恢复等待这个协程完成的另一个协程的执行。
        为了实现这个,我们需要 $promise$ 有一个数据成员来保存接着执行的协程的 $std$::$coroutine_-handle$。我们也需要定义 $final_-suspend\left(\right)$ 方法,返回一个 $awaitble$ 对象,在当前协程在最终暂停点暂停后,恢复下一个协程。
        把恢复延后到当前协程暂停后进行是很重要的,因为下一个协程可能会立即调用 $task$ 的析构函数,后者再调用协程帧的 $.destroy\left(\right)$ 。$.destroy\left(\right)$ 只对暂停协程有效,所以这时可能会导致未定义行为。
        编译器会在右大括号处插入 $co_-await$ $promise.final_-suspend\left(\right)$ 语句。
        十分要注意的是,当 $final_-suspend\left(\right)$ 被调用时,当前协程还没有暂停。在协程暂停前,我们需要等待返回的 $awaitble$ 对象 $await_-suspend\left(\right)$ 方法调用。

struct final_awaiter {
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutin_handle<promise_type> h) noexcept {
    // The coroutine is now suspended at the final-suspend point.
    // Lookup its continuation in the promise and resume it.
    h.promise().continuation.resume();
  }

  void await_resume() noexcept {}
};

final_awaiter final_suspend() noexcept {
  return {};
}

std::coroutine_handle<> continuation;
};

        好了,$promise$ 类型完成了。最后还剩下 $task$::$operator$ $co_-await\left(\right)$。

实现 task::operator co_await()

        你可能记得理解co_await这篇文章讲的,当执行 $co_-await$ 表达式时,编译器会生成一个对 $operator$ $co_-await\left(\right)$ 的调用,如果定义了这个方法,返回对象必须同时定义 $await_-ready\left(\right)$、$await_-suspend\left(\right)$ 和 $await_-resume\left(\right)$ 方法。
        当一个协程 $await$ 一个 $task$ 时,我们希望挂起的协程总是暂停,并且在暂停后,把挂起协程的句柄保存在即将恢复的协程的 $promise$ 中,并在之后对 $task$ 的 $std$::$coroutine_-handle$ 调用 $.resume\left(\right)$ 来开始 $task$ 执行。
        因此代码会相对直接:

class task::awaiter {
public:
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> continuation) noexcept {
    // Store the continuation in the task'promise so that the final_suspend()
    // knows to resume this corotuine when the task completes.
    coro_.promise().continuation = continuation;

    // Then we resume the task's coroutine, which is currently suspended
    // at the initial-suspend-point (ie. at the open curly brace).
    coro_.resume();
  }

  void await_resume() noexcept {}

private:
  explicit awaiter(std::coroutine_handle<task::promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<task::promise_type> coro_;
};

task::awaiter task::operator co_await() && noexcept {
  return awaiter{coro_};
}

        这样就完成了 $task$ 类型功能所需的代码。
        你可以在 $Compiler$ $Explorer$ 查看完整代码:https://godbolt.org/z/-Kw6Nf

栈溢出问题

        然而,当你尝试在协程中编写循环,并且 $co_-await$ 可能在循环体内同步完成的 $task$ 时,会受到一些限制。
        例如:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; i++) {
    co_await completes_synchronously();
  }
}

        使用上文中的 $task$ 实现,$loop_-synchronously\left(\right)$ 函数 ( 可能 ) 会在 $count$ 是 $10$、$10000$,甚至 $100'000$ 时都能正常运行。但是有些值会导致协程崩溃。
        例如,当 $count$ 是 $1'000'000$ 时就会崩溃,可以查看:https://godbolt.org/z/gy5Q8q
        崩溃的原因是栈溢出。
        为了理解栈溢出的原因,我们需要看看代码执行的时候发生了什么,尤其是栈帧上面发生了什么。
        当其他协程 $co_-await$ 了返回的 $task$ 后,$loop_-synchronously\left(\right)$ 开始执行。这将暂停挂起的协程,调用 $task$::$awaiter$::$await_-suspend\left(\right)$,后者调用 $task$ 的 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 方法。
        因此在 $loop_-synchronously\left(\right)$ 启动时,栈看起来就像这样:

           Stack                                                   Heap
+------------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume    | active coroutine -> | loop_synchronously frame |
+------------------------------+                     | +----------------------+ |
| coroutine_handle::resume     |                     | | task::promise        | |
+------------------------------+                     | | - continuation --.   | |
| task::awaiter::await_suspend |                     | +------------------|---+ |
+------------------------------+                     | ...                |     |
| awaiting_coroutine$resume    |                     +--------------------|-----+
+------------------------------+                                          V
|  ....                        |                     +--------------------------+
+------------------------------+                     | awaiting_coroutine frame |
                                                     |                          |
                                                     +--------------------------+

注意:一个协程函数通常被编译器编译成两部分:

  1. “活动函数” ( $ramp$ $function$ ),处理协程帧构造、参数拷贝,$promise$ 构造和获取返回值,以及
  2. “协程体” ( $coroutine$ $body$ ),包含用户编写的逻辑。

我使用 $\$resume$ 后缀表示协程的“协程体”部分。
后面的文章会详细介绍这种分割方式。

        当 $loop_-synchronously\left(\right)$ $await$ $complete_-synchronously\left(\right)$ 返回的 $task$ 时,当前协程会被暂停,调用 $task$::$awaiter$::$await_-suspend\left(\right)$。$await_-suspend\left(\right)$ 方法再调用 $complete_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        这会恢复 $complete_-synchronously\left(\right)$ 协程,后者会同步运行结束,并在最终暂停点暂停,调用 $task$::$promise$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法,后者会调用 $loop_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        接着当 $loop_-synchronously\left(\right)$ 协程恢复之后、$complete_-synchronously\left(\right)$ 返回的临时变量 $task$ 在分号处销毁时,我们再看看当前程序状态,栈 / 堆看起来就像这样:

           Stack                                                   Heap
+-------------------------------+ <-- top of stack
| loop_synchronously$resume     | active coroutine -.
+-------------------------------+                   |
| coroutine_handle::resume      |            .------'
+-------------------------------+            |
| final_awaiter::await_suspend  |            |
+-------------------------------+            |  +--------------------------+ <-.
| completes_synchronously$resume|            |  | completes_synchronously  |   |
+-------------------------------+            |  | frame                    |   |
| coroutine_handle::resume      |            |  +--------------------------+   |
+-------------------------------+            '---.                             |
| task::awaiter::await_suspend  |                V                             |
+-------------------------------+ <-- prev top  +--------------------------+   |
| loop_synchronously$resume     |     of stack  | loop_synchronously frame |   |
+-------------------------------+               | +----------------------+ |   |
| coroutine_handle::resume      |               | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
| task::awaiter::await_suspend  |               | +------------------|---+ |   |
+-------------------------------+               | - task temporary --|---------'
| awaiting_coroutine$resume     |               +--------------------|-----+
+-------------------------------+                                    V
|  ....                         |               +--------------------------+
+-------------------------------+               | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        接着下一个要做的就是调用 $task$ 的析构函数,销毁 $complete_-synchronously\left(\right)$ 的协程帧,并递增 $count$ 变量,继续循环,再创建一个新的 $complete_-synchronously\left(\right)$ 协程并恢复它。
        事实上,这里会做的就是 $loop_-synchronously\left(\right)$ 和 $complete_-synchronously\left(\right)$ 返回递归调用对方,每次调用都会占用一部分栈空间,直到迭代够一定次数后,栈溢出并导致了未定义行为,导致程序立即崩溃。
        协程内编写循环这种看着没有任何递归的行为,却是很简单的导致函数越界的方式。
        那么,在这种原始协程标准设计之下,又该怎么解决这个问题呢?

协程标准解法

        好,那么这种无界递归问题该怎么避免呢?
        在上面的实现中,我们使用返回 $void$ 的 $await_-suspend\left(\right)$。协程标准中还有另外一个返回bool的 $await_-suspend\left(\right)$,返回true代表协程已暂停,控制权移交给 $resume\left(\right)$ 的调用者,返回false则协程立即恢复,不需要消耗任何额外的栈空间。
        所以,为了避免无界地循环递归,我们希望利用返回bool版本的 $await_-suspend\left(\right)$,让 $task$::$awaiter$::$await_-suspend\left(\right)$ 在任务同步结束时返回false,而不是递归地通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 恢复协程。
        实现这种解法需要两部分:

  1. 在 $task$::$awaiter$::$await_-suspend\left(\right)$ 方法,你可以调用 $.resume\left(\right)$ 来启动协程执行。然后在 $.resume\left(\right)$ 返回时,检查协程是否执行完成。如果执行完成,我们可以返回false,表示挂起的协程应当立即恢复。如果没有完成,我们可以返回true,表示将控制权移交给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用者。
  2. 在协程执行完成后调用的 $task$::$promise_-type$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法内,我们需要检查挂起的协程是否已经 ( 或者即将 ) 从 $task$::$awaiter$::$await_-suspend\left(\right)$ 调用中返回true。如果是,调用 $.resume\left(\right)$ 来恢复它。否则我们需要避免恢复协程,并且告诉 $task$::$awaiter$::$await_-suspend\left(\right)$,让它返回false

        然而,有一个更复杂的问题,就是协程可能在当前线程开始执行、暂停,并在 $.resume\left(\right)$ 调用返回前,在其他线程恢复并执行完成。因此,我们需要解决第一部分和第二部分之间可能的并发冲突。
        我们需要使用 $std$::$atomic$ 值同步。
        现在我们可以对代码进行以下修改:

class task::promise_type {
  ...

  std::coroutine_handle<> continuation;
  std::atomic<bool> ready = false;
};

bool task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  promise_type& promise = coro_.promise();
  promise.continuation = continuation;
  coro_.resume();
  return !promise.ready.exchange(true, std::memory_order_acq_rel);
}

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  promise_type& promise = h.promise();
  if (promise.ready.exchange(true, std::memory_order_acq_rel)) {
    // The coroutine did not complete synchronously, resume it here.
    promise.continuation.resume();
  }
}

        可以在 $Compiler$ $Explorer$ 上查看修改后的例子:https://godbolt.org/z/7fm8Za。注意这时当 $count$ $==$ $1'000'000$ 时,程序不再崩溃了。
        这就是 $cppcoro$::$task$<$T$> 里面实现的,规避无界递归问题的方式 ( 在某些平台也是一样 ),而且运行得很好。
        哇哦!问题解决了,吗?发布!可以吗…?

问题

        以上方法在解决递归的同时,也引入了一些问题。
        首先,它需要 $std$::$atomic$ 操作,开销可能很大。主调等待挂起的协程需要进行一次值交换,并且被调执行完成后也需要一次原子交换。如果你的程序是单线程的,那么你会在永远不需要的线程同步原子操作上花费额外的开销。
        其次,它引入了额外的分支。一个在主调,需要判断是否暂停或者立即恢复协程,另一个在被调,需要判断是否恢复下个协程或者暂停。
        注意这个额外分支的开销,甚至原子操作的开销,跟协程程序的业务逻辑相比通常不值一提。然而,协程被称为零成本抽象,并且有许多人使用协程暂停操作来避免等待L1缓存未命中问题 ( 更多细节可以查看 Gor 的 great CppCon talk on nanocoroutines )。
        最后,也可能是最重要的,它在挂起协程的恢复中引入了一些执行时不确定的值。
        假设我有以下代码:

cppcoro::static_thread_pool tp;

task foo();
{
  std::cout << "foo1 " << std::this_thread::get_id() << "\n";
  // Suspend coroutine and reschedule onto thread-poll thread.
  co_await tp.schedule();
  std::cout << "foo2 " << std::this_thread::get_id() << "\n";
}

task bar()
{
  std::cout << "bar1 " << std::this_thread::get_id() << "\n";
  co_await foo();
  std::cout << "bar2 " << std::this_thread::get_id() << "\n";
}

        在原来实现中,我们保证在 $co_-await$ $foo\left(\right)$ 执行完成后的代码会在相同的线程中内联执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 3456

        然而,通过使用原子变量,$foo\left(\right)$ 执行完成可能与 $bar\left(\right)$ 的暂停之间并发冲突,在一些情况下,这意味着 $co_-await$ $foo\left(\right)$ 之后的代码可能在 $bar\left(\right)$ 启动的线程上执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 1234

        对于许多用例来说,这种行为没有区别。然而,对于想要传递运行上下文的算法来说就有问题了。
        例如,$via\left(\right)$ 算法等待一些 $Awaitable$,然后在对应的 $scheduler$ 的运行上下文上返回。这个算法的一个简单版本如下:

template <typename Awaitable, typename Scheduler>
task<await_result_t<Awaitable>> via(Awaitable a, Scheduler s)
{
  auto result = co_await std::move(a);
  co_await s.schedule();
  co_return result;
}

task<T> get_value();
void consume(const T&);

task<void> consumer(static_thread_pool::scheduler s)
{
  T result = co_await via(get_value(), s);
  consume(result);
}

        最初版本的 $consume\left(\right)$ 调用总是保证在线程池 $s$ 上执行。然而,在使用原子变量后,$consume\left(\right)$ 可能在与 $scheduler$ $s$ 相关的线程上执行,也可能在 $consumer\left(\right)$ 开始执行的线程上执行。
        所以我们应该怎么在没有原子操作、额外分支和不确定的恢复上下文的前提下解决栈溢出问题呢?

进入“对称转移”

        Gor Nishanov ( $2018$ ) 的论文P0913R0《增加对称协程控制转移》,提出了这种问题的解法,通过提供一种允许一种不需要消耗任何额外栈空间的前提下,让一个协程暂停时对称恢复另一个协程的能力。
        论文提出了两个关键改变:

        所以“对称转移”是什么意思呢?
        当你调用 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 来恢复协程,$.resume\left(\right)$ 的调用者在协程恢复后仍然是活跃的。接着,协程暂停,在暂停点调用 $await_-suspend\left(\right)$ 返回void( 无条件暂停 ) 或者true ( 有条件暂停 ) 时,$.resume\left(\right)$ 的调用才会返回。
        这可以认为是一种“对称转移”,协程执行和行为就像普通函数调用一样。$.resume\left(\right)$ 的主调可以是任意函数 ( 协程或非协程 )。当协程暂停并且 $await_-suspend\left(\right)$ 返回true或者void时,控制权会返回给 $.resume\left(\right)$ 的调用者。
        每次调用 $.resume\left(\right)$ 恢复协程时,都会创建一个新的协程的栈帧。
        然而,通过“对称转移”,我们可以简单的暂停协程,并恢复另一个协程。两个协程之间不需要显式的主被调关系,一个协程暂停时,它就可以把控制权转移给其他暂停的协程 ( 包括它自己 ),并且不需要在下次暂停或者执行完成时把控制权转移给上一个协程。
        让我们看看在 $awaiter$ 使用对称转移时,编译器会怎么处理 $co_-await$ 表达式:

{
  decltype(auto) value = <expr>;
  decltype(auto) awaitble =
      get_awaitable(promise, static_cast<decltype(value)&&>(value));
  decltype(auto) awaiter =
      get_awaiter(static_cast<decltype(awaitable)&&>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::coroutine_handle<P>;

    // <suspend-coroutine>

    auto h = awaiter.await_suspend(handle_t::from_promise(p));
    h.resume();
    // <return-to-caller-or-resumer>

    // <resume-point>
  }

  return awaiter.await_resume();
}

        让我们放大与其他 $co_-await$ 行为不同的部分:

auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
// <return-to-caller-or-resumer>

        一旦协程状态机处于 $lower$ 状态 ( 另一篇文章的主题 ),<$return$-$to$-$caller$-$or$-$resumer$> 部分一般是 $return;$ 语句,这会导致上次 $.resume\left(\right)$ 的调用返回给主调。
        这意味着我们在原来 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用的函数体内,又产生了一个对同一个签名的函数的调用,即 $std$::$coroutine_-handle$::$resume\left(\right)$ ,跟着一条 $return;$ 语句。
        一些编译器在开启编译优化后,能够在条件满足的时候,把函数结尾的调用 ( 即返回之前 ) 转变为尾调用 ( $tail$-$calls$ )。
        而这种优化正是我们想要的,可以避免栈溢出问题的方式。但是与其期望优化器来决定进行尾调用转移,我们更想要确保这种优化一直有效,即使是未开启优化的情况。
        但是首先,我们先了解下尾调用的含义。

尾调用

        尾调用意味着在当前栈帧弹出前,把当前函数的返回地址修改为被调的返回地址 ( 即被调直接返回给当前函数的主调 )。
        在X86/X64架构,这通常意味着编译器生成的代码会先弹出当前栈帧,然后调用 $jmp$ 指令来跳转到被调函数入口,而不是使用 $call$ 指令,并在 $call$ 返回后才弹出当前栈帧。
        这个优化通常只会在有限情况下发生,然而:
        特别的,它需要:

        $co_-await$ 对称转移形式的设计使得它刚好能满足所有需求。我们一个个来看。
        调用机制:当编译器把协程 $lower$ 成状态机时,它实际上分成两部分:活跃部分 ( $ramp$,分配和初始化协程框架 ) 和主体 ( 包含用户编写的协程体的状态机 )。
        协程的函数签名 ( 以及所有用户指定的调用机制 ) 只会影响活跃函数部分,主体部分则受编译器控制,永远不会被用户代码调用,只能通过 $ramp$ 函数和 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用。
        协程体部分的调用机制是用户不可见的,完全由编译器决定,因此它可以选择一种合适的支持尾调用的机制,并被所有协程体使用。
        返回类型相同:源和目的协程的 $.resume\left(\right)$ 方法的返回值都是void,所以这个需求也能满足。
        没有非默认析构函数:当执行尾调用时,我们需要在调用目标函数前释放当前栈帧,这需要所有栈上对象的生命周期都在调用结束之前。
        一般情况下,一旦作用域内有对象的析构函数非默认,这些对象会在协程暂停的时候存活,所以它们会被分配在协程帧上,而非栈帧上。
        局部变量不需要在协程暂停时存活,可能分配在栈帧上,它们的生命周期会在协程下次暂停时结束。
        因此栈分配对象不存在需要在尾调用返回之前调用的非默认析构函数。
        调用不在try/catch块内:这个可能有点 trick,因为每个协程内都有个显式的try/catch块,包裹用户编写的协程体。
        根据规范,协程被定义为:

{
  promise_type promise;
  co_await promise.initial_suspend();
  try { F; }
  catch (...) { promise.unhandled_exception(); }
final_suspend:
  co_await promise.final_suspend();
}

        $F$ 就是用户编写的协程体部分。
        因此所有用户编写的 $co_-await$ 表达式 ( 除了 $initial$/$final_-suspend$ ) 都在try/catch块的上下文中。
        然而,实现上会把 $.resume\left(\right)$ 执行放到try/catch块的外部。
        我希望在另一篇文章讲协程怎么变成状态机的文章中讲解更多这方面的细节 ( 这篇文章已经够长了 )。

注意,然而,当前C++规范对实现这个操作需求的描述不够清晰,并且只是一个非规范注释,暗示这可能是必须的。希望我们将来能够修复这个规范。

        那么我们知道了执行对称转移的协程已经满足了所有尾调用的需求。编译器会确保它永远是尾调用,无论是否开启了优化。
        这意味着通过使用 $await_-suspend\left(\right)$ 返回的 $std$::$coroutine_-handle$,我们可以暂停当前协程,并在不消耗额外栈空间的前提下,把控制权转移给另一个协程。
        这让我们可以编写任意深度的相互递归调用代码,不需要担心栈溢出。
        这就是我们需要修复的 $task$ 的实现。

再看task

        有了“对称转移”能力,我们再来修复 $task$ 类型实现。
        我们要修改两个 $await_-suspend\left(\right)$ 方法的实现:

        为了指明 $await$ 的方向,我们需要把 $task$::$awaiter$ 方法从:

void task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;

  // Then we resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace).
  coro_.resume();
}

        改成:

std::coroutine_handle<> task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;


  // Then we tail-resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace), by returning
  // its handle from await_suspend().
  return coro_;
}

        同时为了指明返回路径,我们需要把 $task$::$promise_-type$::$final_-awaiter$ 方法从:

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it.
  h.promise().continuation.resume();
}

        改成:

std::coroutine_handle<> task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it symmetrically.
  return h.promise().continuation;
}

        这样我们就有了一个不需要担心栈溢出问题的 $task$ 实现,并且 $await_-suspend$ 是void返回的,没有bool返回带来的不确定恢复上下文问题。

观察栈

        现在让我们看看最初的例子:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; ++i) {
    co_await completes_synchronously();
  }
}

        当其他协程 $co_-await$ $task$ 时, $loop_-synchronously\left(\right)$ 协程首次开始执行。这会触发其他协程的对称转移,并通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用恢复。
        因此当 $loop_-synchronously\left(\right)$ 开始时,栈看起来会像这样:

           Stack                                                Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        现在,当执行 $co_-await$ $completes_-synchronously\left(\right)$ ,它会对称转移到 $completes_-synchronously$ 协程。
        通过:

        在 $completes_-synchronously$ 恢复后,栈看起来会像这样:

              Stack                                          Heap
                                            .-> +--------------------------+ <-.
                                            |   | completes_synchronously  |   |
                                            |   | frame                    |   |
                                            |   | +----------------------+ |   |
                                            |   | | task::promise        | |   |
                                            |   | | - continuation --.   | |   |
                                            |   | +------------------|---+ |   |
                                            `-, +--------------------|-----+   |
                                              |                      V         |
+-------------------------------+ <-- top of  | +--------------------------+   |
| completes_synchronously$resume|     stack   | | loop_synchronously frame |   |
+-------------------------------+ active -----' | +----------------------+ |   |
| coroutine_handle::resume      | coroutine     | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
|     ...                       |               | +------------------|---+ |   |
+-------------------------------+               | task temporary     |     |   |
                                                | - coro_       -----|---------`
                                                +--------------------|-----+
                                                                     V
                                                +--------------------------+
                                                | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        注意这里栈帧数量增长了。
        在 $completes_-synchronously$ 协程完成,执行到右大括号处,它会执行 $co_-await$ $promise.final_-suspend\left(\right)$。
        这会暂停协程,并调用 $final_-awaiter$::$await_-suspend\left(\right)$ ,后者返回下个协程的 $std$::$coroutine_-handle$ ( 即指向 $loop_-synchronously$ 协程的句柄 )。这会触发一次对称转移 / 尾调用来恢复 $loop_-synchronously$ 协程。
        $loop_-synchronously$ 恢复后的栈看起来就像这样:

           Stack                                                   Heap
                                                   +--------------------------+ <-.
                                                   | completes_synchronously  |   |
                                                   | frame                    |   |
                                                   | +----------------------+ |   |
                                                   | | task::promise        | |   |
                                                   | | - continuation --.   | |   |
                                                   | +------------------|---+ |   |
                                                   +--------------------|-----+   |
                                                                        V         |
+----------------------------+  <-- top of stack   +--------------------------+   |
| loop_synchronously$resume  | active coroutine -> | loop_synchronously frame |   |
+----------------------------+                     | +----------------------+ |   |
| coroutine_handle::resume() |                     | | task::promise        | |   |
+----------------------------+                     | | - continuation --.   | |   |
|     ...                    |                     | +------------------|---+ |   |
+----------------------------+                     | task temporary     |     |   |
                                                   | - coro_       -----|---------`
                                                   +--------------------|-----+
                                                                        V
                                                   +--------------------------+
                                                   | awaiting_coroutine frame |
                                                   |                          |

        $loop_-synchronously$ 协程被恢复后,在执行到达分号后,要做的第一件事就是调用 $completes_-synchronously$ 调用返回的临时 $task$ 的析构函数。这会销毁协程帧,释放内存,这时情况会是这样:

           Stack                                                   Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        我们现在返回到 $loop_-synchronously$ 协程的执行,并且栈帧和协程帧数量与开始执行时一样,并且之后每次循环也是一样。
        因此我们可以执行许多次迭代,只消耗常数级的存储空间。
        完整的对称转移版本的 $task$ 类型可以看以下的 $Compiler$ $Explorer$ 链接:https://godbolt.org/z/9baieF

通用形式 await_suspend 的对称转移

        既然我们已经见识过了对称转移形式的 $awaitable$ $concept$ 的能力和重要性,我想向你展示一下通用形式,理论上可以替换voidbool返回形式的 $await_-suspend\left(\right)$。
        但首先,我们需要看一下P0913R0提案添加的新的协程设计:$std$::$noop_-coroutine\left(\right)$。

循环终止

        通过对称转移形式的协程,每次协程暂停,它都会对称恢复另一个协程。只要你有其他协程可以恢复,它就非常有用。但是有时我们不想要执行其他协程,只需要暂停并把控制权返回给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。
        voidbool返回形式的 $await_-suspend\left(\right)$ 都允许协程暂停并从 $std$::$coroutine_-handle$::$resmue\left(\right)$ 调用中返回,那么我们怎么让对称转移形式的协程返回呢?
        答案是使用内置的特殊 $std$::$coroutine_-handle$,称为无操作协程句柄 ( $noop$ $coroutine$ $handle$ ),通过函数 $std$::$noop_-coroutine\left(\right)$ 生成。
        无操作协程句柄之所以叫这个名字,是因为它的 $.resume\left(\right)$ 实现是立即返回,即恢复协程后没有操作。一般它的实现包含一条简单的 $ret$ 指令。
        如果 $await_-suspend\left(\right)$ 方法返回 $std$::$noop_-coroutine\left(\right)$ 句柄,那么协程不会把控制权转移给下一个协程,而是转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。

其他风格的 await_suspend() 表示方式

        有了这个信息,我们再看其他风格的 $await_-suspend\left(\right)$ 如何使用对称转移。
        我们有void返回格式的:

void my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
}

        可以改成bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return true;
}

        也可以再改成对称转移风格:

std::noop_coroutine_handle my_awaiter::await_suspend(
    std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return std::noop_coroutine();
}

        我们有bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return true to transfer execution to caller of
    // coroutine_handle::resume().
    return true;
  }

  // Operation completed asynchronously.
  // Return false to immediately resume the current coroutine.
  return false;
}

        可以改成对称转移格式的:

std::coroutine_handle<> my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return std::noop_coroutine() to transfer execution to caller of
    // coroutine_handle::resume().
    return std::noop_coroutine();
  }

  // Operation completed asynchronously.
  // Return current coroutine's handle to immediately resume
  // the current coroutine.
  return h;
}

为什么有三种风格?

        那么为什么在有对称转移风格的前提下,我们还继续使用voidbool返回风格的 $await_-suspend\left(\right)$ 呢?
        一部分历史原因,一部分实用性原因,一部分性能原因。
        $await_-suspend\left(\right)$ 返回 $std$::$noop_-coroutine_-handle$ 可以完全替代void返回版本,因为这两种对于编译器来说,都表示协程会无条件地把控制权转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的主调。
        在我看来,它还能留下来的原因,一部分是它在对称转移被提出之前就已经在使用了,另一部分是因为void返回对于无条件暂停的情况可以少写一点代码。
        bool返回版本,在某些情况下,可以优化得比对称转移形式的更好。
        假设我们有一个bool返回的 $await_-suspend\left(\right)$,定义在另一个编译单元。这时编译器可以在挂起协程生成代码,暂停当前协程并在 $await_-suspend\left(\right)$ 调用返回之后,通过执行下一块代码的方式来有条件地恢复它。如果 $await_-suspend\left(\right)$ 返回false,那么就可以明确知道需要执行哪段代码。
        即使有了对称转移风格,我们还是需要表示相同的结果:返回主调 / 恢复者,或者恢复当前协程。相比返回true或者false,我们需要返回 $std$::$noop_-coroutine\left(\right)$ 或者当前协程的句柄。我们可以把这些句柄都统一成 $std$::$coroutine_-handle$<$void$> 返回。
        然而,因为 $await_-suspend\left(\right)$ 定义在其他编译单元,编译器看不到协程返回的句柄指向什么,所以当协程恢复后,它需要一些更重的间接调用,并且相比bool返回的单个分支而言,可能会有更多的恢复协程分支。
        有可能在将来的某一天,对称转移版本可以获得同等性能。例如,我们可以编写内联的 $await_-suspend\left(\right)$,但是调用一个bool返回的非内联方法,并有条件地返回合适的句柄。
        例如:

struct my_awaiter {
  bool await_ready();

  // Compiler should in-theory be able to optimise this to the same
  // as the bool-returning version, but currently don't do this optimisation.
  std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
    if (try_start(h)) {
      return std::noop_coroutine();
    } else {
      return h;
    }
  }

  void await_resume();

private:
  // This method is defined out-of-line in a seperate translation unit.
  bool try_start(std::coroutine_handle<> h);
}

        然而,现在的编译器 ( 比如Clang 10 ) 还不能把这种情况优化成和bool返回版本一样高效的代码。话虽如此,除非你编写了一个非常紧凑的循环,否则可能不会注意到它们的差异。
        到目前为止,通用的规则是:

补充

        C++ 20新加的对称转移能力,使得协程递归恢复对方变得简单,不需要担心栈溢出。这个能力是编写高效、安全的异步协程类型的关键,就像 $task$ 那样。
        这篇关于对称转移的文章比预期的要长,十分感谢你坚持读完了它!希望能帮到你。
        在下篇文章,我会讲解编译器怎么把协程函数转换成状态机。

致谢

        也是不翻了~

C++协程(4):理解对称转移