回到顶部 暗色模式
recent
C++协程(5):理解编译器转换

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2025 Apr 23 23:34
Tags: C&C++
C++协程(4):理解对称转移

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 28 23:45
Tags: C&C++
C++协程(3):理解promise

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 27 16:19
Tags: C&C++
C++协程(2):理解co_await

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 25 00:00
Tags: C&C++
C++协程(1):协程理论

无栈协程介绍,翻译自 https://lewissbaker.github.io/

2024 Jan 23 23:59
Tags: C&C++
μblog

C++协程(5):理解编译器转换

介绍

        之前的“理解C++协程”文章中讲到了编译器会对协程,以及它的 $co_-await$、$co_-yield$、$co_-return$ 表达式执行的不同类型的转换。这些文档描述了编译器如何将每个表达式翻译成底层的对多种自定义的点 / 用户定义的方法调用。

  1. 协程理论
  2. 理解 $co_-await$
  3. 理解 $promise$
  4. 理解对称转移

        然而,这些描述还有一部分你可能不太满意。它们对于“挂起点”的概念一笔带过,含糊地说成“协程在这里挂起”、“协程在这里恢复”,并没有详细说明它的含义,以及编译器是怎么实现的。
        在这篇文章,我会把之前文章的所有概念更深入地讲解。我会把一个协程转换为底层等价的非协程、命令式的C++代码,来展示当协程执行到挂起点时发生了什么。
        注意我不会描述某个特定编译器怎么把协程编译成机器码 ( 编译器在这方面会有些额外技巧 ),而是只描述一种可能的把协程转化成可移植的C++代码的方式。
        警告:这次讨论会有点深!

设置场景

        对于初学者,我们假设有一个基本的 $task$ 类型,同时作为 $awaitable$ 和协程的返回类型。为了简化,假设这个协程类型异步生成一个 $int$ 类型结果。
        在这篇文章,我们会描述怎么把下面的协程代码转换为不包含任何协程关键字 $co_-await$、$co_-return$的C++代码,以便我们更好理解它的含义。

// Forward declaration of some other function. Its implementation is not relevant.
task f(int x);

// A simple coroutine that we are going to translate to non-C++ code
task g(int x) {
  int fx = co_await f(x);
  co_return fx * fx;
}

定义 task 类型

        首先,我们声明一个会被用到的 $task$ 类。
        为了理解协程怎么转化成底层代码,我们不需要明白这个类型的方法定义。对它们的调用会被插入转换中。
        这些方法的定义并不复杂,我会把它们作为读者理解以前文章的练习实践。

class task {
public:
  struct awaiter;

  class promise_type {
  public:
    promise_type() noexcept;
    ~promise_type();

    struct final_awaiter {
      bool await_ready() noexcept;
      std::coroutine_handle<> await_suspend(
        std::coroutine_handle<promise_type> h) noexcept;
      void await_resume() noexcept;
    };

    task get_return_object() noexcept;
    std::suspend_always initial_suspend() noexcept;
    final_awaiter final_suspend() noexcept;
    void unhandled_exception() noexcept;
    void return_value(int result) noexcept;
  
  private:
    friend task::awaiter;
    std::coroutine_handle<> continuation_;
    std::variant<std::monostate, int, std::exception_ptr> result_;
  };

  task(task&& t) noexcept;
  ~task();
  task& operator=(task&& t) noexcept;

  struct awaiter {
    explicit awaiter(std::coroutine_handle<promise_type> h) noexcept;
    bool awaite_ready() noexcept;
    std::coroutine_handle<promise_type> await_suspend(
      std::coroutine_handle<> h) noexcept;
    int await_resume();
  private:
    std::coroutine_handle<promise_type> coro_;
  };

  awaiter operator co_await() && noexcept;

private:
  explicit task(std::coroutine_handle<promise_type> h) noexcept;

  std::coroutine_handle<promise_type> coro_;
};

        这个 $task$ 类型的定义与理解 $promise$的相似。

第一步:确定 promise 类型

task g(int x) {
  int fx = co_await f(x);
  co_return fx * fx;
}

        当编译器看到这个函数包含三个协程关键字 ( $co_-await$、$co_-yield$、$co_-return$ ) 中的一个时,它开始执行协程转换步骤。
        第一步是确定协程的 $promise$ 类型。
        这一步是通过把函数签名的返回类型和参数类型作为 $std$::$coroutine_traits$ 类型的模板参数判断的。
        例如,我们的函数 $g$,返回类型为 $task$,有一个参数类型是 $int$,编译器会使用 $std$::$coroutine_-traits$<$task$, $int$>::$promise_-type$ 判断。
        让我们定义个别名,方便以后使用。

using __g_promise_t = std::coroutine_traits<task, int>::promise_type;

        注意:我使用了两个下划线开始的类型名,表示这个符号是编译器内部生成的。这种符号是实现时保留的,并且不应该在你的代码中使用。
        现在,因为我们没有特例化 $std$::$coroutine_-traits$,这会导致主模板实例化,而主模板只是把内嵌的 $promise_-type$ 作为返回类型的 $promise_-type$ 的别名,在我们的例子中也就是解析为 $task$::$promise_-type$。

第二步:创建协程状态

        一个协程函数需要在挂起时保存协程状态、参数和局部变量,以便在后续恢复时保持可用。
        这个状态,在C++标准中,被称为协程状态,通常分配在堆上。
        让我们开始给协程 $g$ 定义个协程状态结构。

struct __g_state {
  // to be filled out
};

        协程状态包含了一系列不同的东西:

  • $promise$ 对象
  • 所有函数参数的副本
  • 协程当前挂起所在的挂起点信息,以及如何恢复 / 销毁它
  • 所有局部变量 / 跨挂起点的临时变量存储

        让我们加上 $promise$ 对象和参数副本存储。

struct __g_state {
  int x;
  __g_promise_t __promise;

  // to be filled out
};

        接着,我们加个构造函数来初始化这些数据成员。
        回想一下,编译器会先尝试使用参数副本的左值引用来调用 $promise$ 的构造函数,如果不行,就会调用 $promise$ 的默认构造函数。
        让我们创建一个简单的 $helper$ 来帮忙:

template<typename Promise, typename... Params>
Promise construct_promise([[maybe_unused]] Params&... params) {
  if constexpr (std::constructible_from<Promise, Params&...>) {
    return Promise(params...);
  } else {
    return Promise();
  }
}

        因此,协程状态构造器看起来可能会像这样:

struct __g_state {
  __g_state(int&& x)
  : x(static_cast<int&&>(x))
  , __promise(construct_promise<__g_promise_t>(this->x))
  {}

  int x;
  __g_promise_t __promise;
  // to be filled out
};

        现在我们有了一个表示协程状态的类型雏形,我们可以通过在堆上分配一个 __$g_-state$ 实例,传递函数参数给它拷贝 / 移动的方式,构建出 $g\left(\right)$ 底层实现的雏形。
        一些术语:我使用“启动函数” ( $ramp$ $function$ ) 指代协程初始化协程状态和准备开始执行协程的逻辑,就像一条进入协程体执行的坡道。

task g(int x) {
  auto* state = new __g_state(static_cast<int&&>(x));
  // ... implement rest of the ramp function
}

        注意我们的 $promise$ 类型没有重载 $operator$ $new$,所以我们会调用全局的 ::$operator$ $new$。
        如果 $promise$ 类型重载了 $operator$ $new$,我们不会调用全局的 ::$operator$ $new$,而是先确认参数列表 $\left(size, paramLvalues…\right)$ 是否可以作为 $operator$ $new$ 的参数,如果可以的话,就使用这个参数列表调用;否则,我们只会使用 $\left(size\right)$ 参数列表调用。$operator$ $new$ 的访问协程函数参数列表的能力有时被称为“参数预览” ( $parameter$ $preview$ ),在你想要使用 $allocator$ 作为参数来给协程状态分配空间时很有用。
        如果编译器发现 __$g_-promise_-t$::$operator$ $new$ 的实现,逻辑会被转换为以下:

template<typename Promise, typename... Args>
void* __promise_allocate(std::size_t size, [[maybe_unused]] Args&... args) {
  if constexpr (requires { Promise::operator new(size, args...); }) {
    return Promise::operator new(size, args...);
  } else {
    return Promise::operator new(size);
  }

task g(int x) {
  void* state_mem = __promise_allocate<__g_promise_t>(sizeof(__g_state), x);
  __g_state* state;
  try {
    state = ::new (state_mem) __g_state(static_cast<int&&>(x));
  } catch (...) {
    __g_promise_t::operator delete(state_mem);
    throw;
  }
  // ... implement rest of the ramp function
}
}

        同样,这个 $promise$ 类型没有定义静态函数成员 $get_-return_-object_-on_-allocation_-failure\left(\right)$。如果 $promise$ 类型定义了这个函数,这里的分配会使用 $std$::$nothrow_-t$ 形式的 $operator$ $new$,并且当分配返回 $nullptr$ 时返回 __$g_-promise_-t$::$get_-return_-object_-on_-allocation_-failure\left(\right)$。
        即,看起来会像这样:

task g(int x) {
  auto* state = ::new (std::nothrow) __g_state(static_cast<int&&>(x));
  if (state == nullptr) {
    return __g_promise_t::get_return_object_on_allocation_failure();
  }
  // ... implement rest of the ramp function
}

        为了简化剩下的例子,我们使用最简单的调用全局 ::$operator$ $new$ 内存分配函数。

第三步:调用 get_return_object()

        启动函数要做的下一件事是调用 $promise$ 对象的 $get_-return_-object\left(\right)$ 方法,获取启动函数的返回值。
        返回值会作为局部变量存储,并在启动函数结束返回 ( 在其他步骤完成后 )。

task g(int x) {
  auto* state = new __g_state(static_cast<int&&>(x));
  decltype(auto) return_value = state->__promise.get_return_object();
  // ... implement rest of ramp function
  return return_value;
}

        然而,$get_-return_-object\left(\right)$ 的调用可能抛出异常,这时我们想要释放协程状态分配的内存。一种比较好的方法是使用 $std$::$unique_-ptr$ 来管理,这样在后续操作抛出异常时,它会被释放:

task g(int x) {
  std::unique_ptr<__g_state> state(new __g_state(static_cast<int&&>(x)));
  decltype(auto) return_value = state->__promise.get_return_object();
  // ... implement rest of ramp function
  return return_value;
}

第四步:初始化挂起点

        启动函数在调用 $get_-return_-object\left(\right)$ 之后的下一件事是开始执行协程体,协程体执行的第一件事是初始化挂起点,即等价于 $co_-await$ $promise.initial_-suspend\left(\right)$。
        现在,理想的话,我们只需要让协程初始化挂起,然后实现恢复一个初始化挂起协程的启动。然而,初始化挂起点在处理协程以及协程状态的生命周期问题上,有一些奇怪的细节。这些是C++20发布前对初始化挂起点语义的一些后期调整,目的是修复一些存在的问题。
        根据初始化挂起点的定义,如果一个异常从这些地方抛出:

  • $initial_-suspend\left(\right)$ 的调用,
  • 对返回的 $awaitable$ 的 $operator$ $co_-await\left(\right)$ ( 如果有定义 ),
  • 调用 $awaiter$ 的 $await_-ready\left(\right)$,或者
  • 调用 $awaiter$ 的 $await_-suspend\left(\right)$

        那么异常会传播到启动函数的调用方,协程状态会被自动销毁。
        如果一个异常从以下地方抛出:

  • $await_-resume\left(\right)$ 调用,
  • $operator co_-await\left(\right)$ 返回对象的析构函数 ( 如果可用 ),或者
  • $initial_-suspend\left(\right)$ 返回对象的析构函数

        那么异常会被协程体捕获,然后调用 $promise.unhandled_-exception\left(\right)$。
        这意味着我们需要注意这部分的转换,一部分需要实现在启动函数,另一部分需要放在协程体。
        而且,因为 $initial_-suspend\left(\right)$ 返回的对象和 $operator$ $co_-await$ 返回的对象 ( 可选的 ) 具有跨挂起点的生命周期 ( 它们会在协程挂起点前创建,恢复后销毁 ),这些对象的存储需要放在协程状态里。
        在我们特别的例子中,$initial_-suspend\left(\right)$ 返回类型是 $std$::$suspend_-always$,后者是一个空的、平凡构造类型。然而,逻辑上我们还是需要在协程状态里保存这个对象的实例,所以我们会一直加上这部分存储,展示它如何工作。
        这个对象只在调用 $initial_-suspend\left(\right)$ 的位置构造,所以我们需要新增一个确定类型的数据成员,允许我们显示地控制它的生命周期。
        为了支持这些功能,我们先定义一个 $helper$ 类 $manual_-lifetime$,后者具有平凡的构造和析构函数,我们会在需要的时候显式地构造 / 析构存储在其中的值:

template<typename T>
struct manual_lifetime {
  manual_lifetime() noexcept = default;
  ~manual_lifetime() = default;

  // Not copyable/movable
  manual_lifetime(const manual_lifetime&) = delete;
  manual_lifetime(manual_lifetime&&) = delete;
  manual_lifetime& operator=(const manual_lifetime&) = delete;
  manual_lifetime& operator=(manual_lifetime&&) = delete;

  template<typename Factory>
    requires
      std::invocable<Factory&> &&
      std::same_as<std::invoke_result_t<Factory&>, T>
  T& construct_from(Factory factory) noexcept(std::is_nothrow_invocable_v<Factory&>) {
    return *::new (static_cast<void*>(&storage)) T(factory());
  }

  void destroy() noexcept(std::is_nothrow_destructible_v<T>) {
    std::destroy_at(std::launder(reinterpret_cast<T*>(&storage)));
  }

  T& get() & noexcept {
    return *std::launder(reinterpret_cast<T*>(&storage));
  }

private:
  alignas(T) std::byte storage[sizeof(T)];
};

        注意 $construct_-from\left(\right)$ 方法设计成接受lambda而不是接受构造函数参数。这让我们可以使用拷贝消除特性,就地通过函数调用结果初始化构造对象。如果使用构造函数参数,我们就需要进行一次不必要的移动构造函数。
        现在我们可以通过 $manual_-lifetime$ 声明 $promise.initial_-suspend\left(\right)$ 返回的临时对象数据成员。

struct __g_state {
  __g_state(int&& x);

  int x;
  __g_promise_t __promise;
  manual_lifetime<std::suspend_always> __tmp1;
  // to be filled out
};

        $std$::$suspend_-always$ 类型没有定义 $operator$ $co_-await\left(\right)$,所以我们不需要额外的内存保存其返回的临时对象。
        一旦我们通过 $initial_-suspend\left(\right)$ 构造这个对象,我们就需要调用 $await_-ready\left(\right)$、$await_-suspend\left(\right)$ 和 $await_-resume\left(\right)$ 实现 $co_-await$ 表达式。
        当调用 $await_-suspend\left(\right)$ 时,我们需要传递当前协程句柄。现在我们可以通过直接把 $promise$ 引用作为参数调用 $std$::$coroutine_-handle$<__$g_-promise_-t$>::$from_-promise\left(\right)$ 的方式实现。我们稍后再看看它的内部结构。
        同样,$.await_-suspend\left(handle\right)$ 调用的结果类型是 $void$,因此不需要像 $bool$ 返回或者 $coroutine_-handle$ 返回那样考虑是否恢复当前协程或者另一个协程。
        最终,所有 $std$::$suspend_-always$ $awaiter$ 的方法都是 $noexcept$ 的,我们不需要担心异常。如果它们可能会抛出异常,我们需要添加额外的代码保证临时的 $std$::$supsend_-always$ 对象在异常传播到启动函数之外前被销毁。
        一旦 $await_-suspend\left(\right)$ 成功返回或者协程体准备开始执行时,如果有异常抛出,我们就不再需要自动销毁协程状态。所以我们可以调用持有协程状态的 $std$::$unique_-ptr$ 的 $release\left(\right)$ 来避免当我们从函数返回时协程状态的自动销毁。
        所以现在我们可以实现初始化挂起表达式的第一部分:

task g(int x) {
  std::unique_ptr<__g_state> state(new __g_state(static_cast<int&&>(x)));
  decltype(auto) return_value = state->__promise.get_return_object();

  state->__tmp1.construct_from([&]() -> decltype(auto) {
    return state->__promise.initial_suspend();
  });
  if (!state->__tmp1.get().await_ready()) {
    //
    // ... suspend-coroutine here
    //
    state->__tmp1.get().await_suspend(
      std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));

    state.release();

    // fall through to return statement below.
  } else {
    // Coroutine did not suspend.

    state.release();

    //
    // ... start executing the coroutine body
    //
  }
  return __return_val;
}

        $await_-resume\left(\right)$ 调用和 __$tmp1$ 的析构函数会在协程体出现,所以不会在启动函数出现。
        现在我们有了一个 ( 大部分 ) 功能等价的初始化挂起点逻辑,但启动函数仍然有一系列 TODO。为了解决这些,我们首先需要绕道研究一下挂起然后恢复一个协程的策略。

第五步:记录挂起点

        当协程挂起时,需要确保恢复点与之前的挂起点相同。
        此外,还需要跟踪每个挂起点上自动生命周期对象的存活状态,以便之后协程如果被销毁而不是挂起时,知道需要销毁哪些对象。
        一种实现方式是给每个挂起点一个唯一编号,然后在协程状态使用一个整型数据成员存储。
        无论什么时候,当一个协程挂起,它需要把当前挂起点编号写入协程状态,在之后恢复 / 销毁的时候,根据这个整型来找到之前的挂起点。
        注意这不是通过协程状态存储挂起点的唯一方法,然而,主流的三个编译器 ( MSVCClangGCC ) 在这篇文章发布 ( $2022$ 年 ) 的时候都采用了这个方法。另一个可能的解决办法是每个挂起点都使用不同的恢复 / 销毁函数指针,但这篇文章不会讲解这个办法。
        那么让我们来扩展协程状态,使用一个整型数据成员来保存挂起点下标,并初始化为 $0$ ( 我们把使用值表示初始化挂起点 )。

struct __g_state {
  __g_state(int&& x);

  int x;
  __g_promise_t __promise;
  int __suspend_point = 0;  // <-- add the suspend-point index
  manual_lifetime<std::suspend_always> __tmp1;
  // to be filled out
};

第六步:实现 coroutine_handle::resume() 和 coroutine_handle::destory()

        当协程被 $coroutine_-handle$::$resume\left(\right)$ 调用恢复时,我们需要调用某些函数实现被挂起协程的剩余部分,被调用的函数查找挂起点下标,然后跳转到控制流中的适当位置。
        此外,我们需要实现 $coroutine_-handle$::$destroy\left(\right)$ 函数,通过合适的逻辑销毁当前挂起点作用内的对象。然后我们需要实现 $coroutine_-handle$::$done\left(\right)$ 来确认当前挂起点是否为最终挂起点。
        $coroutine_-handle$ 方法接口不知道协程状态的具体类型,$coroutine_-handle$<$void$> 类型可以指向任何协程实例。这意味着我们需要以类型被擦除的协程状态实现。
        我们可以存储指向协程类型的恢复 / 销毁函数指针,并让 $coroutine_-handle$::$resume$ / $destroy\left(\right)$ 调用这些函数指针。
        $coroutine_-handle$ 类型同样需要实现通过 $coroutine_-handle$::$address\left(\right)$ 转换为 $void*$,和通过 $coroutine_-handle$::$from_-address\left(\right)$ 转换为 $void*$。
        进一步,协程可以被任意一个指向它的句柄恢复 / 销毁,不只是最近一个传给 $await_-suspend\left(\right)$ 调用的。
        这些要求让我们定义的 $coroutine_-handle$ 类型只能包含一个指向协程状态的指针,在状态里通过数据成员存储恢复 / 销毁函数指针,而不是把恢复 / 销毁函数指针存在 $coroutine_-handle$ 里。
        同样,因为我们需要 $coroutine_-handle$ 能够指向任意协程状态对象,所以所有协程状态类型的函数指针数据成员应该保持一致。
        一种直接方法是让协程状态类型继承一些包含这些数据成员的基类。
        例如,我们可以定义以下类型作为所有协程状态类型的基类:

struct __coroutine_state {
  using __resume_fn = void(__coroutine_state*);
  using __destroy_fn = void(__coroutine_state*);

  __resume_fn* __resume;
  __destroy_fn* __destroy;
};

        这样 $coroutine_-handle$::$resume\left(\right)$ 可以直接叫 __$resume\left(\right)$,使用 __$coroutine_-state$ 指针作为参数。$coroutine_-handle$::$destroy\left(\right)$ 和 __$destroy\left(\right)$ 函数指针也是一样。
        对于 $coroutine_-handle$::$done\left(\right)$ 方法,我们选择一个空 __$resume\left(\right)$ 函数指针表示最终挂起点。这种方法很方便,因为最终挂起点不支持 $resume\left(\right)$,只支持 $destroy\left(\right)$。如果尝试对一个在最终挂起点挂起的协程调用 $resume\left(\right)$ ( 这是未定义行为 ),那么会调用一个空函数指针,这会失败并且能快速指出错误。
        基于这些,我们可以实现 $coroutine_-handle$<$void$>:

namespace std
{
  template<typename Promise = void>
  class coroutine_handle;

  template<>
  class coroutine_handle<void> {
  public:
    coroutine_handle() noexcept = default;
    coroutine_handle(const coroutine_handle&) noexcept = default;
    coroutine_handle& operator=(const coroutine_handle&) noexcept = default;

    void* address() const {
      return static_cast<void*>(state_);
    }

    static coroutine_handle from_address(void* ptr) {
      coroutine_handle h;
      h.state_ = static_cast<__coroutine_state*>(ptr);
      return h;
    }

    explicit operator bool() noexcept {
      return state_ != nullptr;
    }

    friend bool operator==(coroutine_handle a, coroutine_handle b) noexcept {
      return a.state_ == b.state_;
    }

    void resume() const {
      state_->__resume(state_);
    }
    void destroy() const {
      state_->__destroy(state_);
    }

    bool done() const {
      return state_->__resume == nullptr;
    }
  
  private:
    __coroutine_state* state_ = nullptr;
  };
}

第七步:实现 coroutine_handle<Promise>::promise() 和 from_promise()

        对于更通用的 $coroutine_-handle$<$Promise$> 特例化,大部分实现都可以复用 $coroutine_-handle$<$void$> 实现。然而,我们也需要通过 $promise\left(\right)$ 方法的返回,访问协程状态的 $promise$ 对象,以及从 $promise$ 对象引用构造出一个 $coroutine_-handle$ 。
        然而,因为 $coroutine_-handle$<$Promise$> 类型必须能指向任何 $promise$ 类型是 $Promise$ 的协程状态,因此我们无法简单从指针获取具体协程类型。
        我们需要定义一个新的协程状态基类继承 __$coroutine_-state$,前者应当包含 $promise$ 对象,这样我们就可以定义所有使用特定 $promise$ 类型的协程状态类型,并让它们继承基类。

template<typename Promise>
struct __coroutine_state_with_promise : __coroutine_state {
  __coroutine_state_with_promise() noexcept {}
  ~__coroutine_state_with_promise() {}

  union {
    Promise __promise;
  };
};

        你可能好奇为什么 __$promise$ 成员会定义在一个匿名的 $union$ 里面……
        原因是派生类是为特定的包含参数副本数据成员的协程函数创建的。派生类的数据成员默认会在所有基类数据成员后初始化,所以把 $promise$ 对象声明为普通数据成员意味着它会在参数副本数据成员之前被构造。
        然而,我们需要 $promise$ 的构造函数在参数副本构造函数之后调用,因为参数副本引用可能会被传给 $promise$ 构造函数。
        因此,我们在基类中为 $promise$ 对象保留空间,这样它们就在协程状态内部有一个一致的偏移量,并在参数副本被初始化后的合适点位,调用派生类的构造 / 析构函数。这种控制是通过把 $promise$ 声明为 $union$ 成员实现的。
        让我们更新 __$g_-state$ 类型,继承新的基类。

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& __x)
  : x(static_cast<int&&>(__x)) {
    // Use placement-new to intialise the promise object in the base-class
    ::new ((void*)std::addressof(this->__promise))
      __g_promise_t(construct_promise<__g_promise_t>(x));
  }

  ~__g_state() {
    // Also need to manually call the promise destructor before the
    // argument objects are destroyed.
    this->__promise.~__g_promise_t();
  }

  int __suspend_point = 0;
  int x;
  manual_lifetime<std::suspend_always> __tmp1;
  // to be filled out
}

        现在我们已经定义了 $promise$ 基类,可以开始实现 $std$::$coroutine_-handle$<$Promise$> 类模板。
        大部分实现都很像 $coroutine_-handle$<$void$>,除了使用 __$coroutine_-state_-with_-promise$<$Promise$> 指针而不是 __$coroutine_-state$ 指针。
        新增部分只有 $promise\left(\right)$ 和 $from_-promise\left(\right)$ 函数。

  • $promise\left(\right)$ 方法直接返回协程状态的 __$promise$ 成员。
  • $from_-promise\left(\right)$ 方法需要我们从 $promise$ 对象地址计算出协程状态对象地址。我们只需要从 $promise$ 对象地址中减去 __$promise$ 成员的偏移量就可以了。

        $coroutine_-handle$<$Promise$> 的实现是:

namespace std
{
  template<typename Promise>
  class coroutine_handle {
    using state_t = __coroutine_state_with_promise<Promise>;
  public:
    coroutine_handle() noexcept = default;
    coroutine_handle(const coroutine_handle&) noexcept = default;
    coroutine_hanlde& operator=(const coroutine_handle&) noexcept = default;

    operator coroutine_handle<void>() const noexcept {
      return coroutine_handle<void>::from_address(address());
    }

    explicit operator bool() const noexcept {
      return state_ != nullptr;
    }

    friend bool operator==(coroutine_handle a, coroutine_handle b) noexcept {
      return a.state_ == b.state_;
    }

    void* address() const {
      return static_cast<void*>(static_cast<__coroutine_state*>(state_));
    }

    static coroutine_handle from_address(void* ptr) {
      coroutine_handle h;
      h.state_ = static_cast<state_t*>(static_cast<__coroutine_state*>(ptr));
      return h;
    }

    Promise& promise() const {
      return state_->__promise;
    }

    static coroutine_handle from_promise(Promise& promise) {
      coroutine_handle h;

      // We know the address of the __promise member, so calculate the
      // address of the coroutine-state by subtracting the offset of
      // the __promise field from this address.
      h.state_ = reinterpret_cast<state_t*>(
        reinterpret_cast<unsigned char*>(std::addressof(promise) -
        offsetof(state_t, __promise)));
      
      return h;
    }

    // Define these in terms of their coroutine_handle<void> implementations

    void resume() const {
      static_cast<coroutine_handle<void>>(*this).resume();
    }

    void destroy() const {
      static_cast<coroutine_handle<void>>(*this).destroy();
    }

    bool done() const {
      return static_cast<coroutine_handle<void*>>(*this).done();
    }
  }

  private:
    state_t* state_;
}

        现在已经定义了协程恢复机制,我们可以回到启动函数,并实现初始化我们往协程状态新加的函数指针数据成员。

第八步:函数体开始部分

        现在我们前向声明具有正确签名的恢复/销毁函数,并更新 __$g_-state$ 构造函数来初始化协程状态,以便恢复/销毁函数指针指向它们:

void __g_resume(__coroutine_state* s);
void __g_destroy(__coroutine_state* s);

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& __x)
  : x(static_cast<int&&>(__x)) {
    // Initialise the function-pointers used by coroutine_handle methods.
    this->__resume = &__g_resume;
    this->__destroy = &__g_destroy;

    // Use placement-new to intialise the promise object in the base-class
    ::new ((void*)std::addressof(this->__promise))
      __g_promise_t(construct_promise<__g_promise_t>(x));
  }


  // ... rest omitted for brevity
};

task g(int x) {
  std::unique_ptr<__g_state> state(new __g_state(static_cast<int&&>(x)));
  decltype(auto) return_value = state->__promise.get_return_object();

  state->__tmp1.construct_from([&]() -> decltype(auto) {
    return state->__promise.initial_suspend();
  });
  if (!state->__tmp1.get().await_ready()) {
    state->__tmp1.get().await_suspend(
      std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
    state.release();
    // fall through to return statement below.
  } else {
    // Coroutine did not suspend. Start executing the body immediately.
    __g_resume(state.release());
  }
  return return_value;
}

        完成了启动函数部分后,我们可以开始看 $g\left(\right)$ 的恢复/销毁函数。
        让我们继续转换初始化挂起表达式。
        当 __$g_-resume\left(\right)$ 被调用且 __$suspend_-point$ 的下标是 $0$,我们就需要调用 __$tmp1$ 的 $await_-resume\left(\right)$,并在之后调用 $tmp1$ 的析构函数。

void __g_resume(__coroutine_state* s) {
  // We know that 's' points to a __g_state.
  auto* state = static_cast<__g_state*>(s);

  // Generate a jump-table to jump to the correct place in the code based
  // on the value of the suspend-point index.
  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.get().await_resume();
  state->__tmp1.destroy();


  // TODO: Implement rest of coroutine body.
  //
  // int fx = co_await f(x);
  // co_return fx * fx;
}

        当 __$g_-destroy\left(\right)$ 被调用且 __$suspend_-point$ 下标是 $0$,我们要在销毁和释放协程状态前销毁 __$tmp1$。

void __g_destroy(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.destroy();
  goto destroy_state;

  // TODO: Add extra logic for other suspend-points here.

destroy_state:
  delete state;
}

第九步:转换 co_await 表达式

        接着,我们看看怎么转换 $co_-await$ $f\left(x\right)$ 表达式。
        首先我们来看看一个返回 $task$ 临时对象的 $f\left(x\right)$。
        因为临时的 $task$ 直到语句结尾的分号才会被销毁,而且语句含有 $co_-await$ 表达式,$task$ 的生命周期会扩散出挂起点,因此它需要存储在协程状态中。
        当计算这个临时 $task$ 的 $co_-await$ 表达式时,我们需要调用返回临时 $awaiter$ 对象的 $operator$ $co_-await\left(\right)$ 方法。这个对象的生命周期也扩散出挂起点,所以也要保存在协程状态中。
        让我们给 __$g_-state$ 类型加上必要成员:

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& __x);
  ~__g_state();

  int __suspend_point = 0;
  int x;
  manual_lifetime<std::suspend_always> __tmp1;
  manual_lifetime<task> __tmp2;
  manual_lifetime<task::awaiter> __tmp3;
};

        然后我们可以更新 __$g_-resume\left(\right)$ 函数来初始化这些临时对象,接着再计算 $co_-await$ 表达式包含的 $await_-ready$、$await_-suspend$ 和 $await_-resume$ 这 $3$ 个调用。
        注意 $task$::$awaiter$::$await_-suspend\left(\right)$ 方法返回协程句柄,因此我们需要生成恢复返回句柄的代码。
        我们也需要再调用 $await_-suspend\left(\right)$ 之前更新挂起点下标 ( 使用下标 $1$ ),然后给跳表添加额外的条目,确保在正确的位置恢复。

void __g_resume(__coroutine_state* s) {
  // We know that 's' points to a __g_state.
  auto* state = static_cast<__g_state*>(s);


  // Generate a jump-table to jump to the correct place in the code based
  // on the value of the suspend-point index.
  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  case 1: goto suspend_point_1;  // <-- add new jump-table entry
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.get().await_resume();
  state->__tmp1.destroy();

  // int fx = co_await f(x);
  state->__tmp2.construct_from[&] {
    return f(state->x);
  };
  state->__tmp3.construct_from([&] {
    return static_cast<task&&>(state->__tmp2.get()).operator co_await();
  });
  if (!state->__tmp3.get().await_ready()) {
    // mark the suspend-point
    state->__suspend_point = 1;

    auto h = state->__tmp3.get().await_suspend(
      std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
    
    // Resume the returned coroutine_handle before returning.
    h.resume();
    return;
  }

suspend_point_1:
  int fx = state->__tmp3.get().await_resume();
  state->__tmp3.destroy();
  state->__tmp2.destroy();

  // TOOD: Implement
  // co_return fx * fx;
}

        注意 $int$ $fx$ 局部变量生命周期没有扩散出挂起点,因此它不需要存储在协程状态中。我们可以把它作为一个普通的 __$g_-resume$ 函数局部变量。
        我们也需要给 __$g_-destroy\left(\right)$ 函数添加必要的条目来处理协程在挂起点销毁的情况。

void __g_destroy(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  case 1: goto suspend_point_1;  // <-- add new jump-table entry
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.destroy();
  goto destroy_state;

suspend_point_1:
  state->__tmp3.destroy();
  state->__tmp2.destroy();
  goto destroy_state;

  // TODO: Add extra logic for other suspend-ponits here.

destroy_state:
  delete state;
}

        这样我们就完成了语句:

int fx = co_await f(x);

        然而,函数 $f\left(\right)$ 并没有标为noexcept,意味着它可能抛出异常。同样,$awaiter$::$await_-resume\left(\right)$ 方法也没有标为noexcept,也可能抛出异常。
        当协程体抛出异常时,编译器生成代码来捕获,然后调用 $promise$.$unhandled_-exception\left(\right)$ 来给 $promise$ 机会对异常做些事情。我们来看看这方面的实现。

第十步:实现 unhandled_exception()

        协程定义规范 dcl.fct.def.coroutine 中说,协程的行为就好像它的函数体被替换为:

{
  promise-type promise promise-constructor-arguments;
  try {
    co_await promise.initial_suspend();
    function-body
  } catch (...) {
    if (!intiali-await-resume-called)
      throw;
    promise.unhandled_exception();
  }
final-suspend:
  co_await promise.final_suspend();
}

        我们已经单独处理了启动函数中的 $initial-await_-resume-called$ 分支,所以我们不需要关注这块。
        来让我们调整 __$g_-resume\left(\right)$ 函数体,插入try/catch块。
        注意我们需要小心放置 $switch$,跳转到try块的正确位置,因为我们不允许通过 $goto$ 进入一个try块。
        同样,我们需要谨慎地对 $await_-suspend\left(\right)$ 返回的在try/catch块外的协程句柄调用 $.resume\left(\right)$。如果一个返回的协程的 $.resume\left(\right)$ 抛出了异常,它不应被当前协程捕获,而是传播给调用 $resume\left(\right)$ 恢复它的协程。因此我们在函数开头声明一个存储协程句柄的变量,然后 $goto$ 到try/catch之外的位置,执行 $.resume\left(\right)$ 调用。

void __g_resume(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  std::coroutine_handle<void> coro_to_resume;

  try {
    switch (state->__suspend_point) {
    case 0: goto suspend_point_0;
    case 1: goto suspend_point_1;  // <-- add new jump-table entry
    default: std::unreachable();
    }

suspend_point_0:
    state->__tmp1.get().await_resume();
    state->__tmp1.destroy();

    // int fx = co_await f(x);
    state->__tmp2.construct_from([&] {
      return f(state->x);
    });
    state->__tmp3.construct_from([&] {
      return static_cast<task&&>(state->__tmp2.get()).operator co_await();
    });

    if (!state->__tmp3.get().await_ready()) {
      state->__suspend_point = 1;
      coro_to_resume = state->__tmp3.get().await_suspend(
        std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
      goto resume_coro;
    }

suspend_point_1:
    int fx = state->__tmp3.get().await_resume();
    state->__tmp3.destroy();
    state->__tmp2.destroy();

    // TODO: Implement
    // co_return fx * fx;
  } catch (...) {
    state->__promise.unhandled_exception();
    goto final_suspend;
  } 

final_suspend:
  // TODO: Implement
  // co_await promise.final_suspend();

resume_coro:
  coro_to_resume.resume();
  return;
}

        上面的代码有个 bug。当 __$tmp3$.$get\left(\right)$.$await_-resume\left(\right)$ 调用因为异常退出,我们会在没有调用 __$tmp3$ 和 __$tmp2$ 的析构函数情况下捕获异常。
        注意我们不能简单地捕获异常,调用析构函数然后重新抛出异常,因为这样会改变这些析构函数的行为,相当于在调用 $std$::$unhandled_-exceptions\left(\right)$ 前“处理”了异常。如果在异常展开时调用析构函数,那么 $std$::$unhandled_-exceptions\left(\right)$ 应该返回非零值。
        我们可以定义个RAII辅助类确保当异常抛出,退出作用域时析构函数被调用。

template<typename T>
struct destructor_guard {
  explicit destructor_guard(manual_lifetime<T>& obj) noexcept
  : ptr_(std::addressof(obj))
  {}

  // non-movable
  destructor_guard(destructor_guard&&) = delete;
  destructor_guard& operator=(destructor_guard&&) = delete;

  ~destructor_guard() noexcept(std::is_nothrow_destructible_v<T>) {
    if (ptr_ != nullptr) {
      ptr_->destroy();
    }
  }

  void cancel() noexcept { ptr_ = nullptr; }

private:
  manual_lifetime<T>* ptr_;
};

// Partial specialisation for types that don't need their destructors called.
template<typename T>
  requires std::is_trivially_destrcutible_v<T>
struct destructor_guard<T> {
  explicit destructor_guard(manual_lifetime<T>&) noexcept {}
  void cancel() noexcept {}
};

// Class-template argument deduction to simplify usage
template<typename T>
destructor_guard(manual_lifetime<T>& obj) -> destructor_guard<T>;

        通过这个工具,我们现在可以确保当异常抛出时协程状态中的变量被正确析构。
        让我们对其他变量也使用这个类,确保当退出作用域时它们的析构函数也会被调用。

void __g_resume(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  std::coroutine_handle<void> coro_to_resume;

  try {
    switch (state->__suspend_point) {
    case 0: goto suspend_point_0;
    case 1: goto suspend_point_1;  // <-- add new jump-table entry
    default: std::unreachable();
    }

suspend_point_0:
    {
      destructor_guard tmp1_dtor{state->__tmp1};
      state->__tmp1.get().await_resume();
    }

    // int fx = co_await f(x);
    {
      state->__tmp2.construct_from([&] {
        return f(state->x);
      });
      destructor_guard tmp2_dtor{state->__tmp2};

      state->__tmp3.construct_from([&] {
        return static_cast<task&&>(state->__tmp2.get()).operator co_await();
      });
      destructor_guard tmp3_dtor{state->__tmp3};

      if (!state->__tmp3.get().await_ready()) {
        state->__suspend_point = 1;

        coro_to_resume = state->__tmp3.get().await_suspend(
          std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
        
        // A coroutine suspends without exiting scopes.
        // So cancel the destructor-guards.
        tmp3_dtor.cancel();
        tmp2_dtor.cancel();

        goto resume_coro;
      }

      // Don't exit the scope here.
      //
      // We can't 'goto' a label that enters the scope of a variable with a
      // non-trivial destructor. So we have to exit the scope of the destructor
      // guards here without calling the destructors and then recreate them after
      // the `suspend_point_1` label.
      tmp3_dtor.cancel();
      tmp2_dtor.cancel();
    }

suspend_point_1:
    int fx = [&]() -> decltype(auto) {
      destructor_guard tmp2_dtor{state->__tmp2};
      destructor_guard tmp3_dtor{state->__tmp3};
      return state->__tmp3.get().await_resume();
    }();

    // TODO: Implement
    // co_return fx * fx;
  } catch (...) {
    state->__promise.unhandled_exception();
    goto final_suspend;
  }

final_suspend:
  // TODO: Implement
  // co_await promise.final_suspend();

resume_coro:
  coro_to_resume.resume();
  return;
}

        现在我们的协程体会在任何异常出现的地方正确销毁局部变量,并且会在这些异常传播出协程体时正确调用 $promise$.$unhandled_-exception\left(\right)$。
        要注意的是,如果 $promise$.$unhandled_-exception\left(\right)$ 方法自身因为异常退出 ( 例如重新抛出当前异常 ) 时,可能需要特殊处理。
        在这种情况,协程需要捕获异常,标记为在最终挂起点挂起,然后重抛异常。
        例如,__$g_-resume\left(\right)$ 函数的catch块会像这样:

try {
  // ...
} catch (...) {
  try {
    state->__promise.unhandled_exception();
  } catch (...) {
    state->__suspend_point = 2;
    state->__resume = nullptr;  // mark as final-suspend-point
    throw;
  }
}

        然后我们需要给 __$g_-destroy$ 函数跳表增加额外条目:

switch (state->__suspend_point) {
case 0: goto suspend_point_0;
case 1: goto suspend_point_1;
case 2: goto destroy_state;  // no variables in scope that need to be destroyed
                             // just destroy the coroutine-state object.
}

        注意这个例子中,最终挂起点并不是必须跟 $co_-await$ $promise$.$final_-suspend\left(\right)$ 的挂起点一样。
        这是因为 $promise$.$final_-suspend\left(\right)$ 挂起点通常有些额外的与 $co_-await$ 表达式相关的临时对象,这些对象需要在 $coroutine_-handle$::$destroy\left(\right)$ 调用时销毁。而在这里,如果 $promise$.$unhandled_-exception\left(\right)$ 因为异常退出,这些对象不再存活,所以不需要被 $coroutine_-handle$::$destroy\left(\right)$ 销毁。

第十一步:实现 co_return

        下一步是实现 $co_-return$ $fx$ $*$ $fx;$ 语句。
        与前面相比,这一步相对简单。
        $co_-return$ <$expr$> 语句会映射成:

promise.return_value(<expr>);
goto final-suspend-point;

        所以我们可以简单地把 TODO 注释替换成:

state->__promise.return_value(fx * fx);
goto final_suspend;

        简单。

第十二步:实现 final_suspend()

        代码的最后一个 TODO 是实现 $co_-await$ $promise$.$final_-suspend\left(\right)$ 语句。
        $final_-suspend\left(\right)$ 方法返回一个临时的 $task$::$promise_-type$::$final_-awaiter$ 类型,后者存储在协程状态中,并在 __$g_-destroy$ 内销毁。
        这个类型没有重载 $operator$ $co_-await\left(\right)$,所以我们不需要额外的临时对象存储调用结果。
        就像 $task$::$awaiter$ 类型,它也通过 $await_-suspend\left(\right)$ 返回协程句柄。所以我们需要确保对返回的句柄调用 $resume\left(\right)$。
        如果协程没有在最终挂起点挂起,那么协程状态会被隐式销毁。所以我们需要在执行到协程结束时,删除状态对象。
        而且,因为所有的最终挂起逻辑需要是noexcept的,我们不需要担心这里的子表达式会抛出异常。
        我们先给 __$g_-state$ 类型加上数据成员。

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& __x);
  ~__g_state();

  int __suspend_point = 0;
  int x;
  manual_lifetime<std::suspend_always> __tmp1;
  manual_lifetime<task> __tmp2;
  manual_lifetime<task::awaiter> __tmp3;
  manual_lifetime<task::promise_type::final_awaiter> __tmp4;  // <---
};

        然后我们可以像下面这样实现最终挂起表达式体:

final_suspend:
  // co_await promise.final_suspend
  {
    state->__tmp4.construct_from([&]() noexcept {
      return state->__promise.final_suspend();
    });
    destructor_guard tmp4_dtor{state->__tmp4};

    if (!state->__tmp4.get().await_ready()) {
      state->__suspend_point = 2;
      state->__resume = nullptr;  // mark as final suspend-point

      coro_to_resume = state->__tmp4.get().await_suspend(
        std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));

      tmp4_dtor.cancel();
      goto resume_coro;
    }

    state->__tmp4.get().await_resume();
  }

  // Destroy coroutine-state if execution flows off end of coroutine
  delete state;
  return;

        接着我们需要更新 __$g_-destroy$ 函数来处理新的挂起点。

void __g_destroy(__coroutine_state* state) {
  auto* state = static_cast<__g_state*>(s);

  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  case 1: goto suspend_point_1;
  case 2: goto suspend_point_2;
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.destroy();
  goto destroy_state;

suspend_point_1:
  state->__tmp3.destroy();
  state->__tmp2.destroy();
  goto destroy_state;

suspend_point_2:
  state->__tmp4.destroy();
  goto destroy_state;

destroy_state:
  delete state;
}

        现在我们有了一个完整的 $g\left(\right)$ 协程函数转换结果。
        结束了!
        还是……

第十三步:实现对称转移和无操作协程

        我们上面实现的 __$g_-resume\left(\right)$ 函数方式有个问题。
        之前的文章中有详细讨论过,所以如果你想了解更多,可以查看C++协程(4):理解对称转移
        expr.await 规范给了一点关于我们应该怎么处理返回协程句柄的 $await_-suspend$ 的线索:

如果 $await_-suspend$ 类型时 $std$::$coroutine_-handle$<$Z$>,会调用 $.resume\left(\right)$。
备注1:这会恢复 $await_-suspend$ 返回的协程。可以用这种方式连续恢复任意数量的协程,最终控制流会返回给现在协程的调用者/恢复者。

        这个备注,虽然不是规范,也不是约束,但是十分鼓励编译器以尾调用方式实现恢复协程,而不是递归方式。因为在协程循环恢复对方的过程中,递归方式很容易导致栈无限制增长。
        问题原因是我们是在 __$g_-resume\left(\right)$ 函数体内调用下一个协程的 $.resume\left(\right)$ 然后返回,因此 __$g_-resume\left(\right)$ 的栈帧会在下一个协程挂起返回后才会释放。
        编译器可以把恢复下一个协程优化为尾调用。以这种方式,编译器生成的代码会先弹出当前栈帧,保留返回地址,然后执行 $jmp$ 指令跳转到下一个协程的恢复函数。
        因为C++并没有机制让尾部位置的函数调用变成尾调用,我们需要从恢复函数中返回来释放栈空间,然后让调用者恢复下一个协程。
        下一个协程也可能需要在挂起时恢复另一个协程,而且这种恢复可能会无限下去,所以调用者需要在循环中恢复协程。
        这种循环通常叫做蹦床循环 ( $trampoline$ $loop$ ),因为我们从一个协程回到循环然后再跳到下一个协程。
        如果我们把恢复函数的签名改成返回指向下个协程状态的指针,而不是 $void$,那么 $coroutine_-handle$::$resume\left(\right)$ 函数可以立即调用下个协程的 __$resume\left(\right)$ 指针来恢复它。
        我们改一下 __$coroutine_-state$ 的 __$resume_-fn$ 的签名:

struct __coroutine_state {
  using __resume_fn = __coroutine_state* (__coroutine_state*);
  using __destroy_fn = void (__coroutine_state*);

  __resume_fn* __resume;
  __destroy_fn* __destroy;
};

        然后我们可以像这样编写 $coroutine_-handle$::$resume\left(\right)$:

void std::coroutine_handle<void>::resume() const {
  __coroutine_state* s = state_;
  do {
    s = s->__resume(s);
  } while (/*some condition*/);
}

        下个问题是:“循环条件是什么?”
        这时就轮到 $std$::$noop_-coroutine\left(\right)$ 帮忙了。
        $std$::$noop_-coroutine$ 是一个工厂函数,返回一个特殊协程句柄,具有无操作的 $resume\left(\right)$ 和 $destroy\left(\right)$ 方法。如果一个协程挂起,并且 $await_-suspend\left(\right)$ 返回无操作句柄,意味着没有协程需要恢复,它的 $coroutine_-handle$::$resume\left(\right)$ 会返回给调用者。
        所以我们需要实现 $std$::$noop_-coroutine\left(\right)$ 以及 $coroutine_-handle$::$resume\left(\right)$ 条件,让 __$coroutine_-state$ 指针指向无操作协程状态时变为 $false$ 并退出循环。
        一种策略是定义一个静态 __$coroutine_-state$ 变量,作为无操作协程状态。$std$::$noop_-coroutine\left(\right)$ 函数可以返回一个指向该对象的协程句柄,然后我们可以比较 __$coroutine_-state$ 指针和那个对象的地址,判断协程句柄是否为无操作协程。
        首先我们定义这个特殊的无操作协程状态对象:

struct __coroutine_state {
  using __resume_fn = __coroutine_state* (__coroutine_state*);
  using __destroy_fn = void (__coroutine_state*);

  __resume_fn* __resume;
  __destroy_fn* __destroy;

  static __coroutine_state* __noop_resume(__coroutine_state* state) noexcept {
    return state;
  }

  static void __noop_destroy(__coroutine_state*) noexcept {}

  static const __coroutine_state __noop_coroutine;
};

inline const __coroutine_state __coroutine_state::__noop_coroutine{
  &__coroutine_state::__noop_resume,
  &__coroutine_state::__noop_destroy,
};

        然后我们可以特例化实现 $std$::$coroutine_-handle$<$noop_-coroutine_-promise$>。

namespace std
{
  struct noop_coroutine_promise {};

  using noop_coroutine_handle = coroutine_handle<noop_coroutine_promise>;

  noop_coroutine_handle noop_coroutine() noexcept;

  template<>
  class coroutine_handle<noop_coroutine_promise> {
  public:
    constexpr coroutine_handle(const coroutine_handle&) noexcept = default;
    constexpr coroutine_handle& operator=*(const coroutine_handle&) noexcept = default;

    constexpr explicit operator bool() noexcept { return true; }

    constexpr friend bool operator==(coroutine_handle, coroutine_handle) noexcept {
      return true;
    }

    operator coroutine_handle<void>() const noexcept {
      return coroutine_handle<void>::from_address(address());
    }

    noop_coroutine_promise& promise() const noexcept {
      static noop_coroutine_promise promise;
      return promise;
    }

    constexpr void resume() const noexcept {}
    constexpr void destroy() const noexcept {}
    constexpr bool done() const noexcept { return false; }

    constexpr void* address() const noexcept {
      return const_cast<__coroutine_state*>(&__coroutine_state::__noop_coroutine);
    }
  private:
    constexpr coroutine_handle() noexcept = default;

    friend noop_coroutine_handle noop_coroutine() noexcept {
      return {};
    }
  };
}

        然后我们可以更新 $coroutine_-handle$::$resume\left(\right)$,在返回无操作协程状态时退出。

void std::coroutine_handle<void>::resume() const {
  __coroutine_state* s = state_;
  do {
    s = s->__resume(s);
  } while (s != &__coroutine_state::__noop_coroutine);
}

        最后,我们可以更新 __$g_-resume\left(\right)$ 返回 __$coroutine_-state*$。
        这部分只涉及更新签名并替换:

coro_to_resume = ...;
goto resume_coro;

        以及

auto h = ...;
return static_cast<__coroutine_state*>(h.address());

        并在函数的最后面 ( $delete$ $state;$ 语句之后 ) 添加:

return static_cast<__coroutine_state*>(std::noop_coroutine().address());

最后一件事

        细心的人可能发现了,协程状态类型 __$g_-state$ 要比需要的大。
        $4$ 个存储临时值的数据成员分别为它们的值保留了空间。然而,一些临时值的生命周期并不重叠,所以理论上可以通过在对象生命周期结束后,给下一个对象重复使用空间的方式,节省空间。
        为了利用这点,我们可以把数据成员定义在一个合适的匿名union中。
        看一下我们现在的临时变量生命周期:

  • __$tmp1$ - 只在 $co_-await$ $promise$.$initial_-suspend\left(\right);$ 语句存活
  • __$tmp2$ - 只在 $int$ $fx$ $=$ $co_-await$ $f\left(x\right);$ 语句存活
  • __$tmp3$ - 只在 $int$ $fx$ $=$ $co_-await$ $f\left(x\right);$ 语句存活 - 生命周期内嵌于 __$tmp2$
  • __$tmp4$ - 只在 $co_-await$ $promise$.$final_-suspend\left(\right);$ 语句存活

        因为 __$tmp2$ 和 __$tmp3$ 的生命周期重启,我们必须把它们一起放在同一个struct,保证同时间它们都存活。
        然而,__$tmp1$ 和 __$tmp4$ 成员生命周期不重叠,所以可以一起放在匿名union中。
        因此,我们可以把数据成员定义改成:

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& x);
  ~__g_state();

  int __suspend_point = 0;
  int x;

  struct __scope1 {
    manual_lifetime<task> __tmp2;
    manula_lifetime<task::awaiter> __tmp3;
  };

  union {
    manul_lifetime<std::suspend_always> __tmp1;
    __scope1 __s1;
    manual_lifetime<task::promise_type::final_awaiter> __tmp4;
  };
};

        然后,因为 __$tmp2$ 和 __$tmp3$ 变量都内嵌在 __$s1$ 对象,我们需要把它们的引用改成例如 $state$->__$s1$.$tmp2$ 的方式。不过其他代码不用变。
        这样协程状态可以节省 $16$ 字节,因为 __$tmp1$ 和 __$tmp4$ 数据成员不再需要额外的空间对齐,它们会被对齐到指针大小,即使是空类型。

放到一起

        好的,让我们看看下面协程函数生成的代码:

task g(int x) {
  int fx = co_await f(x);
  co_return fx * fx;
}

        是下面这样:

/////
// The coroutine promise-type

using __promise_t = std::coroutine_traits<task, int>::promise_type;

__coroutine_state* __g_resume(__coroutine_state* s);
void __g_destroy(__coroutine_state* s);


/////
// The coroutine-state definition

struct __g_state : __coroutine_state_with_promise<__g_promise_t> {
  __g_state(int&& x)
  : x(static_cast<int&&>(x)) {
    // Initialise the function-pointers used by coroutine-handle methods.
    this->__resume = &__g_resume;
    this->__destroy = &__g_destroy;

    // Used placement-new to initialise the promise object in the base-class
    // after we've intialised the argument copies.
    ::new ((void*)std::addressof(this->__promise))
      __g_promise_t(construct_promise<__g_promise_t>(this->x));
  }

  ~__g_state() {
    this->__promise.~__g_promise_t();
  }

  int __suspend_point = 0;

  // Argument copies
  int x;

  // Local variables/temporaries
  struct __scope1 {
    manual_lifetime<task> __tmp2;
    manual_lifetime<task::awaiter> __tmp3;
  };

  union {
    manual_lifetime<std::suspend_always> __tmp1;
    __scope __s1;
    manual_lifetime<task::promise_type::final_awaiter> __tmp4;
  };
};

/////
// The "ramp" function

task g(int x) {
  std::unique_ptr<__g_state> state(new __g_state(static_cast<int&&>(x)));
  decltype(auto) return_value = state->__promise.get_return_object();

  state->__tmp1.construct_from([&]() -> decltype(auto) {
    return state->__promise.initial_suspend();
  });
  if (!state->__tmp1.get().await_ready()) {
    state->__tmp1.get().await_suspend(
      std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
    state.release();
    // fall through to return statement below.
  } else {
    // Coroutine did not suspend. Start excuting the body immediately.
    __g_resume(state.release());
  }
  return return_value;
}

/////
// The "resume" function

__coroutine_state* __g_resume(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  try {
    switch (state->__suspend_point) {
    case 0: goto suspend_point_0;
    case 1: goto suspend_point_1;  // <-- add new jump-table entry
    default: std::unreachable();
    }

suspend_point_0:
    {
      destructor_guard tmp1_dtor{state->__tmp1};
      state->__tmp1.get().await_resume();
    }

    // int fx = co_await f(x);
    {
      state->__s1.__tmp2.construct_from([&] {
        return f(state->x);
      });
      destructor_guard tmp2_dtor{state->__s1.__tmp2};

      state->__s1.__tmp3.construct_from([&] {
        return static_cast<task&&>(state->__s1.__tmp2.get()).operator co_await();
      });
      destructor_guard tmp3_dtor{state->__s1.__tmp3};

      if (!state->__s1.__tmp3.get().await_ready()) {
        state->__suspend_point = 1;

        auto h = state->__s1.__tmp3.get().await_suspend(
          std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));

        // A coroutine suspends without exiting scopes.
        // So cancel the destructor-guards.
        tmp3_dtor.cancel();
        tmp2_dtor.cancel();

        return static_cast<__coroutine_state*>(h.address());
      }

      // Don't exit the scope here.
      // We can't 'goto' a label that enters the scope of a variable with a
      // non-trivial destructor. So we have to exit the scope of the destructor
      // guards here without calling the destructors and then recreate them after
      // the `suspend_point_1` label.
      tmp3_dtor.cancel();
      tmp2_dtor.cancel();
    }

suspend_point_1:
    int fx = [&]() -> decltype(auto) {
      destructor_guard tmp2_dtor{state->__s1.__tmp2};
      destructor_guard tmp3_dtor{state->__s1.__tmp3};
      return state->__s1.__tmp3.get().await_resume();
    }();

    // co_return fx * fx;
    state->__promise.return_value(fx * fx);
    goto final_suspend;
  } catch (...) {
    state->__promise.unhandled_exception();
    goto final_suspend;
  }

final_suspend:
  // co_await promise.final_suspend
  {
    state->__tmp4.construct_from([&]() noexcept {
      return state->__promise.final_suspend();
    });
    destructor_guard tmp4_dtor{state->__tmp4};

    if (!state->__tmp4.get().await_ready()) {
      state->__suspend_point = 2;
      state->__resume = nullptr;  // mark as final suspend-point

      auto h = state->__tmp4.get().await_suspend(
        std::coroutine_handle<__g_promise_t>::from_promise(state->__promise));
      
      tmp4_dtor.cancel();
      return static_cast<__coroutine_state*>(h.address());
    }

    state->__tmp4.get().await_resume();
  }

  // Destroy coroutine-state if execution flows off end of coroutine
  delete state;

  return static_cast<__coroutine_state*>(std::noop_coroutine().address());
}

/////
// The "destroy" function

void __g_destroy(__coroutine_state* s) {
  auto* state = static_cast<__g_state*>(s);

  switch (state->__suspend_point) {
  case 0: goto suspend_point_0;
  case 1: goto suspend_point_1;
  case 2: goto suspend_point_2;
  default: std::unreachable();
  }

suspend_point_0:
  state->__tmp1.destroy();
  goto destroy_state;

suspend_point_1:
  state->__s1.__tmp3.destroy();
  state->__s1.__tmp2.destroy();
  goto destroy_state;

suspend_point_2:
  state->__tmp4.destroy();
  goto destroy_state;

destroy_state:
  delete state;
}

        最终代码的完全可编译版本可以看https://godbolt.org/z/xaj3Yxabn
        有关C++协程机制的 $5$ 部分系列文章到此结束。
        这些信息可能比你想要了解的还要多,希望它能帮助你理解和揭开协程的神秘面纱。
        感谢您坚持到最后!
        下次再见,Lewis.

C++协程(4):理解对称转移

        协程标准提供了一种有趣的方式来编写异步代码,就好像你还在写同步代码那样。你只需要在合适的点调用 $co_-await$,编译器就会负责挂起协程,在挂起点之间保存状态,并在操作完成后恢复协程执行。
        然而,协程标准,就像它最初定义的那样,有着很不友好的限制,一不小心就很容易导致栈溢出。并且如果想要避免这种溢出,就需要你在的 $task$<$T$> 类型中进行额外的同步开销。
        还好,2018年对协程设计进行了调整,增加了一种叫做“对称转移” ( $symmetric$ $transfer$ ) 的功能,让你可以在不消耗任何额外栈空间的前提下挂起一个协程的同时恢复另一个协程的执行。此功能的加入解决了协程标准的一个关键限制,并且允许更简单有效的,不需要任何安全方面的措施来防止栈溢出的方式来实现一个异步协程。
        在这篇文章我会试着解释栈溢出问题,以及这个关键的“对称转移”能力是怎么解决这个问题的。

首先是一些关于任务协程怎么工作的背景

        考虑如下协程:

task foo() {
  co_return;
}

task bar() {
  co_await foo();
}

        假设我们有一个简单的 $task$ 类型,当其他协程等待它时会懒执行。这个特别的 $task$ 类型没有返回值。
        让我们分析下当 $bar\left(\right)$ 执行 $co_-await$ $foo\left(\right)$ 时发生了什么。

  • $bar\left(\right)$ 协程调用 $foo\left(\right)$ 函数。注意从调用方的角度看协程只是一个普通函数。
  • $foo\left(\right)$ 调用执行一些步骤:
    • 分配协程帧内存 ( 一般在堆上 )。
    • 把参数拷贝到协程帧 ( 如果没有参数,不执行这一步 )。
    • 在协程帧构造 $promise$ 对象。
    • 调用 $promise.get_-return_-object\left(\right)$ 来获取 $foo\left(\right)$ 的返回值。这一步生成将被返回的 $task$ 对象,使用指向刚创建的协程帧的 $std$::$coroutine_-handle$ 初始化。
    • 在最初挂起点 ( 即左大括号 ) 处挂起协程。
    • 把 $task$ 对象返回给 $bar\left(\right)$。
  • 接着 $bar\left(\right)$ 协程对 $foo\left(\right)$ 返回的 $task$ 执行 $co_-await$ 表达式。
    • $bar\left(\right)$ 协程挂起,调用返回的 $task$ 对象的 $await_-suspend\left(\right)$ 方法,传入指向 $bar\left(\right)$ 协程帧的 $std$::$coroutine_-handle$ 。
    • $await_-suspend\left(\right)$ 函数保存 $bar\left(\right)$ 的 $std$::$coroutine_-handle$ 到 $foo\left(\right)$ 的 $promise$ 对象中,调用 $foo\left(\right)$ 的 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 方法,恢复 $foo\left(\right)$ 协程的执行。
  • $foo\left(\right)$ 协程同步执行完成。
  • $foo\left(\right)$ 协程在最终挂起点 ( 即右大括号 ) 处挂起,通过开始执行前存储的 $promise$ 对象的 $std$::$coroutine_-handle$ ( 即 $bar\left(\right)$ 协程 ) 恢复 $bar\left(\right)$ 的执行。
  • $bar\left(\right)$ 协程恢复,继续执行,直到到达 $co_-await$ 表达式语句的结尾,这时它调用 $foo\left(\right)$ 返回的临时变量 $task$ 的析构函数。
  • $task$ 的析构函数再调用 $foo\left(\right)$ 协程句柄的 $.destroy\left(\right)$ 方法,销毁协程帧,包括 $promise$ 对象和拷贝的参数。

        好,这么看来一个简单的调用包含了许多步骤。
        为了更好地理解这些步骤,让我们看看一种简单的使用协程标准设计 ( 不支持对称转移 ) 的 $task$ 类实现。

task 的大体实现

        类的实现大体是这样:

class task {
public:
  class promise_type { /* see below */ };

  task(task&& t) noexcept
  : coro_(std::exchange(t.coro_, {}))
  {}

  ~task() {
    if (coro_)
      coro_.destroy();
  }

  class awaiter { /* see below */ };

  awaiter operator co_await() && noexcept;

private:
  explicit task(std::coroutine_handle<promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<promise_type> coro_;
};

        $task$ 独占一个指向协程帧的 $std$::$coroutine_-handle$,在协程调用时创建。$task$ 对象是RAII对象,保证当 $task$ 离开作用域时,$std$::$coroutine_-handle$ 的 $.destroy\left(\right)$ 会被调用。
        接着让我们实现 $promise$ 类型。

实现 task::promise_type

        根据以前的文章,我们知道 $promise$ 类型成员定义了协程帧中的 $Promise$ 对象类型,控制协程行为。
        首先,我们需要实现 $get_-return_-object\left(\right)$ 来构造在协程创建时返回的 $task$ 对象。这个方法只需要使用新创建的协程帧的 $std$::$coroutine_-handle$ 来初始化 $task$。
        我们可以使用 $std$::$coroutine_-handle$::$from_-promise\left(\right)$ 方法来从 $promise$ 对象构造出一个句柄。

class task::promise_type {
public:
  task get_return_object() noexcept {
    return task{std::coroutine_handle<promise_type>::from_promise(*this)};
  }
}

        接着,我们想要协程在左大括号处进行初始化挂起,这样稍后可以在挂起 $task$ 的点恢复协程。
        协程懒启动有一些优点:

  1. 可以在开始协程执行前绑定续体的 $std$::$coroutine_-handle$。这意味着我们不需要使用线程同步来处理绑定续体和协程执行完成之间的竞态。
  2. $task$ 析构函数可以无条件地销毁协程帧,不需要担心协程是否还在另一个线程运行,因为协程只会在 $co_-await$ 时执行。并且开始后调用方协程会被挂起,所以直到协程执行完成,我们才能调用 $task$ 的析构函数。这让编译器可以更好的把协程帧直接分配在调用方的调用栈上,参考P0981R0了解更多关于堆分配跳过优化 ( $Heap$ $Allocation$ $eLision$ $Optimisation$, $HALO$ )。
  3. 提升协程代码的异常安全性。如果你没有立马对返回的 $task$ 调用 $co_-await$,并且做了一些其他逻辑,抛出了异常,导致栈回退,这时 $task$ 的析构函数会被调用,这样我们也可以安全地销毁协程帧,因为我们知道它还没开始执行。我们不需要处理 $detach$,悬垂引用,析构函数阻塞,进程终止或者未定义行为。这也是我在CppCon 2019 talk on Structured Concurrency中讲到的,后者包含更多细节。

        为了让协程在左大括号处进行初始化挂起,我们实现了一个返回内置 $suspend_-always$ 类型的 $initial_-suspend\left(\right)$ 方法。

std::suspend_always initial_suspend() noexcept {
  return {};
}

        接着,我们需要定义 $return_-void\left(\right)$ 方法,这个方法会在执行到 $co_-return;$ 时,或者协程执行结束时被调用。这个方法并不需要做任何事,只需要声明一下来让编译器知道这个协程类型可以使用 $co_-return;$ 语句。

void return_void() noexcept {}

        我们也需要增加一个 $unhandled_-exception\left(\right)$ 方法,如果一个异常逃逸出协程体之外,这个方法会被调用。在我们的场景,可以认为 $task$ 协程体是noexcept的,如果出现异常,直接调用 $std$::$terminate\left(\right)$。

void unhandled_exception() noexcept {
  std::terminate();
}

        最后,当协程执行到右大括号时,我们想让协程在最终挂起点挂起,并恢复续体,即等待它完成的另一个协程的执行。
        为了实现这个,我们需要 $promise$ 有一个数据成员来保存续体的 $std$::$coroutine_-handle$。我们也需要定义 $final_-suspend\left(\right)$ 方法,返回一个 $awaitble$ 对象,在当前协程在最终挂起点挂起后,恢复续体。
        把恢复延后到当前协程挂起后进行是很重要的,因为下一个协程可能会立即调用 $task$ 的析构函数,后者再调用协程帧的 $.destroy\left(\right)$ 。$.destroy\left(\right)$ 只对挂起协程有效,所以这时可能会导致未定义行为。
        编译器会在右大括号处插入 $co_-await$ $promise.final_-suspend\left(\right)$ 语句。
        特别要注意的是,当 $final_-suspend\left(\right)$ 被调用时,当前协程还没有挂起。在协程挂起前,我们需要等待直到返回的 $awaitble$ 对象 $await_-suspend\left(\right)$ 方法被调用。

struct final_awaiter {
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutin_handle<promise_type> h) noexcept {
    // The coroutine is now suspended at the final-suspend point.
    // Lookup its continuation in the promise and resume it.
    h.promise().continuation.resume();
  }

  void await_resume() noexcept {}
};

final_awaiter final_suspend() noexcept {
  return {};
}

std::coroutine_handle<> continuation;
};

        好了,$promise$ 类型完成了。最后还剩下 $task$::$operator$ $co_-await\left(\right)$。

实现 task::operator co_await()

        你可能记得理解co_await这篇文章讲的,当执行 $co_-await$ 表达式时,编译器会生成一个对 $operator$ $co_-await\left(\right)$ 的调用,如果定义了这个方法,返回对象必须同时定义 $await_-ready\left(\right)$、$await_-suspend\left(\right)$ 和 $await_-resume\left(\right)$ 方法。
        当一个协程等待一个 $task$ 时,我们希望等待中的协程总是挂起,并且在挂起后,把等待中协程的句柄保存在即将恢复的协程的 $promise$ 中,并在之后对 $task$ 的 $std$::$coroutine_-handle$ 调用 $.resume\left(\right)$ 来开始 $task$ 执行。
        因此代码会相对直接:

class task::awaiter {
public:
  bool await_ready() noexcept {
    return false;
  }

  void await_suspend(std::coroutine_handle<> continuation) noexcept {
    // Store the continuation in the task'promise so that the final_suspend()
    // knows to resume this corotuine when the task completes.
    coro_.promise().continuation = continuation;

    // Then we resume the task's coroutine, which is currently suspended
    // at the initial-suspend-point (ie. at the open curly brace).
    coro_.resume();
  }

  void await_resume() noexcept {}

private:
  explicit awaiter(std::coroutine_handle<task::promise_type> h) noexcept
  : coro_(h)
  {}

  std::coroutine_handle<task::promise_type> coro_;
};

task::awaiter task::operator co_await() && noexcept {
  return awaiter{coro_};
}

        这样就完成了 $task$ 类型功能所需的代码。
        你可以在 $Compiler$ $Explorer$ 查看完整代码:https://godbolt.org/z/-Kw6Nf

栈溢出问题

        然而,当你尝试在协程中编写循环,并且 $co_-await$ 可能在循环体内同步完成的 $task$ 时,会受到一些限制。
        例如:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; i++) {
    co_await completes_synchronously();
  }
}

        使用上文中的 $task$ 实现,$loop_-synchronously\left(\right)$ 函数 ( 可能 ) 会在 $count$ 是 $10$、$10000$,甚至 $100'000$ 时都能正常运行。但是有些值会导致协程崩溃。
        例如,当 $count$ 是 $1'000'000$ 时就会崩溃,可以查看:https://godbolt.org/z/gy5Q8q
        崩溃的原因是栈溢出。
        为了理解栈溢出的原因,我们需要看看代码执行的时候发生了什么,尤其是栈帧上面发生了什么。
        当其他协程 $co_-await$ 了返回的 $task$ 后,$loop_-synchronously\left(\right)$ 开始执行。这将挂起等待中的协程,调用 $task$::$awaiter$::$await_-suspend\left(\right)$,后者调用 $task$ 的 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 方法。
        因此在 $loop_-synchronously\left(\right)$ 启动时,栈看起来就像这样:

           Stack                                                   Heap
+------------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume    | active coroutine -> | loop_synchronously frame |
+------------------------------+                     | +----------------------+ |
| coroutine_handle::resume     |                     | | task::promise        | |
+------------------------------+                     | | - continuation --.   | |
| task::awaiter::await_suspend |                     | +------------------|---+ |
+------------------------------+                     | ...                |     |
| awaiting_coroutine$resume    |                     +--------------------|-----+
+------------------------------+                                          V
|  ....                        |                     +--------------------------+
+------------------------------+                     | awaiting_coroutine frame |
                                                     |                          |
                                                     +--------------------------+

注意:一个协程函数通常被编译器编译成两部分:

  1. “启动函数” ( $ramp$ $function$ ),处理协程帧构造、参数拷贝,$promise$ 构造和生成返回值,以及
  2. “协程体” ( $coroutine$ $body$ ),包含用户编写的逻辑。

我使用 $\$resume$ 后缀表示协程的“协程体”部分。
后面的文章会详细介绍这种分割方式。

        当 $loop_-synchronously\left(\right)$ $co_-await$ $completes_-synchronously\left(\right)$ 返回的 $task$ 时,当前协程会被挂起,调用 $task$::$awaiter$::$await_-suspend\left(\right)$。$await_-suspend\left(\right)$ 方法再调用 $completes_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        这会恢复 $completes_-synchronously\left(\right)$ 协程,后者会同步执行直到结束,并在最终挂起点处挂起,然后调用 $task$::$promise$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法,后者会调用 $loop_-synchronously\left(\right)$ 协程句柄的 $.resume\left(\right)$ 方法。
        接着当 $loop_-synchronously\left(\right)$ 协程恢复之后、$completes_-synchronously\left(\right)$ 返回的临时变量 $task$ 在分号处销毁时,我们再看看当前程序状态,栈 / 堆看起来就像这样:

           Stack                                                   Heap
+-------------------------------+ <-- top of stack
| loop_synchronously$resume     | active coroutine -.
+-------------------------------+                   |
| coroutine_handle::resume      |            .------'
+-------------------------------+            |
| final_awaiter::await_suspend  |            |
+-------------------------------+            |  +--------------------------+ <-.
| completes_synchronously$resume|            |  | completes_synchronously  |   |
+-------------------------------+            |  | frame                    |   |
| coroutine_handle::resume      |            |  +--------------------------+   |
+-------------------------------+            '---.                             |
| task::awaiter::await_suspend  |                V                             |
+-------------------------------+ <-- prev top  +--------------------------+   |
| loop_synchronously$resume     |     of stack  | loop_synchronously frame |   |
+-------------------------------+               | +----------------------+ |   |
| coroutine_handle::resume      |               | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
| task::awaiter::await_suspend  |               | +------------------|---+ |   |
+-------------------------------+               | - task temporary --|---------'
| awaiting_coroutine$resume     |               +--------------------|-----+
+-------------------------------+                                    V
|  ....                         |               +--------------------------+
+-------------------------------+               | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        接着下一个要做的就是调用 $task$ 的析构函数,销毁 $completes_-synchronously\left(\right)$ 的协程帧,并递增 $count$ 变量,继续循环,再创建一个新的 $completes_-synchronously\left(\right)$ 协程并恢复它。
        事实上,这里会做的就是 $loop_-synchronously\left(\right)$ 和 $completes_-synchronously\left(\right)$ 返回递归调用对方,每次调用都会占用一部分栈空间,直到迭代够一定次数后,栈会溢出并产生未定义行为,通常会导致程序立即崩溃。
        协程内编写循环这种看着没有任何递归的行为,却很容易导致函数产生无限递归。
        那么,在这种原始协程标准设计之下,又该怎么解决这个问题呢?

协程标准解法

        好,那么这种无限递归问题该怎么避免呢?
        在上面的实现中,我们使用返回 $void$ 的 $await_-suspend\left(\right)$。协程标准中还有另外一个返回bool的 $await_-suspend\left(\right)$,返回true代表协程已挂起,控制权移交给 $resume\left(\right)$ 的调用方,返回false则协程立即恢复,不需要消耗任何额外的栈空间。
        所以,为了避免无限地循环递归,我们希望利用返回bool版本的 $await_-suspend\left(\right)$,让 $task$::$awaiter$::$await_-suspend\left(\right)$ 在任务同步结束时返回false,而不是递归地通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 恢复协程。
        实现这种解法需要两部分:

  1. 在 $task$::$awaiter$::$await_-suspend\left(\right)$ 方法,你可以调用 $.resume\left(\right)$ 来启动协程执行。然后在 $.resume\left(\right)$ 返回时,检查协程是否执行完成。如果执行完成,我们可以返回false,表示等待中的协程应当立即恢复。如果没有完成,我们可以返回true,表示将控制权移交给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用方。
  2. 在协程执行完成后调用的 $task$::$promise_-type$::$final_-awaiter$::$await_-suspend\left(\right)$ 方法内,我们需要检查等待中协程是否已经 ( 或者即将 ) 从 $task$::$awaiter$::$await_-suspend\left(\right)$ 调用中返回true。如果是,调用 $.resume\left(\right)$ 来恢复它。否则我们需要避免恢复协程,并且通知 $task$::$awaiter$::$await_-suspend\left(\right)$,让它返回false

        然而,有一个更复杂的问题,就是协程可能在当前线程开始执行、挂起,并在 $.resume\left(\right)$ 调用返回前,在其他线程恢复并执行完成。因此,我们需要解决第一部分和第二部分之间可能的竞态。
        我们需要使用 $std$::$atomic$ 值同步。
        现在我们可以对代码进行以下修改:

class task::promise_type {
  ...

  std::coroutine_handle<> continuation;
  std::atomic<bool> ready = false;
};

bool task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  promise_type& promise = coro_.promise();
  promise.continuation = continuation;
  coro_.resume();
  return !promise.ready.exchange(true, std::memory_order_acq_rel);
}

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  promise_type& promise = h.promise();
  if (promise.ready.exchange(true, std::memory_order_acq_rel)) {
    // The coroutine did not complete synchronously, resume it here.
    promise.continuation.resume();
  }
}

        可以在 $Compiler$ $Explorer$ 上查看修改后的例子:https://godbolt.org/z/7fm8Za。注意这时当 $count$ $==$ $1'000'000$ 时,程序不再崩溃了。
        这就是 $cppcoro$::$task$<$T$> 里面实现的,规避无限递归问题的方式 ( 在某些平台也是一样 ),而且运行得很好。
        哇哦!问题解决了,吗?发布!可以吗…?

问题

        以上方法在解决递归的同时,也引入了一些问题。
        首先,它需要 $std$::$atomic$ 操作,开销可能很大。调用方挂起等待中协程的过程中需要进行一次原子交换,并且被调方执行完成后也需要一次原子交换。如果你的程序是单线程的,那么你会在永远不需要的线程同步原子操作上花费额外的开销。
        其次,它引入了额外的分支。一个在调用方,需要判断是否挂起或者立即恢复协程,另一个在被调方,需要判断是否恢复续体或者挂起。
        注意这个额外分支的开销,甚至原子操作的开销,跟协程程序的业务逻辑相比通常不值一提。然而,协程被称为零成本抽象,并且有许多人使用协程挂起操作来避免等待L1缓存未命中问题 ( 更多细节可以查看 Gor 的 great CppCon talk on nanocoroutines )。
        最后,也可能是最重要的,它在恢复等待中协程的过程中引入了一些运行上下文中不确定的值。
        假设我有以下代码:

cppcoro::static_thread_pool tp;

task foo();
{
  std::cout << "foo1 " << std::this_thread::get_id() << "\n";
  // Suspend coroutine and reschedule onto thread-poll thread.
  co_await tp.schedule();
  std::cout << "foo2 " << std::this_thread::get_id() << "\n";
}

task bar()
{
  std::cout << "bar1 " << std::this_thread::get_id() << "\n";
  co_await foo();
  std::cout << "bar2 " << std::this_thread::get_id() << "\n";
}

        在原来实现中,我们保证在 $co_-await$ $foo\left(\right)$ 执行完成后的代码会在相同的线程中内联执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 3456

        然而,使用原子变量后,$foo\left(\right)$ 执行完成可能与 $bar\left(\right)$ 的挂起之间产生竞态,在一些情况下,这意味着 $co_-await$ $foo\left(\right)$ 之后的代码可能在 $bar\left(\right)$ 启动的线程上执行。
        例如,一种可能的输出:

bar1 1234
foo1 1234
foo2 3456
bar2 1234

        对于许多用例来说,这种行为没有区别。然而,对于想要传递运行上下文的算法来说就有问题了。
        例如,$via\left(\right)$ 算法等待一些 $Awaitable$,然后在对应的 $scheduler$ 的运行上下文上返回。这个算法的一个简单版本如下:

template <typename Awaitable, typename Scheduler>
task<await_result_t<Awaitable>> via(Awaitable a, Scheduler s)
{
  auto result = co_await std::move(a);
  co_await s.schedule();
  co_return result;
}

task<T> get_value();
void consume(const T&);

task<void> consumer(static_thread_pool::scheduler s)
{
  T result = co_await via(get_value(), s);
  consume(result);
}

        最初版本的 $consume\left(\right)$ 调用总是保证在线程池 $s$ 上执行。然而,在使用原子变量后,$consume\left(\right)$ 可能在与 $scheduler$ $s$ 相关的线程上执行,也可能在 $consumer\left(\right)$ 开始执行的线程上执行。
        所以我们应该怎么在没有原子操作、额外分支和不确定的恢复上下文的前提下解决栈溢出问题呢?

进入“对称转移”!

        Gor Nishanov ( $2018$ ) 的论文P0913R0《增加对称协程控制转移》,提出了这种问题的解法,通过提供一种允许一种不需要消耗任何额外栈空间的前提下,让一个协程挂起时对称恢复另一个协程的能力。
        论文提出了两个关键改变:

  • 允许 $await_-suspend\left(\right)$ 返回 $std$::$coroutine_-handle$<$T$>,表示执行需要对称转移给返回的句柄标识的协程。
  • 增加 $std$::$experimental$::$noop_-coroutine\left(\right)$ 函数,返回一个特别的 $std$::$coroutine_-handle$,可以作为 $await_-suspend\left(\right)$ 的返回值,挂起当前协程,从 $.resume\left(\right)$ 调用返回而不是将控制权转移给其他协程。

        所以“对称转移”是什么意思呢?
        当你调用 $std$::$coroutine_-handle$ 的 $.resume\left(\right)$ 来恢复协程,$.resume\left(\right)$ 的调用方在协程恢复后仍然是活跃的。接着,协程再次挂起,在挂起点调用 $await_-suspend\left(\right)$ 返回void( 无条件挂起 ) 或者true ( 有条件挂起 ) 时,$.resume\left(\right)$ 的调用才会返回。
        这可以认为是一种“对称转移”,协程执行和行为就像普通函数调用一样。$.resume\left(\right)$ 的调用方可以是任意函数 ( 协程或非协程 )。当协程挂起并且 $await_-suspend\left(\right)$ 返回true或者void时,控制权会返回给 $.resume\left(\right)$ 的调用者。
        每次调用 $.resume\left(\right)$ 恢复协程时,都会创建一个新的协程的栈帧。
        然而,通过“对称转移”,我们可以简单地挂起一个协程,并恢复另一个协程。两个协程之间不需要隐式的调用方 / 被调方关系,一个协程挂起时,它就可以把控制权转移给另一个挂起的协程 ( 包括它自己 ),并且不需要在下次挂起或者执行完成时把控制权转移给上一个协程。
        让我们看看在 $awaiter$ 使用对称转移时,编译器会怎么处理 $co_-await$ 表达式:

{
  decltype(auto) value = <expr>;
  decltype(auto) awaitble =
      get_awaitable(promise, static_cast<decltype(value)&&>(value));
  decltype(auto) awaiter =
      get_awaiter(static_cast<decltype(awaitable)&&>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::coroutine_handle<P>;

    // <suspend-coroutine>

    auto h = awaiter.await_suspend(handle_t::from_promise(p));
    h.resume();
    // <return-to-caller-or-resumer>

    // <resume-point>
  }

  return awaiter.await_resume();
}

        让我们放大与其他 $co_-await$ 行为不同的部分:

auto h = awaiter.await_suspend(handle_t::from_promise(p));
h.resume();
// <return-to-caller-or-resumer>

        一旦协程状态机完成底层转换后 ( 另一篇文章的主题 ),<$return$-$to$-$caller$-$or$-$resumer$> 部分一般是 $return;$ 语句,这会导致最近一次恢复协程的 $.resume\left(\right)$ 的调用将返回给调用方。
        这意味着我们当前函数体的 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用内,又产生了一个对同一个签名的函数的调用,即 $std$::$coroutine_-handle$::$resume\left(\right)$ ,紧跟着一条 $return;$ 语句。
        一些编译器在开启编译优化后,能够在条件满足的时候,把函数结尾的调用 ( 即返回之前 ) 转变为尾调用 ( $tail$-$calls$ )。
        而这种优化正是我们想要的,可以避免栈溢出问题的方式。但是与其期望优化器来决定进行尾调用转移,我们更想要确保这种优化一直有效,即使是未开启优化的情况。
        但是首先,我们先了解下尾调用的含义。

尾调用

        尾调用指在调用前弹出当前栈帧,把当前函数的返回地址作为被调方的返回地址 ( 即被调直接返回给当前函数的调用方 )。
        在X86/X64架构,这通常意味着编译器生成的代码会先弹出当前栈帧,然后调用 $jmp$ 指令来跳转到被调函数入口,而不是使用 $call$ 指令,并在 $call$ 返回后才弹出当前栈帧。
        这个优化通常只会在有限情况下发生,然而:
        特别的,它需要:

  • 调用机制支持尾调用,包括调用方和被调方;
  • 返回类型相同;
  • 没有需要在调用返回前执行的非默认析构函数;
  • 调用不在try/catch块内。

        $co_-await$ 对称转移形式的设计使得它刚好能满足所有需求。我们一个个来看。
        调用机制:当编译器把协程转换成底层状态机时,它实际上分成两部分:启动部分 ( $ramp$,分配和初始化协程框架 ) 和主体 ( 包含用户编写的协程体的状态机 )。
        协程的函数签名 ( 以及所有用户指定的调用机制 ) 只会影响启动函数部分,主体部分则受编译器控制,永远不会被用户代码调用,只能通过 $ramp$ 函数和 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用。
        协程体部分的调用机制是用户不可见的,完全由编译器决定,因此它可以选择一种合适的支持尾调用的机制,并被所有协程体使用。
        返回类型相同:源和目的协程的 $.resume\left(\right)$ 方法的返回值都是void,所以这个需求也能满足。
        没有非默认析构函数:当执行尾调用时,我们需要在调用目标函数前释放当前栈帧,这需要所有栈上对象的生命周期都在调用结束之前。
        一般情况下,一旦作用域内有对象的析构函数非默认且分配在栈上,这些对象会在协程挂起的时候存活,就会产生问题。
        然而,当一个协程挂起时,它不会退出任何作用域,实现方式是把生命周期跨挂起点的对象放在协程帧上而不是栈上。
        局部变量的生命周期不会跨挂起点,可能分配在栈帧上,它们的生命周期会在协程下次挂起时结束。
        因此栈分配对象不存在需要在尾调用返回之前调用的非默认析构函数。
        调用不在try/catch块内:这个可能有点 trick,因为每个协程内都有个隐式的try/catch块,包裹用户编写的协程体。
        根据规范,协程被定义为:

{
  promise_type promise;
  co_await promise.initial_suspend();
  try { F; }
  catch (...) { promise.unhandled_exception(); }
final_suspend:
  co_await promise.final_suspend();
}

        $F$ 就是用户编写的协程体部分。
        因此所有用户编写的 $co_-await$ 表达式 ( 除了 $initial$/$final_-suspend$ ) 都在try/catch块的上下文中。
        然而,实现上会把 $.resume\left(\right)$ 执行放到try/catch块的外部。
        我希望在另一篇文章讲协程怎么变成状态机的文章中讲解更多这方面的细节 ( 这篇文章已经够长了 )。

注意,然而,当前C++规范对实现这个操作需求的描述不够清晰,并且只是一个非规范注释,暗示这可能是必须的。希望我们将来能够修复这个规范。

        那么我们知道了执行对称转移的协程已经满足了所有尾调用的需求。编译器会确保它永远是尾调用,无论是否开启了优化。
        这意味着通过使用 $await_-suspend\left(\right)$ 返回 $std$::$coroutine_-handle$ 的风格,我们可以挂起当前协程,并在不消耗额外栈空间的前提下,把控制权转移给另一个协程。
        这让我们可以编写任意深度的相互递归调用代码,不需要担心栈溢出。
        这就是我们需要修复的 $task$ 的实现。

再看task

        有了“对称转移”能力,我们再来修复 $task$ 类型实现。
        我们要修改两个 $await_-suspend\left(\right)$ 方法的实现:

  • 首先当我们等待 $task$ 时,我们执行对称转移,来恢复任务协程。
  • 其次当任务协程完成,它执行一次对称转移,恢复等待中的协程。

        为了指明等待的方向,我们需要把 $task$::$awaiter$ 方法从:

void task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;

  // Then we resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace).
  coro_.resume();
}

        改成:

std::coroutine_handle<> task::awaiter::await_suspend(
    std::coroutine_handle<> continuation) noexcept {
  // Store the continuation in the task's promise so that the final_suspend()
  // knows to resume this coroutine when the task completes.
  coro_.promise().continuation = continuation;


  // Then we tail-resume the task's coroutine, which is currently suspended
  // at the initial-suspend-point (ie. at the open curly brace), by returning
  // its handle from await_suspend().
  return coro_;
}

        同时为了指明返回路径,我们需要把 $task$::$promise_-type$::$final_-awaiter$ 方法从:

void task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it.
  h.promise().continuation.resume();
}

        改成:

std::coroutine_handle<> task::promise_type::final_awaiter::await_suspend(
    std::coroutine_handle<promise_type> h) noexcept {
  // The coroutine is now suspended at the final-suspend point.
  // Lookup its continuation in the promise and resume it symmetrically.
  return h.promise().continuation;
}

        这样我们就有了一个不需要担心栈溢出问题的 $task$ 实现,并且 $await_-suspend$ 是void返回的,没有bool返回带来的不确定恢复上下文问题。

观察栈

        现在让我们看看最初的例子:

task completes_synchronously() {
  co_return;
}

task loop_synchronously(int count) {
  for (int i = 0; i < count; ++i) {
    co_await completes_synchronously();
  }
}

        当其他协程 $co_-await$ $task$ 时,$loop_-synchronously\left(\right)$ 协程首次开始执行。这会触发其他协程的对称转移,并通过 $std$::$coroutine_-handle$::$resume\left(\right)$ 调用恢复。
        因此当 $loop_-synchronously\left(\right)$ 开始时,栈看起来会像这样:

           Stack                                                Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        现在,当执行 $co_-await$ $completes_-synchronously\left(\right)$ ,它会对称转移到 $completes_-synchronously$ 协程。
        通过:

  • 调用 $task$::$operator$ $co_-await\left(\right)$,后者会在稍后返回 $task$::$awaiter$ 对象
  • 然后挂起并调用 $task$::$awaiter$::$await_-suspend\left(\right)$,后者稍后返回 $completes_-synchronously$ 协程的 $coroutine_-handle$。
  • 然后执行尾调用 / 跳转到 $completes_-synchronously$ 协程,在激活 $completes_-synchronously$ 栈帧前弹出 $loop_-synchronously$ 栈帧。

        在 $completes_-synchronously$ 恢复后,栈看起来会像这样:

              Stack                                          Heap
                                            .-> +--------------------------+ <-.
                                            |   | completes_synchronously  |   |
                                            |   | frame                    |   |
                                            |   | +----------------------+ |   |
                                            |   | | task::promise        | |   |
                                            |   | | - continuation --.   | |   |
                                            |   | +------------------|---+ |   |
                                            `-, +--------------------|-----+   |
                                              |                      V         |
+-------------------------------+ <-- top of  | +--------------------------+   |
| completes_synchronously$resume|     stack   | | loop_synchronously frame |   |
+-------------------------------+ active -----' | +----------------------+ |   |
| coroutine_handle::resume      | coroutine     | | task::promise        | |   |
+-------------------------------+               | | - continuation --.   | |   |
|     ...                       |               | +------------------|---+ |   |
+-------------------------------+               | task temporary     |     |   |
                                                | - coro_       -----|---------`
                                                +--------------------|-----+
                                                                     V
                                                +--------------------------+
                                                | awaiting_coroutine frame |
                                                |                          |
                                                +--------------------------+

        注意这里栈帧数量增长了。
        在 $completes_-synchronously$ 协程完成,执行到右大括号处,它会执行 $co_-await$ $promise.final_-suspend\left(\right)$。
        这会挂起协程,并调用 $final_-awaiter$::$await_-suspend\left(\right)$ ,后者返回续体的 $std$::$coroutine_-handle$ ( 即指向 $loop_-synchronously$ 协程的句柄 )。这会触发一次对称转移 / 尾调用来恢复 $loop_-synchronously$ 协程。
        $loop_-synchronously$ 恢复后的栈看起来就像这样:

           Stack                                                   Heap
                                                   +--------------------------+ <-.
                                                   | completes_synchronously  |   |
                                                   | frame                    |   |
                                                   | +----------------------+ |   |
                                                   | | task::promise        | |   |
                                                   | | - continuation --.   | |   |
                                                   | +------------------|---+ |   |
                                                   +--------------------|-----+   |
                                                                        V         |
+----------------------------+  <-- top of stack   +--------------------------+   |
| loop_synchronously$resume  | active coroutine -> | loop_synchronously frame |   |
+----------------------------+                     | +----------------------+ |   |
| coroutine_handle::resume() |                     | | task::promise        | |   |
+----------------------------+                     | | - continuation --.   | |   |
|     ...                    |                     | +------------------|---+ |   |
+----------------------------+                     | task temporary     |     |   |
                                                   | - coro_       -----|---------`
                                                   +--------------------|-----+
                                                                        V
                                                   +--------------------------+
                                                   | awaiting_coroutine frame |
                                                   |                          |

        $loop_-synchronously$ 协程被恢复后,在执行到达分号后,要做的第一件事就是调用 $completes_-synchronously$ 返回的临时 $task$ 的析构函数。这会销毁协程帧,释放内存,这时情况会是这样:

           Stack                                                   Heap
+---------------------------+  <-- top of stack   +--------------------------+
| loop_synchronously$resume | active coroutine -> | loop_synchronously frame |
+---------------------------+                     | +----------------------+ |
| coroutine_handle::resume  |                     | | task::promise        | |
+---------------------------+                     | | - continuation --.   | |
|     ...                   |                     | +------------------|---+ |
+---------------------------+                     | ...                |     |
                                                  +--------------------|-----+
                                                                       V
                                                  +--------------------------+
                                                  | awaiting_coroutine frame |
                                                  |                          |
                                                  +--------------------------+

        我们现在返回到 $loop_-synchronously$ 协程的执行,并且栈帧和协程帧数量与开始执行时一样,并且之后每次循环也是一样。
        因此我们可以执行许多次迭代,只消耗常数级的存储空间。
        完整的对称转移版本的 $task$ 类型可以看以下的 $Compiler$ $Explorer$ 链接:https://godbolt.org/z/9baieF

通用形式 await_suspend 的对称转移

        既然我们已经见识过了对称转移形式的 $awaitable$ $concept$ 的能力和重要性,我想向你展示一下通用形式,理论上可以替换voidbool返回形式的 $await_-suspend\left(\right)$。
        但首先,我们需要看一下P0913R0提案添加的新的协程设计:$std$::$noop_-coroutine\left(\right)$。

循环终止

        通过对称转移形式的协程,每次协程挂起,它都会对称恢复另一个协程。只要你有其他协程可以恢复,它就非常有用。但是有时我们不想要执行其他协程,只需要挂起并把控制权返回给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用方。
        voidbool返回形式的 $await_-suspend\left(\right)$ 都允许协程挂起并从 $std$::$coroutine_-handle$::$resmue\left(\right)$ 调用中返回,那么我们怎么让对称转移形式的协程返回呢?
        答案是使用内置的特殊 $std$::$coroutine_-handle$,称为无操作协程句柄 ( $noop$ $coroutine$ $handle$ ),通过函数 $std$::$noop_-coroutine\left(\right)$ 生成。
        无操作协程句柄之所以叫这个名字,是因为它的 $.resume\left(\right)$ 实现是立即返回,即恢复协程后没有操作。一般它的实现包含一条简单的 $ret$ 指令。
        如果 $await_-suspend\left(\right)$ 方法返回 $std$::$noop_-coroutine\left(\right)$ 句柄,那么协程不会把控制权转移给下一个协程,而是转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用方。

其他风格的 await_suspend() 表示方式

        有了这个信息,我们再看其他风格的 $await_-suspend\left(\right)$ 如何使用对称转移。
        我们有void返回格式的:

void my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
}

        可以改成bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return true;
}

        也可以再改成对称转移风格:

std::noop_coroutine_handle my_awaiter::await_suspend(
    std::coroutine_handle<> h) {
  this->coro = h;
  enqueue(this);
  return std::noop_coroutine();
}

        我们有bool返回格式的:

bool my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return true to transfer execution to caller of
    // coroutine_handle::resume().
    return true;
  }

  // Operation completed asynchronously.
  // Return false to immediately resume the current coroutine.
  return false;
}

        可以改成对称转移格式的:

std::coroutine_handle<> my_awaiter::await_suspend(std::coroutine_handle<> h) {
  this->coro = h;
  if (try_start(this)) {
    // Operation will complete asynchronously.
    // Return std::noop_coroutine() to transfer execution to caller of
    // coroutine_handle::resume().
    return std::noop_coroutine();
  }

  // Operation completed asynchronously.
  // Return current coroutine's handle to immediately resume
  // the current coroutine.
  return h;
}

为什么有三种风格?

        那么为什么在有对称转移风格的前提下,我们还继续使用voidbool返回风格的 $await_-suspend\left(\right)$ 呢?
        一部分历史原因,一部分实用性原因,一部分性能原因。
        $await_-suspend\left(\right)$ 返回 $std$::$noop_-coroutine_-handle$ 可以完全替代void返回版本,因为这两种对于编译器来说,都表示协程会无条件地把控制权转移给 $std$::$coroutine_-handle$::$resume\left(\right)$ 的调用方。
        在我看来,它还能留下来的原因,一部分是它在对称转移被提出之前就已经在使用了,另一部分是因为void返回对于无条件挂起的情况可以少写一点代码。
        bool返回版本,在某些情况下,可以优化得比对称转移形式的更好。
        假设我们有一个bool返回的 $await_-suspend\left(\right)$,定义在另一个编译单元。这时编译器可以在等待协程处生成代码,挂起当前协程并在 $await_-suspend\left(\right)$ 调用返回之后,通过执行下一块代码的方式来有条件地恢复它。如果 $await_-suspend\left(\right)$ 返回false,那么就可以明确知道需要执行哪段代码。
        即使有了对称转移风格,我们还是需要表示相同的结果:返回给调用方 / 恢复方,或者恢复当前协程。相比返回true或者false,我们需要返回 $std$::$noop_-coroutine\left(\right)$ 或者当前协程的句柄。我们可以把这些句柄都统一成 $std$::$coroutine_-handle$<$void$> 返回。
        然而,因为 $await_-suspend\left(\right)$ 定义在其他编译单元,编译器看不到协程返回的句柄指向什么,所以当协程恢复后,它需要一些更重的间接调用,并且相比bool返回的单个分支而言,可能会有更多的恢复协程分支。
        有可能在将来的某一天,对称转移版本可以获得同等性能。例如,我们可以编写内联的 $await_-suspend\left(\right)$,但是调用一个bool返回的非内联方法,并有条件地返回合适的句柄。
        例如:

struct my_awaiter {
  bool await_ready();

  // Compiler should in-theory be able to optimise this to the same
  // as the bool-returning version, but currently don't do this optimisation.
  std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
    if (try_start(h)) {
      return std::noop_coroutine();
    } else {
      return h;
    }
  }

  void await_resume();

private:
  // This method is defined out-of-line in a seperate translation unit.
  bool try_start(std::coroutine_handle<> h);
}

        然而,现在的编译器 ( 比如Clang 10 ) 还不能把这种情况优化成和bool返回版本一样高效的代码。话虽如此,除非你编写了一个非常紧凑的循环,否则可能不会注意到它们的差异。
        到目前为止,通用的规则是:

  • 如果你需要无条件地返回给 $.resume\left(\right)$ 的调用方,使用void返回风格。
  • 如果你需要有条件地返回给 $.resume\left(\right)$ 的调用方,或者恢复当前协程,使用bool返回风格。
  • 如果你需要恢复其他协程,使用对称转移风格。

补充

        C++ 20新加的对称转移能力,使得协程递归恢复对方变得简单,不需要担心栈溢出。这个能力是编写高效、安全的异步协程类型的关键,就像 $task$ 那样。
        这篇关于对称转移的文章比预期的要长,十分感谢你坚持读完了它!希望能帮到你。
        在下篇文章,我会讲解编译器怎么把协程函数转换成状态机。

致谢

        也是不翻了~

C++协程(3):理解promise

        这篇文章是C++协程标准系列的第三篇文章。
        之前的文章可以在这里查看:

        在这篇文章我会讲解你写的代码是怎么被编译器编译成协程代码的,并且你可以通过定义自己的 $Promise$ 类型来自定义协程行为。

协程概念

        协程标准添加了三个新的关键字:$co_-await$,$co_-yield$ 和 $co_-return$ 。无论你在函数体里使用哪个,编译器都会把这个函数编译成协程,这个函数也不再是普通函数。
        编译器使用一些相当机械的转换来把你写的代码变成状态机,从而可以在函数内的特定点挂起,并在之后恢复执行。
        在之前的文章我描述了协程标准引入的两个接口中的第一个接口:$Awaitable$ 接口。而第二个接口, $Promise$ 接口则对于这种代码转换来说十分重要。
        $Promise$ 接口规定了自定义它所在协程行为的方法。库开发者可以用它定义协程被调用时的行为,协程返回的行为 ( 包括普通方式返回或者通过未处理异常返回 ),协程内使用 $co_-await$ 或者 $co_-yield$ 表达式的行为。

Promise对象

        $Promise$ 对象通过实现协程执行过程中特定点的调用方法的形式来定义和控制对应协程的行为。

在继续之前,我希望你能忘掉之前所有关于 $promise$ 是什么的记忆。在一些用例中,协程的 $promise$ 对象与 $std$::$future$ 的 $std$::$promise$ 的功能很相似,但在其他用例中,并不能拿来相比。可能把协程的 $promise$ 对象想象成一个用于控制协程行为、跟踪协程状态的“协程状态控制器”会更合适。

        $promise$ 对象实例会跟随着每个协程函数调用时创建的协程帧一起被构造。
        编程器会在协程执行的关键点生成对 $promise$ 对象的特定方法的调用。
        在接下来的例子中,我们把某个特定协程在协程帧内创建的 $promise$ 对象叫做 $promise$ 。
        当你编写一个协程函数体 <$body$-$statement$> ,函数体包含了某个协程关键字 ( $co_-return$,$co_-await$,$co_-yield$ ),那么协程体将转变为 ( 大致地 ) 如下形式:

{
  co_await promise.initial_suspend();
  try
  {
    <body-statement>
  }
  catch (...)
  {
    promise.unhandled_exception();
  }
FinalSuspend:
  co_await promise.final_suspend();
}

        一个协程函数被调用后,在执行之前会有一系列的准备步骤,这些步骤与常规的函数会有些不同。
        以下是一个步骤总结 ( 我会在接下来详细介绍每个步骤 ):

  1. 通过 $operator$ $new$ 分配协程帧 ( 可选的 )。
  2. 将所有函数参数拷贝到协程帧。
  3. 调用 $promise$ 对象的构造函数,$promise$ 类型记为 $P$ 。
  4. 调用 $promise.get_-return_-object\left(\right)$ 方法获取协程首次挂起时需要返回给调用方的返回值,并保存为一个局部变量。
  5. 调用 $promise.initial_-suspend\left(\right)$ ,并将返回值作为 $co_-await$ 的参数调用。
  6. 当 $co_-await$ $promise.initial_-suspend\left(\right)$ 表达式恢复 ( 可能立即恢复或者异步恢复 ) 后,协程开始执行你编写的协程体语句。

        当协程执行到 $co_-return$ 语句时,会进行一些额外的步骤:

  1. 调用 $promise.return_-void\left(\right)$ 或者 $promise.return_-value$(<$expr$>)。
  2. 以与创建顺序相反的顺序自动销毁所有具有自动生命周期的变量。
  3. $co_-await$ 调用 $promise.final_-suspend\left(\right)$ 获取结果。

        如果执行因为一个未处理异常停止,那么会发生:

  1. catch块中捕获异常,调用 $promise.unhandled_-exception\left(\right)$ 。
  2. 调用 $promise.final_-suspend\left(\right)$ ,并将返回值作为 $co_-await$ 的参数调用。

        一旦执行到协程体之外,协程帧就会被销毁。通过以下步骤销毁协程帧:

  1. 调用 $promise$ 对象的析构函数。
  2. 调用拷贝的函数参数的析构函数。
  3. 调用 $operator$ $delete$ 释放协程帧的内存空间 ( 可选的 )。
  4. 返回控制权给调用方 / 恢复方。

        当执行首次到达 $co_-await$ 表达式的 <$return$-$to$-$caller$-$resumer$> 点,或者协程没有到达任何 <$return$-$to$-$caller$-$or$-$resumer$> 点就完成时,协程要么被挂起,要么被销毁,然后之前调用 $promise.get_-return_-object\left(\right)$ 返回的结果就会被返回给协程的调用方。

分配协程帧

        首先,编译器生成一个对协程帧的 $operator$ $new$ 调用来分配内存。
        如果 $promise$ 类型 $P$,定义了一个自身版本的 $operator$ $new$ ,那么就会调用它定义的,否则就调用全局的。
        这里有几个重点需要注意:
        传给 $operator$ $new$ 的大小不是 $sizeof\left(P\right)$ ,而是整个编译过程中编译器根据:入参数量和大小、$promise$ 对象的大小、局部变量的数量和大小、以及一些其他编译相关的管理协程状态的空间大小,计算出来的协程帧大小。
        如果满足以下条件,编译器可以省掉对 $operator$ $new$ 的调用:

  • 协程帧的生命周期严格内嵌于调用方的生命周期。
  • 编译器可以在调用点计算出协程帧需要的内存大小。

        在这种情况下,编译器可以在调用方的调用帧 ( 栈帧部分或者协程帧部分 ) 上分配协程帧的空间。
        协程标准没有定义什么情况下需要跳过分配,所以你需要在协程帧分配可能抛出 $std$::$bad_-alloc$ 异常的前提下编写代码。这意味着你不应该把一个协程函数声明成noexcept,除非你确定协程分配协程帧内存失败后会调用 $std$::$terminate\left(\right)$ 。
        然而,有一个后备方案可以处理协程帧分配失败。在不允许异常的环境下,例如集成场景或者高性能场景这种不能容忍异常开销的场景,这种方式十分必要。
        如果 $promise$ 类型提供了一个静态的 $P$::$get_-return_-object_-on_-allocation_-failure\left(\right)$ 成员函数,编译器会生成一个对 $operator$ $new$($size_-t$, $nothrow_-t$) 的重载调用。如果调用返回nullptr,那么协程会立即调用 $P$::$get_-return_-object_-on_-allocation_-failure\left(\right)$ 并把结果返回给调用方,而不是抛出异常。

自定义协程帧内存分配

        你的 $promise$ 类型可以重载 $operator$ $new$ ,这样如果编译器需要分配协程帧内存,就会使用你定义的 $operator$ $new$ 而不是全局的 $operator$ $new$ 。
        例如:

struct my_promise_type
{
  void* operator new(std::size_t size)
  {
    void* ptr = my_custom_allocate(size);
    if (!ptr) throw std::bad_alloc{};
    return ptr;
  }

  void operator delete(void* ptr, std::size_t size)
  {
    my_custom_free(ptr, size);
  }

  ...
}

        我知道你想问什么:“要怎么自定义 $allocator$ 呢?”。
        你也可以提供 $P$::$operator$ $new\left(\right)$ 的重载,接收额外的参数,如果能找到合适的重载,将使用协程函数参数的左值引用调用。这种方式可以用来 $hook$ $operator$ $new$ ,使其调用某个作为参数传递给协程函数的 $allocator$ 的 $allocate\left(\right)$ 方法。
        你需要做一些额外工作来在分配好的内存中拷贝一份 $allocator$ 的副本,这样才能在相应的 $operator$ $delete$ 调用中引用它,因为 $allocator$ 不会作为参数传递给的 $operator$ $delete$ 。这是因为入参会被存储在协程帧,所以他们有可能会在 $operator$ $delete$ 调用之前就被销毁了。
        例如,你可以实现 $operator$ $new$ 来给协程帧分配一些额外内存,并利用这个空间来存储 $allocator$,用于释放协程帧内存。
        例如:

template <typename ALLOCATOR>
struct my_promise_type
{
  template <typename... ARGS>
  void* operator new(std::size_t sz, std::allocator_arg_t, ALLOCATOR& allocator, ARGS&... args)
  {
    // Round up sz to next multiple of ALLOCATOR alignment
    std::size_t allocatorOffset =
      (sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);
    
    // CAll onto allocator to allocate space for coroutine frame.
    void* ptr = allocator.allocate(allocatorOffset + sizeof(ALLOCATOR));


    // Take a copy of the allocator (assuming noexcept copy constructor here)
    new (((char*)ptr) + allocatorOffset) ALLOCATOR(allocator);

    return ptr;
  }

  void operator delete(void* ptr, std::size_t sz)
  {
    std::size_t allocatorOffset =
      (sz + alignof(ALLOCATOR) - 1u) & ~(alignof(ALLOCATOR) - 1u);

    ALLOCATOR& allocator = *reinterpret_cast<ALLOCATOR*>(
      ((char*)ptr) + allocatorOffset);

    // Move allocator to local variable first so it isn't freeing its
    // own memory from underneath itself.
    // Assuming allocator move-constructor is noexcept here.
    ALLOCATOR allocatorCopy = std::move(allocator);

    // But don't forget to destruct allocator object in coroutine frame
    allocator.~ALLOCATOR();

    // Finally, free the memory using the allocator.
    allocatorFactory.deallocate(ptr, allocatorOffset + sizeof(ALLOCATOR));
  }
}

        为了 $hook$ 成功,让自定义的 $my_-promise_-type$ 被协程使用,需要让 $std$::$allocator_-arg$ 作为第一个入参,这需要通过 $coroutine_-traits$ 类 ( 详细见后续的 $coroutine_-traits$ 章节 ) 来指定。
        例如:

namespace std::experimental
{
  template <typename ALLOCATOR, typename... ARGS>
  struct coroutine_traits<my_return_type, std::allocator_arg_t, ALLOCATOR, ARGS...>
  {
    using promise_type = my_promise_type<ALLOCATOR>;
  };
}

        注意,即使你自定义了协程的内存分配策略,编译器还是可以选择跳过你的自定义内存分配器

拷贝参数到协程帧

        协程需要从原始调用方中把所有传给协程的参数拷贝到协程帧,这样才能保证协程被挂起后,它们依然有效。
        如果参数是值传递的,那么这些参数将会通过移动构造函数拷贝到协程帧。
        如果参数以引用传递 ( 左值或者右值 ),那么只有引用会被拷贝到协程帧,而不是它们指向的值。
        注意如果类型的析构函数是默认的,并且在到达 <$return$-$to$-$caller$-$or$-$resumer$> 点后不再引用,编译器可以选择跳过该参数副本的析构函数。
        在以引用方式传递参数给协程的过程中,存在几个陷阱,也因此你不能在协程生命周期中依赖引用。许多常见的用于普通函数的技术,例如完美转发和通用引用,用到协程上可能导致代码出现未定义行为。如果你想了解更多,Toby Allsopp 写了一篇关于这个主题的好文章
        如果某个参数的拷贝/移动构造函数抛出了异常,那么已经构造好的参数的析构函数将被调用,协程帧会被释放,异常会传播给调用方。

构造 promise 对象

        一旦所有参数被拷贝到协程帧,协程会构造 $promise$ 对象。
        参数在 $promise$ 对象之前构造的原因是让 $promise$ 对象可以在构造函数中访问先前拷贝的参数。
        首先,编译器检查 $promise$ 是否存在接收每个拷贝参数左值引用作为参数的重载构造函数,如果找到了这种函数,编译器会生成对这个构造函数的调用,如果没有找到,编译器会使用 $promise$ 类型的默认构造函数。
        注意 $promise$ 构造函数可以“看到”入参是一个对协程标准相对较新的改动,在 Jacksonville 2018 会议的N4723 中被采纳,提案为P0914R1。因此对于一些较老版本的Clang或者MSVC来说可能还不支持。
        如果 $promise$ 构造函数抛出了一个异常,那么入参的拷贝会析构,协程帧会在异常传播回给调用方之前被释放。

获取返回对象

        协程要对 $promise$ 对象做的第一件事是调用 $promise.get_-return_-object\left(\right)$ 获取返回对象。
        返回对象是协程函数初次挂起或者运行完成移交控制权后要返回给调用方的值。
        你可以认为控制流 ( 大致上 ) 是这样的:

// Pretend there's a compiler-generated structure called 'coroutine_frame'
// that holds all of the state needed for the coroutines. Its constructor
// takes a copy of parameters and default-constructs a promise object.
struct coroutine_frame { ... };

T some_corouine(P param)
{
  auto* f = new coroutine_frame(std::forward<P>(param));

  auto returnObject = f->promise.get_return_object();

  // Start execution of the coroutine body by resuming it.
  // This call will return when the coroutine gets to the first.
  // suspend-point or when the coroutine runs to completion.
  coroutine_handle<decltype(f->promise)>::from_promise(f->promise).resume();

  // Then the return object is returned to the caller.
  return returnObject;
}

        注意我们需要在协程体开始执行之前获取返回对象,因为协程帧 ( 或者说 $promise$ 对象 ) 可能在 $coroutine_-handle$::$resume\left(\right)$ 调用返回前就被销毁,可能在当前线程销毁,也可能在其他线程,所以在协程体开始执行后调用 $get_-return_-object\left(\right)$ 是不安全的。

初次挂起点

        协程帧初始化,获取返回对象之后,要做的下一件事是执行 $co_-await$ $promise.initial_-suspend\left(\right);$ 。
        这让 $promise$ 类型的开发者可以控制协程在执行编写好的协程体代码之前是否需要挂起,或者立即执行协程体。
        如果协程在初次挂起点挂起,那么它可以在之后你选择的某个时间点对 $coroutine_-handle$ 调用 $resume\left(\right)$ 来恢复,或者调用 $destroy\left(\right)$ 销毁。
        $co_-await$ $promise.initial_-suspend\left(\right)$ 表达式的结果会被丢弃,所以 $awaiter$ 的 $await_-resume\left(\right)$ 函数实现应该返回void
        特别注意这个语句是在try/catch块保护之外的 ( 如果你忘记了,可以往上翻翻再看下代码 )。这意味着 $co_-await$ $promise.initial_-suspend\left(\right)$ 抛出的异常等同于到达它的 <$return$-$to$-$caller$-$or$-$resumer$> 点,会在协程销毁调用栈和返回对象后,再抛回给调用方。
        要明白如果你的返回对象是RAII的,即会在协程帧被析构的时候销毁,那么你需要保证 $co_-await$ $promise.initial_-suspend\left(\right)$ 是noexcept的,这样才能避免 double-free。

注意这里有个提案,希望调整语义,以便 $co_-await$ $promise.initial_-suspend\left(\right)$ 表达的全部或者部分位于try/catch块内,所以这里的明确语义可能会在最终确定前发生变化。

        对于许多协程类型来说,$initial_-suspend\left(\right)$ 方法要么返回 $std$::$experimental$::$suspend_-always$ ( 如果操作懒开始 ),要么返回 $std$::$experimental$::$suspend_-never$ ( 如果操作立即开始 )。而它们都是noexcept的 $awaitable$ ,所以这通常不是问题。

返回给调用方

        当协程函数达到首个 <$return$-$to$-$caller$-$or$-$resumer$> 点 ( 或者没有到达,但是协程执行完成 ) 时,通过 $get_-return_-object\left(\right)$ 获取的返回对象会返回给协程的调用方。
        注意返回对象的类型并不需要与协程函数的返回类型相同。在必要的时候,返回对象可以是一个能够隐式转换为协程返回类型的类型。

注意Clang的协程实现 ( 从$5.0$ 开始 ) 会在返回对象从协程调用返回后才会转换,而MSVC自从 2017 Update 3 开始会在 $get_-return_-object\left(\right)$ 调用之后立即转换。虽然协程标准没有显式给出预期行为,我相信MSVC有计划把它们的实现改成更像Clang的实现,因为这种实现允许一些有趣的用例

使用 co_return 从协程返回

        当协程到达 $co_-return$ 语句,它会被翻译成一个 $promise.return_-void\left(\right)$ 调用或者 $promise.return_-value$(<$expr$>) 以及一个 $goto$ $FinalSuspend$; 调用。
        翻译规则如下:

  • $co_-return$;
    • -> $promise.return_-void\left(\right)$
  • $co_-return$ <$expr$>
    • -> 如果 <$expr$> 类型为void,<$expr$>; $promise.return_-void\left(\right)$;
    • -> 如果 <$expr$> 类型不为void,$promise.return_-value$(<$expr$>);

        随后的 $goto$ $FinalSuspend;$ ,会在 $co_-await$ $promise.final_-suspend\left(\right)$ 之前触发所有自动生命周期的局部变量按照与构造顺序相反的顺序析构。
        注意如果协程函数体代码没有以 $co_-return$ 语句结束,会等同于以 $co_-return;$ 结束。在这个例子中,如果 $promise$ 类型没有 $return_-void\left(\right)$ 方法,那么行为会是未定义的。
        如果 <$expr$> 或者 $promise.return_-void\left(\right)$ 调用或者 $promise.return_-value\left(\right)$ 抛出了一个异常,那么异常仍然会传给 $promise.unhandled_-exception\left(\right)$ ( 见下方 )。

处理传播到协程体以外的异常

        如果一个异常传播到协程体之外,那么异常会被catch块捕获,并调用 $promise.unhandled_-exception\left(\right)$ 方法。
        这个方法典型实现是调用 $std$::$current_-exception\left(\right)$ 来捕获异常的副本,在后续抛出到其他上下文。
        其他可选实现是立即通过 $throw;$ 语句重新抛出异常,例如 $folly$::$Optional$。然而,这么做会 ( 或者说很可能,见下方 ) 导致协程帧被立即销毁,并将异常传播回调用方 / 恢复方。这可能会导致某些抽象出现问题,因为它们假设 / 要求对 $coroutine_-handle$::$resume\left(\right)$ 的调用是noexcept,所以你应该只在能够完全控制谁 / 什么调用 $resume\left(\right)$ 时使用这种方式。
        注意现在协程标准在调用 $unhandled_-exception\left(\right)$ 时重新抛出异常 ( 或者在try-catch块之外抛出异常 ) 的预期行为描述并不是很清晰
        我现在对标准的解释是如果控制权离开了协程体,可以通过 $co_-await$ $promise.initial_-suspend\left(\right)$ 、$promise.unhandled_-exception\left(\right)$ 或者 $co_-await$ $promise.final_-suspend\left(\right)$ 把异常传播出去,也可以通过 $co_-await$ $promise.final_-suspend\left(\right)$ 调用同步结束协程执行。无论哪种方式,都会在返回给调用方 / 恢复方前自动销毁协程帧。然而,这种解释也有问题。
        我希望未来版本能够细化并描述清楚这种情况。无论如何,在那之前我不会从 $initial_-suspend\left(\right)$ 、$final_-suspend\left(\right)$ 或者 $unhandled_-exception\left(\right)$ 中抛出异常。敬请关注!

最终挂起点

        一旦协程体执行离开了用户定义的部分后,结果会通过 $return_-void\left(\right)$ 、$return_-value\left(\right)$ 或者 $unhandled_-exception\left(\right)$ 调用捕获,所有局部变量被析构,在返还控制权给调用方 / 恢复方之前,协程可以执行一些额外的逻辑。
        这时候协程执行的是 $co_-await$ $promise.final_-suspend\left(\right);$ 语句。
        它允许协程执行一些逻辑,比如发布结果,发出完成信号或者恢复后续协程执行。它也允许协程选择性地在协程执行完成、协程帧被销毁前立即挂起。
        注意 $resume\left(\right)$ 一个在 $final_-suspend$ 点挂起的协程是未定义行为。对于这种协程,你只能调用 $destroy\left(\right)$。
        根据 Gor Nishanov 的观点,这种限制的理由是它能让编译器做出更多优化,因为最终挂起点无需维护完整的协程上下文,从而优化了协程状态,而且降低了分支复杂度。
        注意协程也可以不在 $final_-suspend$ 点挂起,只是建议你尽可能设计一个会在这个点挂起的协程。因为这样可以强制你在协程外调用 $.destroy\left(\right)$ ( 通常通过RAII对象的析构函数调用 ),并且可以方便编译器确定协程帧的生命周期是否内嵌于调用方,从更让编译器更有可能优化掉协程帧的内存分配。

编译器怎么选择 promise 类型

        让我们看看编译器怎么决定协程使用哪个 $promise$ 类型。
        协程 $promise$ 对象的类型通过 $std$::$experimental$::$coroutine_-traits$ 类决定。
        如果你有一个协程函数签名如下:

task<float> foo(std::string x, bool flag);

        那么编译器会通过返回类型和参数类型作为 $coroutine_-traits$ 的模板参数来推导协程的 $promise$ 类型。

typename coroutine_traits<task<float>, std::string, bool>::promise_type;

        如果函数是非静态成员函数,class类型会作为第二个模板参数传递给 $coroutine_-traits$。注意如果你的方法是右值引用重载的,那么第二个模板参数也会是右值引用。
        例如,如果你有以下方法:

task<void> my_class::method1(int x) const;
task<foo> my_class::method2() &&;

        编译器会使用以下 $promise$ 类型:

// method1 promise type
typename coroutine_traits<task<void>, const my_class&, int>::promise_type;

// method2 promise type
typename coroutine_traits<task<foo>, my_class&&>::promise_type;

        $coroutine_-traits$ 模板会默认查找返回类型是否存在内嵌 $promise$ 类型,并使用其作为 $promise$ 类型,例如这样 ( 不过通过一些SFINAE魔法,如果 $RET$::$promise_-type$ 没有定义,那么 $promise$ 类型也是未定义的 ):

namespace std::experimental
{
  template <typename RET, typename... ARGS>
  struct coroutine_traits<RET, ARGS...>
  {
    using promise_type = typename RET::promise_type;
  };
}

        所以对于你可以控制的协程返回类型,你可以在里面定义一个内嵌的 $promise$ 类型,让编译器直接使用作为协程的 $promise$ 对象。
        例如:

template <typename T>
struct task
{
  using promise_type = task_promise<T>;
  ...
};

        然而,对于你无法控制的协程返回类型,你可以在不修改类型的前提下,通过指定 $coroutine_-traits$ 来控制 $promise$ 类型。
        例如,给一个返回 $std$::$optional$<$T$> 的协程定义 $promise$ 类型:

namespace std::experimental
{
  template <typename T, typename... ARGS>
  struct coroutine_traits<std::optional<T>, ARGS...>
  {
    using promise_type = optional_promise<T>;
  };
}

识别特定协程调用帧

        当你调用一个协程函数时,协程帧会被创建。为了恢复关联的协程或者销毁协程帧,你需要一些方法识别或者引用对应的协程帧。
        协程标准通过 $coroutine_-handle$ 类型提供了这个机制。
        类型接口 ( 大致上 ) 如下:

namespace std::experimental
{
  template <typename Promise = void>
  struct coroutine_handle;

  // Type-erased coroutine handle. Can refer to any kind of coroutine.
  // Doesn't allow access to the promise object.
  template <>
  struct coroutine_handle<void>
  {
    // Constructs to the null handle.
    constexpr coroutine_handle();

    // Convert to/from a void* for passing into C-style interop functions.
    constexpr void* address() const noexcept;
    static constexpr coroutine_handle from_address(void* addr);

    // Query if the handle is non-null.
    constexpr explicit operator bool() const noexcept;

    // Query if the coroutine is suspended at the final_suspend point.
    // Undefined behavior if coroutine is not currently suspended.
    bool done() const;

    // Resume/Destroy the suspended coroutinie
    void resume();
    void destroy();
  };

  // Coroutine handle for coroutines with a known promise type.
  // Template argument must exactly match coroutine's promise type.
  template <typename Promise>
  struct coroutine_handle : coroutine_handle<>
  {
    using coroutine_handle<>::coroutine_handle;

    static constexpr coroutine_handle from_address(void* addr);

    // Access to the coroutine's promise object.
    Promise& promise() const;

    // You can reconstruct the coroutine handle from the promise object.
    static coroutine_handle from_promise(Promise& promise);
  };
}

        你可以通过两种方式获取一个协程的 $coroutine_-handle$ :

  1. $coroutine_-handle$ 会在 $co_-await$ 表达式中被传递给 $await_-suspend\left(\right)$ 。
  2. 如果你有一个协程 $promise$ 对象的引用,你可以通过 $coroutine_-handle$<$Promise$>::$from_-promise\left(\right)$ 重新构造出它的 $coroutine_-handle$ 。

        当协程到达$co_-await$ 表达式的 <$suspend$-$point$> 被挂起后,等待该协程的其他协程的 $coroutine_-handle$ 会被传给 $awaiter$ 的 $await_-suspend\left(\right)$ 方法。你可以认为这个 $coroutine_-handle$ 是 续体传递风格 ( $continuation-passing$ $style$ ) 中协程续体的表示。
        注意 $coroutine_-handle$ 不是RAII对象。你必须手动调用 $.destroy\left(\right)$ 来销毁协程帧,释放资源。可以把它视为一个用于管理内存的 $void*$ 值。这么设计是出于性能原因:RAII会带来额外的开销,比如需要引用计数管理。
        你应该尝试使用更高级的支持协程RAII语义的类型,例如cppcoro提供的 ( 不要脸地植入 ),或者编写一个你自己的更高级的类型,封装你协程类型的协程帧生命周期。

自定义 co_await 行为

        $promise$ 类型可以自定义出现在协程体内的每个 $co_-await$ 表达式行为。
        通过简单地定义 $promise$ 类型的 $await_-transform\left(\right)$ 方法,编译器会把协程体里的每个 $co_-await$ <$expr$> 转换成 $co_-await$ $promise.await_-transform$(<$expr$>)。
        这里有一些重要且有用的用法:
        它能让协程等待一些非 $awaitable$ 类型。
        例如,一个返回 $std$::$optional$<$T$> 类型的协程可能会重载 $promise$ 类型的 $await_-transform\left(\right)$ ,接受 $std$::$optional$<$U$> 并返回一个 $awaitable$ 类型,该类型会返回类型 $U$ 的值,或者在nullopt时挂起协程。

template <typename T>
class optional_promise
{
  ...

  template <typename U>
  auto await_transform(std::optional<U>& value)
  {
    class awaiter
    {
      std::optional<U>& value;
    public:
      explicit awaiter(std::optional<U>& x) noexcept : value(x) {}
      bool await_ready() noexcept { return value.has_value(); }
      void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
      U& await_resume() noexcept { return *value; }
    };
    return awaiter{ value };
  }
}

        它可以让你通过删除 $await_-transform$ 方法来禁止等待一个特定类型。
        例如,一个 $std$::$generator$<$T$> 返回类型的 $promise$ 类型可能会声明一个已删除的 $await_-transform\left(\right)$ 且接收所有类型的模板成员函数。这基本上禁止了所有在协程内的 $co_-await$ 调用。

template <typename T>
class generator_promise
{
  ...

  // Disable any use of co_await within this type of coroutine.
  template <typename U>
  std::experimental::suspend_never await_transform(U&&) = delete;
};

        它可以适配和改变常规 $awaitable$ 值的行为。
        例如,你可以定义一个协程类型,通过把 $awaitable$ 包装在 $resume_-on\left(\right)$ 操作中 ( 参考 $cppcoro$::$resume_-on\left(\right)$ ),保证协程始终在关联的 $executor$ 上的 $co_-await$ 表达式中恢复。

template <typename T, typename Executor>
class executor_task_promise
{
  Executor executor;

public:

  template <typename Awaitable>
  auto await_transform(Awaitable&& awaitable)
  {
    using cppcoro::resume_on;
    return resume_on(this->executor, std::forward<Awaitable>(awaitable));
  }
};

        作为 $await_-transform\left(\right)$ 介绍的最后一句话,特别要注意的是,如果 $promise$ 类型定义了 $await_-transform\left(\right)$ 成员,编译器就会把所有 $co_-await$ 替换成 $promise.await_-transform\left(\right)$ 调用。这意味着如果你想要只给某些类型定制 $co_-await$ 行为,你也需要重载默认的只转发参数的 $await_-transform\left(\right)$ 实现。

自定义 co_yield 行为

        最后一个你可以通过 $promise$ 类型自定义行为的是 $co_-yield$ 关键字。
        如果 $co_-yield$ 关键字在协程内出现,编译器会把表达式 $co_-yield$ <$expr$> 翻译成 $co_-await$ $promise.yield_-value$(<$expr$>) 。因此 $co_-yield$ 的行为可以通过定义一个或多个 $promise$ 类型的 $yield_-value\left(\right)$ 方法定制。
        注意,不像 $await_-transform$ ,如果 $promise$ 类型没有定义 $yield_-value\left(\right)$ 方法,$co_-yield$ 没有默认行为。所以如果需要禁止 $co_-await$ 表达式,$promise$ 类型需要显式删除 $await_-transform\left(\right)$ 方法,但是 $co_-yield$ 则不用。
        一种典型的 $promise$ 类型的 $yield_-value\left(\right)$ 方法实现是 $generator$<$T$> 类型:

template <typename T>
class generator_promise
{
  T* valuePtr;
public:
  ...

  std::experimental::suspend_always yield_value(T& value) noexcept
  {
    // Stash the address of the yielded value and then return an awaitable
    // that will cause the coroutine to suspend at the co_yield expression.
    // Execution will then return from the call to coroutine_handle<>::resume()
    // inside either generator<T>::begin() or generator<T>::iterator::operator++().
    valuePtr = std::addressof(value);
    return {};
  }
};

总结

        在这篇文章中,我依次介绍了编译器将函数编译成协程时的每个转换。
        希望这篇能够帮你理解怎么通过定义自己的 $promise$ 类型的方式来定制不同类型的协程行为。协程机制中有许多可以变动的部分,所以有许多种自定义行为的方式。
        然而,编译器有一个更重要的转换我还没讲——把协程体转换成状态机。不过要是讲这块的话,这篇文章就太长了,所以我把它放到了下一篇文章。请保持关注!

C++协程(2):理解co_await

        在之前的文章中,我介绍了函数和协程在高级表现的区别,但还没有讲到到C++协程标准的语法和语义。
        C++协程标准提供的一个关键能力是挂起协程,并在之后恢复。这个机制是通过 $co_-await$ 提供的。
        在揭开协程的神秘面纱前,我们需要理解 $co_-await$ 的工作方式,了解它是如何挂起和恢复协程的。在这篇文章中,我会解释 $co_-await$ 的机制,并介绍AwaitableAwaiter的概念。
        在这之前,作为背景,我想先简单回顾下协程标准。

协程标准带来了什么?

  • 三个关键字 $co_-await$ ,$co_-yield$ 和 $co_-return$
  • 一些 $std$::$experimental$ 命名空间的新类型:
    • $coroutine_-handle$<$P$>
    • $coroutine_-traits$<$Ts$…>
    • $suspend_-always$
    • $suspend_-never$
  • 一种允许库开发者与协程交互、定制化行为的机制
  • 一个让异步代码更容易编写的语言能力

        C++协程标准可以认为在语言层面提供了一个低级汇编语言协程。这些功能很难以一种安全的方式直接使用,主要是提供给库开发者来实现高级抽象,从而应用开发者可以安全使用。
        之后的语言标准 ( 希望是C++20 ) 计划加入这些低级功能,同时让标准库提供一些包装了这些功能的高级类型,让应用开发者以更方便安全的方式使用。

编译器 <-> 库交互

        有趣的是,协程标准并没有明确定义协程语义,包括协程如何把值返回给调用方,$co_-return$ 如何处理返回值,如何处理传播到协程之外的异常,以及协程应该在哪个线程恢复。
        相反的,它指定了一种可以让库代码定制化协程行为的机制,通过实现特定的接口类型。编译器使用库代码生成对应的调用。这种方式很像range模式,后者可以让库开发者通过定义 $begin\left(\right)$ / $end\left(\right)$ 和 $iterator$ 类型,来定制化代码。
        协程的这种没有给机制定义特殊语义的方式,让它变成了一个强大的工具,允许库开发者实现各种类型的协程,以各种方式,出于各种目的。
        例如,你可以实现一个异步生成单个值的协程,或者一个懒生成值序列的协程,或者一个简单消费 $optional$<$T$> 值,如果遇到 $nullopt$ 就提前退出的协程。
        协程标准定义了两个接口:$Promise$ 和 $Awaitable$ 。
        $Promise$ 接口指定了协程定制化行为的方法。库开发者可以通过该接口,实现当协程被调用时的行为,协程返回时的行为 ( 包括正常返回和抛出未处理异常 ),协程内使用 $co_-await$ 或者 $co_-yield$ 的行为。
        $Awaitable$ 接口指定控制 $co_-await$ 语义的方法。当一个值是可以 $co_-await$ 的,代码就会被翻译成一系列的对 $awaitable$ 对象方法的调用。这个对象可以决定是否挂起当前协程,在挂起前执行一些逻辑用于后续的恢复,以及在恢复后生成 $co_-await$ 表达式的值。
        我会在之后的文章再介绍 $Promise$ ,现在先来看看 $Awaitable$ 接口。

Awaiters 和 Awaitables:解释co_await

        $co_-await$ 运算符是一个新的一元运算符,接收一个值,比如:$co_-await$ $someValue$ 。
        $co_-await$ 运算符只能在协程上下文中使用。这有一点重复定义了,因为根据定义,包含 $co_-await$ 操作符的函数都应该被编译成协程。
        支持 $co_-await$ 运算符的类型被叫做 $Awaitable$ 类型。
        注意,$co_-await$ 运算符能否对一个类型使用,取决于 $co_-await$ 表达式所在的上下文。协程使用的 $promise$ 类型可以通过 $await_-transform$ 方法 ( 之后介绍 ),决定 $co_-await$ 的语义。
        为了更精确,我喜欢使用术语常规 Awaitable ( $Normally$ $Awaitable$ ) 来描述 $promise$ 类型未定义 $await_-transform$ 方法的协程,这种协程上下文支持 $co_-await$ 操作。使用术语语境化 Awaitable来描述 $promise$ 类型定义了 $await_-transform$ 方法的协程,这种类型仅在某些特别的上下文中才支持 $co_-await$ 运算符。
        一个Awaiter类型实现了 $co_-await$ 调用会时使用到的三个方法:$await_-ready$ ,$await_-suspend$ 和 $await_-resume$ 。
        注意我不要脸地从C# asnyc关键字中“借了” $Awaiter$ 这个术语。C#中这个术语指代可以通过 $GetAwaiter\left(\right)$ 方法返回类似于C++ $Awaiter$ 对象的类型,而C#的 $Awaiter$ 也与C++有着诡异的相似,具体可以看这篇文章
        注意一个类型既可以是 $Awaitable$ 类型,也可以是 $Awaiter$ 类型。
        当编译器看到 $co_-awaiter$<$expr$> 表达式时,根据类型的不同,可能会有多种行为。

获取 Awaiter

        编译器要做的第一件事就是生成被等待值获取 $Awaiter$ 对象的代码。$N4680$ 在5.3.8(3)小节中列出了一系列的获取 $awaiter$ 对象的步骤。
        假设等待中的协程的 $promise$ 类型是 $P$ ,并且 $promise$ 在当前协程中是一个左值引用。
        如果 $promise$ 类型 $P$ 存在 $await_-transform$ 成员,<$expr$> 会被传入 $promise$.$await_-transform$(<$expr$>) 调用来获取 $Awaitable$ 值,记为 $awaitable$ 。相反,如果 $promise$ 类型没有 $await_-transform$ 成员,我们会直接使用 <$expr$> 来作为 $Awaitable$ 对象,同样记为 $awaitable$ 。
        然后,如果 $Awaitable$ 对象 $awaitable$ 覆写了 $operator$ $co_-awaiter\left(\right)$ ,那么这个函数就会被调用,来获取 $Awaiter$ 对象。否则,$awaitable$ 会被直接作为 $awaiter$ 对象使用。
        如果我们把这些规则使用函数 $get_-awaitable\left(\right)$ 和 $get_-awaiter\left(\right)$ 来编码,看起来就是这样:

template <typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
  if constexpr (has_any_await_transform_member_v<P>)
    return promise.await_transform(static_cast<T&&>(expr));
  else
    return static_cast<T&&>(expr);
}

template <typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
  if constexpr (has_member_operator_co_await_v<Awaitable>)
    return static_cast<Awaitable&&>(awaitable).operator co_await();
  else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
    return operator co_await(static_cast<Awaitable&&>(awaitable));
  else
    return static_cast<Awaitable&&>(awaitable);
}

等待 Awaiter

        假设我们把逻辑重新封装,把 <$expr$> 转换成上面那种获取 $Awaiter$ 对象的函数,那么 $co_-await$<$expr$> 可以简单翻译成:

{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>)(awaitable);
  if (!awaiter.await_ready())
  {
    using handle_t = std::experimental::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(promise)));
    
    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(promise));
      <return-to-caller-or-resumer>
    }
    else
    {
      static_assert(
          std::is_same_v<await_suspend_result_t, bool>,
          "await_suspend() must reutrn 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(promise)))
      {
        <return-to-caller-or-resumer>
      }
    }

    <resume-point>
  }

  return awaiter.await_resume();
}

        void版本的 $await_-suspend\left(\right)$ 无条件地在调用返回后交还控制权,而bool版本的则允许 $awaiter$ 对象有条件地在返回后恢复协程执行而非交换控制权给调用方/恢复方。
        bool版本的 $await_-suspend\left(\right)$ 在 $awaiter$ 发起一个需要异步完成的操作时十分有用。在这种情况下,当任务异步完成后,$await_-suspend$ 可以返回false,表示协程应该立即恢复并继续执行。
        在 <$suspend$-$coroutine$> ( 挂起协程 ) 处,编译器会生成一些代码来保存协程状态,并准备好被恢复。这些状态包括保存 <$resume$-$point$>,以及将一些寄存器值保存到当前协程帧内存中。
        当前协程在 <$suspend$-$coroutine\left(\right)$> 操作完成后,就被认为已挂起。$await_-suspend$ 调用内部是第一个可以观测到被挂起协程的地方。一旦被挂起,就可以在之后被恢复或者销毁。
        一旦 $await_-suspend\left(\right)$ 操作完成,就要在之后对这个协程进行恢复 ( 或者销毁 )。注意 $await_-suspend\left(\right)$ 返回false相当于在当前线程中立即恢复协程。
        $await_-ready\left(\right)$ 方法可以让你避免 <$suspend$-$coroutine$> 带来的开销,因为它可以返回操作是否可以在不需要挂起的前提下同步完成。
        在 <$return$-$to$-$caller$-$or$-$resumer$> 点,控制权会返还给调用方或者恢复方,并弹出协程调用栈帧,保留协程栈帧。
        当 ( 或者 ) 执行到 <$resume$-$point$> ( 恢复点 ),挂起的协程将被恢复。例如,在通过 $await_-resume\left(\right)$ 方法获取结果之前。
        $await_-resume\left(\right)$ 方法的返回值会作为 $co_-await$ 表达式的结果。$await_-resume\left(\right)$ 方法也可以抛出一个异常,并将其传播到 $co_-await$ 表达式之外。
        注意如果一个异常传播到 $await_-suspend\left(\right)$ 调用之外,协程将在没有 $await_-resume\left(\right)$ 调用的情况下被自动恢复,并将异常传播到 $co_-await$ 表达式之外。

协程句柄

        你可能注意到了,$coroutine_-handle$<$P$> 类型会被作为参数,在 $co_-await$ 表达式中传给 $await_-suspend\left(\right)$ 调用。
        这个类型代表一个无主的协程帧句柄,可以用来恢复协程执行,或者销毁协程帧。它可以用来获取协程的 $promise$ 对象。
        $coroutine_-handle$ 类型有着如下 (省略了的) 接口:

namespace std::experimental
{
  template <typename Promise>
  struct coroutine_handle;

  template <>
  struct coroutine_handle<void>
  {
    bool done() const;

    void resume();
    void destroy();

    void* address() const;
    static coroutine_handle from_address(void* address);
  };

  template <typename Promise>
  struct coroutine_handle : coroutine_handle<void>
  {
    Promise& promise() const;

    static coroutine_handle from_promise(Promise& promise);

    static coroutine_handle from_address(void* address);
  };
}

        实现一个 $Awaitable$ 类型的时候,与 $coroutine_-handle$ 相关的关键方法是 $.resume\left(\right)$,它会在操作完成并且想要恢复等待中的协程执行的时候被调用。对 $coroutine_-handle$ 调用 $.resume\left(\right)$ 会在 <$resume$-$point$> 重新激活一个挂起的协程,在到达下一个 <$return$-$to$-$caller$-$or$-$resumer$> 点处返回。
        $.destroy\left(\right)$ 方法销毁协程帧,调用作用域内的变量析构器,释放协程帧使用的内存空间。一般情况下,你不需要主动 ( 事实上需要避免 ) 调用 $.destroy\left(\right)$,除非你是一个正在实现协程 $promise$ 类型的库开发者。通常,协程帧会被某些协程返回的RAII类型所持有。所以再调用 $.destroy\left(\right)$ 可能导致一个双重析构的bug。
        $.promise\left(\right)$ 方法返回一个协程 $promise$ 对象的引用。然而,就像 $.destroy\left(\right)$,一般只会在需要编写 $promise$ 类型时有用。你应该把 $promise$ 对象视为协程的一个内部细节实现。对于大部分常规 $Awaitable$ 类型,你应该使用 $coroutine_-handle$<$void$> 而不是 $coroutine_-handle$<$Promise$> 作为 $await_-suspend\left(\right)$ 方法的参数。
        $coroutine_-handle$<$P$>::$from_-promise$($P$& $promise$) 函数允许从协程 $promise$ 对象引用中重新构造协程。注意你必须保证类型 $P$ 与某个正在使用的协程帧匹配,如果 $P$ 的类型继承了 $promise$ 类型,并用于构造 $coroutine_-handle$<$Base$> ,会导致一些未定义行为。
        $.address\left(\right)$ / $.from_-address\left(\right)$ 函数允许把一个协程句柄转换为 $void*$ 指针,或者从一个 $void*$ 指针转换为协程句柄。这主要是用来作为context参数传递给现有的C风格API使用,所以你可能在一些实现 $Awaitable$ 类型的情况会用到。然而,大部分情况context参数都需要一些额外的信息用来回调,所以我通常会把协程句柄存储在结构中并通过把结构指针作为context传递,而不是直接使用 $.address\left(\right)$ 的返回值。

不需要同步的异步代码

        $co_-await$ 的一个十分有用的设计是协程可以在被挂起后,把控制权移交给调用方 / 恢复方前执行代码。
        这可以让 $Awaiter$ 对象在被挂起后发起一个异步操作,把被挂起协程的 $coroutine_-handle$ 传给该操作,并在不需要任何额外同步的前提下安全地恢复操作 ( 可能在其他线程上 )。
        例如,在 $await_-suspend\left(\right)$ 调用中,发起一个异步读操作,当协程被挂起,意味着我们可以在读操作完成后,恢复协程的执行。这一步不需要任何线程同步操作,不需要协调发起线程和执行线程的关系。

Time     Thread 1                           Thread 2
  |      --------                           --------
  |      ....                               Call OS - Wait for I/O event
  |      Call await_ready()                    |
  |      <supend-point>                        |
  |      Call await_suspend(handle)            |
  |        Store handle in operation           |
  V        Start AsyncFileRead ---+            V
                                  +----->   <AsyncFileRead Completion Event>
                                            Load coroutine_handle from operation
                                            Call handle.resume()
                                              <resume-point>
                                              Call to await_resume()
                                              execution continues....
           Call to AsyncFileRead returns
         Call to await_suspend() returns
         <return-to-caller/resumer>

        当使用这种方便的方式时,你需要小心的一点是,当把协程句柄发给其他线程时,其他线程可能在 $await_-suspend\left(\right)$ 调用返回前就尝试恢复协程执行,导致在 $await_-suspend\left(\right)$ 未完成时,协程继续执行了。
        当协程恢复后,做的第一件事是调用 $await_-resume\left(\right)$ 获取结果,然后通常立马析构 $Awaiter$ 对象 ( 例如,$await_-suspend\left(\right)$ 调用的this指针 )。然后协程可能执行直到完成,在 $await_-suspend\left(\right)$ 返回前销毁协程和 $promise$ 对象。
        所以在 $await_-suspend\left(\right)$ 方法,一旦协程有可能被另一个线程并行地恢复,你就需要避免访问this或者协程的 $.promise\left(\right)$ 对象,因为两者都可能被摧毁。总的来说,当挂起操作从发起到完成期间,只有局部变量是安全的。

与有栈协程相比

        我想换个话题,快速地与现有的其他有栈协程比较下无栈协程在挂起后执行逻辑的能力,比如Win32 fibers或者boost::context
        对于许多有栈协程来说,挂起和恢复操作被合并成了“上下文切换”操作。通过“上下文切换”操作,当前协程在挂起后没有机会去执行逻辑,只是单纯地将控制权移交给其他协程。
        这意味着,如果我们想要基于有栈协程实现一个类似于异步文件读取操作,我们需要在挂起协程之前发起操作。因此有可能操作会在线程被挂起之前就在其他线程上完成了,并等待恢复。这种潜在的其他线程完成操作和协程挂起之间的竞态,需要一些线程同步机制来选择并决定胜者。
        一种可选的方式是使用内嵌上下文 ( $trampoline$ $context$ ),它可以在初始上下文被挂起后,代表初始上下文来表示发起某个操作。然而,这可能需要额外的架构支持,和另外的上下文切换来保证正常使用,并且成本可能大于它试图避免的同步操作的成本。

减少内存分配

        异步操作经常需要给每个操作分配状态,用于跟踪操作执行阶段。这些状态需要在操作执行过程中保留,直到操作完成才会被释放。
        例如,调用异步Win32 I/O函数需要你分配并传递一个 $OVERLAPPED$ 结构指针。调用方需要保证指针在操作完成前都是有效的。
        典型的基于回调的API要求这些状态分配在堆上,并保证它们有着合适的生命周期。如果你正在执行很多操作,你可能需要给每个操作都分配和释放状态。如果这导致性能问题,你可能需要通过内存池来分配这些状态。
        然而,当使用协程时,利用局部变量在协程帧内的特性,我们可以避免堆分配存储,因为协程帧内的变量在协程被挂起时也是存活的。
        通过把 $co_-await$ 表达式中每个操作状态放到 $Awaiter$ 对象中,我们可以有效地从协程帧中“借用”内存。一旦操作完成,协程被恢复,$Awaiter$ 对象被摧毁,释放协程帧中其他局部变量使用的内存。
        最终,协程帧还是分配在堆上。然而,只需要一次堆分配,协程帧就可以被用来执行许多异步操作。
        你想一想,协程帧其实就是arena内存分配器的一种高级表现。编译器计算出所有局部变量需要的arena大小,然后零成本地分配所有局部变量!试着用传统的分配器来战胜它;)

例子:实现一个单线程同步原语

        现在我们已经讲了许多 $co_-await$ 运算符的机制,我想实现一个基本的 $awaitable$ 同步原语:一个异步手动重置 $event$,来把这些知识运用到实践上。
        这个 $event$ 的基本需求是对许多并行执行的协程来说是 $Awaitble$ 的,当协程对其等待时,会挂起协程,直到一个线程调用 $.set\left(\right)$ 方法,此时恢复所有挂起的协程。如果一个线程已经调用了 $.set\left(\right)$,协程应该在不挂起的前提下继续执行。
        理想情况下,我想要让方法noexcept,这需要避免堆分配,以及无锁实现。
        2017/11/23 编辑:增加 async_manual_reset_event 的用例
        用例看起来像是这样:

T value;
async_manual_reset_event event;

// A single call to produce a value
void producer()
{
  value = some_long_running_computation();

  // Publish the value by setting the event.
  event.set();
}


// Supports multiple concurrent consumers
task<> consumer()
{
  // Wait until the event is signalled by call to event.set()
  // in the producer() function
  co_await event;

  // Now it's safe to consume 'value'
  // This is guaranteed to 'happen after' assignment to 'value'
  std::cout << value << std::endl;
}

        让我们先想想这个 $event$ 可能的状态:$not$ $set$ 和 $set$ 。
        当处于 $not$ $set$ 状态,存在一个等待 $set$ 状态的等待协程列表 ( 可能为空 )。
        当设置 $set$ 状态,就不应该存在等待中的协程,因为正在 $co_-await$ 这个 $event$ 的协程可以不挂起地继续执行。
        这个状态可以通过一个 $std$::$atomic$<$void*$> 表示:

  • 保存一个特殊指针,指向 $set$ 状态。我们使用 $event$ 的this指针来表示,因为它不可能跟列表中其他对象的地址相同。
  • 否则,$event$ 处于 $not$ $set$ 状态,指针值是一个等待协程的单向链表头。

        我们可以通过把状态存储在协程帧的 $awaiter$ 对象的方式,避免在堆上对节点进行额外分配。
        让我们实现一个类似如下的类接口:

class async_manual_reset_event
{
public:
  async_manual_reset_event(bool initiallySet = false) noexcept;

  // No copying/moving
  async_manual_reset_event(const asnyc_manual_reset_event&) = delete;
  async_manual_reset_event(async_manual_reset_event&&) = delete;
  async_manual_reset_event& operator=(const asnyc_manual_reset_event&) = delete;
  async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

  bool is_set() const noexcept;

  struct awaiter;
  awaiter operator co_await() const noexcept;

  void set() noexcept;
  void reset() noexcept;

private:

  friend struct awaiter;

  // - 'this' => set state
  // - otherwise => not set, head of linked list of awaiter*.
  mutable std::atomic<void*> m_state;
};

        这里我们给出了一个相当简单直接的接口。主要要注意的点是 $operator$ $co_-await\left(\right)$ 返回一个未定义的 $awaiter$ 类型。
        让我们先实现 $awaiter$ 。

实现 Awaiter

        首先,需要知道正在等待哪个 $async_-manual_-reset_-event$ ,所以需要一个 $event$ 引用和一个初始化它的构造器。
        它同样需要作为 $awaiter$ 链表的节点,所以也需要保存下一个 $awaiter$ 对象的指针。
        它还需要保存正在执行 $co_-await$ 的等待中协程的 $coroutine_-handle$ ,用来在 $event$ 被 $set$ 之后恢复协程。我们不关心协程的 $promise$ 类型,所以只需要使用 $coroutine_-handle$<> ( $coroutine_-handle$<$void$> 的缩写 )。
        最后,它需要实现 $Awaiter$ 接口,所以需要三个特殊方法 $await_-ready$ ,$await_-suspend$ 和 $await_-resume$ 。我们不需要 $co_-await$ 返回值,所以 $await_-resume$ 可以返回 $void$ 。
        一旦我们把所有东西放到一起,基本的 $awaiter$ 接口看起来就像这样:

struct async_manual_reset_event::awaiter
{
  awaiter(const async_manual_reset_event& event) noexcept
  : m_event(event)
  {}

  bool await_ready() const noexcept;
  bool await_suspend(std::experimental::coroutine_handle<> awaitinigCoroutine) noexcept;
  void await_resume() noexcept {};

private:

  friend class async_manual_reset_event;

  const async_manual_reset_event& m_event;
  std::experimental::coroutine_handle<> m_awaitingCoroutine;
  awaiter* m_next;
};

        当 $co_-await$ 一个 $event$ 时,我们不想让等待中的协程在 $event$ 被 $set$ 的时候挂起。所以我们让 $await_-ready\left(\right)$ 在 $event$ 已经 $set$ 的时候返回true

bool async_manual_reset_event::awaiter::await_ready() const noexcept
{
  return m_event.is_set();
}

        接下来,让我们看看 $await_-suspend\left(\right)$ 方法,许多 $awaitable$ 类型的有趣行为都发生在这里。
        首先它需要在 $m_-awaitingCoroutine$ 成员中保存协程句柄,用于后续调用 $.resume\left(\right)$ 。
        保存句柄之后,需要把 $awaiter$ 自动地加入链表中。成功入队之后,如果 $event$ 还没有被设置成 $set$ 状态,返回true表示我们不希望立即恢复协程,否则返回false,表示协程应该立马恢复。

bool async_manual_reset_event::awaiter:await_suspend(
  std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
  // Special m_state value that indicates the event is in the 'set' state.
  const void* const setState = &m_event;

  // Remember the handle of the awaiting coroutine.
  m_awaitingCoroutine = awaitingCoroutine;

  // Try to atomically push this awaiter onto the front of the list.
  void* oldValue = m_event.m_state.load(std::memory_order_acquire);
  do
  {
    // Resume immediately if already in 'set' state.
    if (oldValue == setState) return false;

    // Upload linked list to point at current head.
    m_next = static_cast<awaiter*>(oldValue);

    // Finally, try to swap the old list head, inserting this awaiter
    // as the new list head.
  } while (!m_event.m_state.compare_exchange_weak(
            oldValue,
            this,
            std::memory_order_release,
            std::memory_order_acquire));
  
  // Successfully enqueued. Remain suspend.
  return true;
}

        注意我们使用 $acquire$ 内存序读取旧状态,这样我们就可以看到所有发生在 $set\left(\right)$ 调用之前的写操作了。
        我们需要用 $release$ 内存序调用 $compare$-$exchange$ ,这样之后的 $set\left(\right)$ 就可以看到我们写入的 $m_-awaitingCoroutine$ 和更早写入的协程状态。

完成 event 类的剩余实现

        我们已经定义好了 $awaiter$ 类型,再看回来 $async_-manual_-reset_-event$ 的方法。
        首先是构造函数,它需要使用空的链表 ( 比如nullptr) 初始化为 $not$ $set$ 状态,或者初始化为 $set$ 状态 ( 比如this )。

async_manual_reset_event::async_manual_reset_event(
  bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr)
{}

        接下来,$is_-set\left(\right)$ 方法更直接,如果持有this指针,那么就是 $set$ 状态。

bool async_manual_reset_event::is_set() const noexcept
{
  return m_state.load(std::memory_order_acquire) == this;
}

        接着是 $reset\left(\right)$ 方法,如果当前是 $set$ 状态,我们需要重置为空链表的 $not$ $set$ 状态,否则不做任何事:

void async_manual_reset_event::reset() noexcept
{
  void* oldValue = this;
  m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}

        通过 $set\left(\right)$ 方法,我们可以用特殊的this指针与当前值交换,从而设置状态为 $set$。如果存在挂起的协程,我们需要在返回前依次恢复。

void async_manual_reset_event::set() noexcept
{
  // Needs to be 'release' so that subsequent 'co_await' has
  // visibility of our prior writes.
  // Needs to be 'acquire' so that we have visibility of prior
  // writes by awaiting coroutines.
  void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
  if (oldValue != this)
  {
    // Wasn't already in 'set' state.
    // Treat old value as head of a linked-list of waiters
    // which we have now acquired and need to resume.
    auto* waiters = static_cast<awaiter*>(oldValue);
    while (waiters != nullptr)
    {
      // Read m_next before resuming the coroutines as resuming
      // the coroutine will likely destroy the awaiter object.
      auto* next = waiters->m_next;
      waiters->m_awaitingCoroutine.resume();
      waiters = next;
    }
  }
}

        最后我们需要实现 $operator$ $co_-await\left(\right)$ 方法,只需要构造一个 $awaiter$ 对象。

async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{
  return awaiter{ *this };
}

        这样我们就完成了一个无锁,无额外内存分配,noexcept实现的 $awaitable$ 的异步手动-重置 $event$ 。
        如果你想要试试这段代码,看看它在MSVCClang下的编译结果,可以看一下godbolt上的源码
        你也可以在cppcoro库上看到这段实现,以及更多的其他的有用的 $awaitable$ 类型,例如 $async_-mutex$ 和 $async_-auto_-reset_-event$ 。

写在结尾

        这篇文章介绍了 $operator$ $co_-await$ 是怎么通过 $Awaitable$ 和 $Awaiter$ 这两个concept实现的。
        同样也讲了怎么实现一个 $awaitable$ 的异步线程同步原语,使用了 $awaiter$ 对象在协程帧上分配的优点,避免了额外的堆分配。
        我希望这篇文章可以帮助你理解新的 $co_-await$ 运算符。
        在下一篇文章中,我将介绍 $Promise$ concept,以及一个协程类型的开发者可以怎样设计协程的行为。

致谢

        这段就不翻了吧~