回到顶部 暗色模式

EffectiveModernCpp(6):并发

        C++11的最伟大标志之一是将并发整合到语言和库中,允许开发者通过标准库写出跨平台的多线程程序。

1. std::async

        如果开发者想要异步执行 $doAsyncWork$ 函数,有两种方式,一种是创建 $std::thread$ :

int doAsyncWork();
std::thread t(doAsyncWork);

        另一种是创建 $std::async$ :

auto fut = std::async(doAsyncWork);

        通常我们会选择第二种方法,一是因为它代码量更少,二是可以获取返回值。更进一步,如果 $doAsyncWork$ 发生异常,我们还可以通过 $get$ 获取异常。
        基于线程与基于任务的最根本区别在于抽象层次的高低。基于任务的方式将开发者从线程管理中解放出来。C++的线程有三种含义:

        软件线程是有限的,如果你创建的软件线程的数量大于计算机最大能提供的数量,会抛出 $std::system_-error$ 错误。即使线程调用的函数是 $noexcept$ 的,这个错误依然会发生。
        设计良好的程序必须避免这种错误,一种有效的方法是在当前线程执行 $doAsyncWork$ ,但是这可能会导致负载不均衡,尤其是当前线程是GUI线程的时候,程序可能会陷入长时间未响应的状态;另一种办法是等待当前线程结束后再新创建线程,但是如果当前线程在等待 $doAsyncWork$ 的结果,这时程序就会陷入死锁。
        即使我们没有超出最大线程数,可用资源也会约束我们的程序。如果当前软件线程数大于硬件线程数,就会发生上下文切换。线程的上下文切换会带来开销,如果这时软件线程运行的CPU核心与之前的硬件线程所在的核心不同,这个开销会更高,因为(a)CPU缓存需要重新载入;(b)新线程缓存会覆盖老线程缓存,导致老线程再次在当前核心运行时又要重新载入缓存。
        避免这种资源开销的问题是很难的,因为软件线程与硬件线程的最佳比例取决于软件线程的执行效率,一个很明显的例子就是计算密集型程序和I/O密集型程序的执行效率是不同的。这个比例还依赖于上下文切换的开销和CPU缓存的使用效率。而且,对某种类型硬件和平台进行优化并不意味着换种硬件或者平台依然还有着这种效率。
        相比于直接使用 $std::thread$ ,$std::async$ 可以把调整最优线程数量的工作交给标准库实现,而且也可以减少资源超额的可能。$std::async$ 并不保证开启一个新线程,只是保证会执行该函数。我们也可以选择性地通过调用程序让它在当前线程执行。如果在GUI程序中使用 $std::async$ 出现了相应变慢的问题,我们还可以通过 $std::launch::async$ 来指定调度策略。
        最新的线程调度算法会使用线程池来避免资源问题,并且通过工作窃取算法提升跨核心的负载均衡。C++标准库虽然没有要求使用线程池或者工作窃取算法,但是库开发者们在标准库中使用了这些技术。使用基于任务的开发模式,处理资源和负载均衡的问题就交给了库开发者,而如果使用基于线程的开发模式,这些工作就落在了我们头上,更不用说还要考虑跨平台问题了。
        当然,这不意味着 $std::async$ 总是比 $std::thread$ 好。在一些场景中,$std::thread$ 是更好的选择:

2. 启动策略

        当调用 $std::async$ 时,你通常希望该函数被异步执行,但它不一定会这样做。$std::async$ 有两种策略,通过 $std::launch$ 域的枚举表示:

        但是 $std::async$ 的默认策略不是上面中的任何一个,而是它们的或,这意味着它可以异步也可以同步,当然,这也会使得它们的行为变得不可预测。默认启动策略也导致使用线程本地变量比较麻烦,因为无法知道哪个线程的本地变量会被访问。默认启动策略还会影响到基于超时机制的 $wait$ 循环:

using namespace std::literals;

void f() { std::this_thread::sleep_for(1s); }

auto fut = std::async(f);
while(fut.wait_for(100ms) != std::future_status::ready) {
  // ...
}

        如果 $f$ 异步执行,那么这个循环没有问题;但如果 $f$ 同步执行,$wait_-for$ 将总是返回 $std::futre_-status::deferred$ ,导致循环永远不会停止。解决办法也很简单,只要检查 $f$ 是否延迟执行即可。很不幸的是,我们并没有直接办法得知 $f$ 是否延迟执行,只能通过超时函数获取:

auto fut = std::async(f);
if (fut.wait_for(0s) == std::future_status::deferred) {
  // ...
} else {
  while (fut.wait_for(100ms) != std::future_status::ready) {
    // ...
  }
}

        总结起来,只有在以下条件满足时,我们才会使用默认策略:

        如果无法满足以上条件,我们建议使用 $async$ 策略。实际上,我们可以自己编写一个使用 $async$ 作为默认策略的函数:

template<typename F, typename ...Ts>
inline std::future<typename std::result_of<F(Ts...)>::type>
reallyAsync(F &&f, Ts &&...params) {
  return std::async(
    std::launch::async,
    std::forward<F>(f),
    std::forward<Ts>(params)...);
}

        这个函数接收一个可调用对象以及对象参数,然后完美转发给 $std::async$ ,并选择 $async$ 策略。
        在C++14中,我们可以这样实现:

template<typename F, typename ...Ts>
inline auto reallyAsync(F &&f, Ts &&...params) {
  return std::async(
    std::launch::async,
    std::forward<T>(f),
    std::forward<Ts>(params)...);
}

3. join

        $std::thread$ 有两个状态——可连接和不可连接。可连接状态的 $thread$ 绑定一个底层正在运行的异步线程,或者可能运行的线程,比如一个等待被调度或者阻塞的 $thread$ 。绑定到一个已经运行完成的底层线程的 $thread$ 也是可连接的。
        不可连接的 $thread$ 包括:

        如果一个线程是可连接的,那么当它的析构函数被调用时,程序也将中止。

constexpr auto tenMillion = 10'000'000;

bool doWork(std::function<bool (int)> filter,
            int maxVal = tenMillion) {
  std::vector<int> goodVals;
  std::thread t([&filter, maxVal, &goodVals] {
    for (auto i = 0; i <= maxVal; ++i)
      if (filter(i)) goodVals.push_back(i);
  });
  auto nh = t.native_handle();  // set thread's priority
  // ...
  if (conditionsAreStatisfied()) {
    t.join();
    performComputation(goodVals);
    return true;
  }

  return false;
}

        这里如果 $conditionAreStatisfied$ 返回 $true$ ,那么程序会正常结束。但是如果返回的是 $false$ ,或者抛出异常,那么 $t$ 会在 $doWork$ 结束时被析构,从而程序中止。
        你或许会疑惑为什么要这样设计,因为另外两种方式更糟:

        销毁可连接的线程会带来可怕的后果,所以我们也应该确保如果 $thread$ 会离开当前作用域,就把它们设置为不可连接的。但是覆盖每一个分支是很难的。一种通用的解决方案就是将这些操作放在对象析构函数中,这种对象也被称为 $RAII$ ( $Resource$ $Acquisition$ $Is$ $Initialization$ ) 对象。标准库没有 $RAII$ 的 $std::thread$ 类,可能是标准委员会拒绝将 $join$ 和 $detach$ 作为默认选项,不知道如何去实现这个类。

class ThreadRAII {
public:
  enum class DtorAction { join, detach };
  ThreadRAII(std::thread &t, DtorAction a)
    : action(a), t(std::move(t)) {}
  ThreadRAII(ThreadRAII &&) = default;
  ThreadRAII &operator=(ThreadRAII &&) = default;
  ~ThreadRAII() {
    if (t.joinable()) {
      if (action == DtorAction::join) {
        t.join();
      } else {
        t.detach();
      }
    }
  }
  std::thread &get() { return t; }

private:
  DtorAction action;
  std::thread t;
};

        $std::thread$ 无法复制,所以我们只能通过移动构造。提供 $get$ 函数可以避免 $ThreadRAII$ 复制整个 $std::thread$ 接口。因为在一个不可连接的 $thread$ 上调用 $join$ 或 $detach$ 是未定义行为,所以我们需要先判断它的连接性。$ThreadRAII$ 存在析构函数,所以编译器不会自动生成移动函数,但很明显它是可以移动的,所以我们需要显式声明。
        有了 $ThreadRAII$ ,我们可以修改之前的代码:

bool doWork(std::function<bool (int)> filter,
            int maxVal = tenMillion) {
  std::vector<int> goodVals;
  ThreadRAII t(
    std::thread([&filter, maxVal, &goodVals] {
      for (auto i = 0; i <= maxVal; ++i)
        if (filter(i)) goodVals.push_back(i);
    }),
    ThreadRAII::DtorAction::join
  );
  auto nh = t.get().native_handle();
  // ...
  if (conditionAreStatisfied()) {
    t.get().join();
    performComputation(goodVals);
    return true;
  }

  return false;
}

        虽然 $join$ 也可能导致性能异常,但是相比于未定义行为和程序中止,这个选择还是更好一些。

4. 线程析构

        可连接的 $std::thread$ 会绑定底层线程,异步的 $std::future$ 也与底层线程有相似的关系,从而它们都可以被视为底层线程的句柄。
        当然,它们的行为有很大不同。$thread$ 的析构可能会导致程序中止,但是 $future$ 的析构就像隐式 $join$ 或者隐式 $detach$ ,不会导致程序中止。
        $future$ 实际上是通信信道的一端,被调用者将数据写入信道 ( 通过 $std::promise$ ),调用者使用 $future$ 读取结果。问题来了,这些数据会存放在哪?肯定不能作为局部变量,因为会在调用结束后被销毁;同样也不能存储在 $future$ 内,因为它可能会被用于创建 $std::shared_-future$ ,这会导致数据被拷贝,但不是所有数据都能被拷贝。
        因为与被调用者关联的对象和与调用者关联的对象都不适合存储,所以这些数据必须存在其他位置,而且这个位置还必须能被被调用者和调用者访问。这个位置称为共享状态 ( $shared$ $state$ ),通常是基于堆的对象。标准并未共享状态的类型、接口和实现,所以具体实现依赖于标准库作者。$future$ 的析构行为依赖于共享状态:

        在大多数情况下,$future$ 析构函数会直接销毁 $future$ ,不会进行 $join$ 或 $detach$ ,而是仅仅销毁 $future$ 的数据成员。只有在满足以下条件时才会出现例外:

        只有同时满足上面三个条件,$future$ 的析构函数才会在异步任务执行完成前阻塞。
        由于没有API来判断 $future$ 关联的共享状态是否是由于 $std::async$ 的调用创建的,所以我们也无法判断这个 $future$ 是否会阻塞。因为只有通过 $std::async$ 创建的共享状态才会有这个问题,所以我们可以通过用其他方式创建共享状态来避免这个问题。

int calcValue();

std::package_task<int ()> pt(calcValue);
auto fut = pt.get_future();

        上面通过 $std::package_-task$ 创建的 $future$ 就不会有这样的问题。$std::package_-task$ 对象可以通过 $thread$ 执行,也可以通过 $std::async$ ,当然这样就违背了我们使用它的初衷了。还要注意 $std::package_-task$ 不可拷贝,只能移动:

std::thread t(std::move(pt));

5. 一次性通知

        有时候,我们需要通过一个任务通知另一个异步执行的任务。一个很明显的解决方案就是通过条件变量,我们将修改条件的任务称为检测任务,对条件做出反应的任务称为反应任务,反应任务等待一个条件变量,检测任务在适时改变条件变量。

std::condition_variable cv;
cv.notify_one();

        如果有多个反应任务,可以通过 $notify_-all$ 通知。由于多个线程访问同一个变量,所以我们需要一些同步手段:

std::condition_variable cv;
std::mutex m;
std::unique_lock<std::mutex> lk(m);
cv.wait(lk);

        条件变量的一个问题是它必须使用互斥锁,即使在不需要互斥锁的情况。此外,有两个情况要注意:

cv.wait(lk, [] { return /* if the event has occurred */ });

        另一个解决方案是通过原子类:

std::atomic<bool> flag(false);
// ...
flag = true;

        反应线程轮询该标志:

while (!flag) {
  // ...
}

        这种方案的缺点是存在轮询的无意义开销,在轮询的时候线程无法做其他事,只能一次次地检测条件变量。所以,相比之下还是更建议使用条件变量。

std::condition_variable cv;
std::mutex m;
bool flag(false);
// ...
{
  std::lock_guard<std::mutex> g(m);
  flag = true;
}
cv.notify_one();

{
  std::unique_lock<std::mutex> lk(m);
  cv.wait(lk, [] { return flag; });
}
// ...

        这个代码解决了我们之前的问题,但还是有点古怪,因为反应线程和检测线程通过同一个变量通信。一个可选的替代方案是通过 $future::wait$ 来避免使用条件变量。检测任务使用 $std::promise$ ,反应任务则使用 $std::future$ 或者 $std::shared_-future$ 。

std::promise<void> p;
// ...
p.set_value();

p.get_future().wait();

        这种方式不需要条件变量和互斥锁,而且反应任务也会被阻塞。看上去很完美,但是又有新问题。因为共享状态一般是在堆上存储的,这会涉及动态内存的分配和释放;而且,$std::promise$ 只能设置一次,不能重复使用。
        一次性通信可以用在许多地方。假设你只是想要挂起一个线程,从而避免下一次需要时再次创建,或者是想在线程运行前进行设置,都可以通过 $future$ 实现:

std::promise<void> p;
void react();
void detect();

void detect() {
  std::thread t([] {
    p.get_future().wait();
    react();
  });
  // ...
  p.set_value();
  // ...
  t.join();
}

        这里我们只创建了一个 $thread$ ,如果需要多个 $thread$ ,可以使用 $shared_-future$ 。

std::promise<void> p;
void react();
void detect();

void detect() {
  auto sf = p.get_future().share();
  std::vector<std::thread> vt;
  for (int i = 0; i < threadsToRun; i++) {
    vt.emplace_back([sf] {
      sf.wait();
      react();
    });
  }
  // ...
  p.set_value();
  // ...
  for (auto &t : vt)
    t.join();
}

6. std::atomicvolatile

        与其他编程语言不同,C++中的 $volatile$ 并没有关于并发的能力。开发者有时候会对 $volatile$ 和 $std::atomic$ 感到困惑。$std::atomic$ 会给某个类型提供原子性,它的行为类似于内部使用互斥锁保护一段临界区,但是是通过特殊的更有效率的机器指令实现的。

std::atomic<int> ai(0);
ai = 10;
std::cout << ai;
++ai;
--ai;

        如果没有其他线程访问 $ai$ ,那么 $ai$ 的值只可能是 $0$ 、$10$ 和 $11$ 之间的一个。要注意 $std::cout$ 对 $ai$ 的访问只保证了它读取时是原子的,而在之后调用 $operator$<< 的时候并不是原子的。对于 $ai$ 的递增和递减操作,它们都是 $read-modifiy-write$ ( $RMW$ ) 操作,$atomic$ 保证它们是原子执行的。

volatile int vi(0);
vi = 10;
std::cout << vi;
++vi;
--vi;

        $volatile$ 在多线程中不会保证任何事。在这段代码中,$vi$ 的值可能是任意一个,因为可能存在数据竞争。
        $RMW$ 操作不是仅有的在 $atomic$ 有效而 $volatile$ 无效的例子。

std::atomic<bool> valVailable(false);
auto imptValue = computeImportantValue();
valAvailable = true;

        就代码编写角度,我们能看到 $valAvailable$ 是在 $imptValue$ 之后赋值的,并且这个顺序不应该改变。但是对于编译器来说不是,编译器不知道它们的依赖关系,因此可能会重排这些指令。即使编译器没有重排,底层硬件也可能进行重排。$atomic$ 会限制这种重排,体现在源代码中,就是在对 $atomic$ 变量完成写入之前不会执行其他指令。这种禁止指令重排序的功能也是 $volatile$ 所不具有的。
        一般内存会保证一个值不变,直到被修改。如果一个值被多次修改,编译器也可能进行优化:

int x;
auto y = x;
y = x;

        编译器可能会略去一次对 $y$ 的赋值,因为它很明显是重复的。同样的:

x = 10;
x = 20;

        编译器可能直接略去第一个赋值语句,因为它也是不必要的。
        这些代码看上去很蠢,但是对于编译器来说很常见,因为模版实例化、内联函数以及指令重排序会导致很多这样的代码出现,编译器需要负责对这些代码进行优化。
        还要注意的是,我们上面的例子是在一般内存的前提下。有些特殊的内存,比如用于内存映射I/O的内存,实际上是与外围设备 ( 比如传感器、显示器、打印机或者网络端口等 ) 通信,而不是读写RAM。这种情况下:

auto y = x;
y = x;

        这种代码就不能进行优化了,因为 $x$ 的值可能是某个传感器的上报值,第一次读取和第二次读取的结果可能是不同的。类似的,对 $x$ 的写可能是写入多条指令,也不能进行优化。
        面对这种问题,$volatile$ 的价值就被体现了。声明为 $volatile$ 的变量所在的内存不会被编译器优化。这是 $std::atomic$ 无法做到的,而且我们也不能对 $atomic$ 对象进行以下操作:

std::atomic<int> x;
auto y = x;

        因为 $atomic$ 没有拷贝函数。之所以没有,是因为拷贝涉及到读取和写入,而硬件通常无法在一次原子操作内同时进行读取和写入。如果要从 $x$ 进行构造,可以使用 $load$ 和 $store$ :

std::atomic<int> y(x.load());
y.store(x.load());

        当然,这种操作不是原子的,而是分为两次原子操作进行。这里 $x$ 读出了两次,编译器可能通过寄存器优化:

register = x.load();
std::atomic<int> y(register);
y.store(register);

        当然,对于特殊内存来说,肯定不能进行这种优化。
        总的来说,$std::atomic$ 应用于并发场景,$volatile$ 则应用于特殊内存场景。它们也可以结合使用:

volatile std::atomic<int> vai;

        一些开发者喜欢使用 $load$ 和 $store$ 而不是 $=$ ,因为这可以强调它们是原子变量。这也有一些道理,因为对原子变量的访问确实会慢些,不过更多的还是习惯问题。

EffectiveModernCpp(6):并发