C++并发编程实战笔记(一)线程概念与基本控制

 
Category: C_C++

写在前面

在C++ 中实现多线程还是很容易的, 不像C的pthreads接口, 下面来总结一下C++多线程的一些基本操作, 包括线程的创建, 合并, 分离, 获取ID等操作, 主要参考了C++并发编程实战(第二版)的第一二章, 这本书应该是C++并发必看的经典了.

另外参考:

std::thread;

一些有用的程序

用于辅助

#include <iostream>
#include <cassert> // 断言
#include <chrono>  // 计时
#include <thread>  // 线程

using namespace std;
using namespace std::chrono;   // 计时
using namespace std::literals; // 秒数字面量, C++14

睡眠

测试多线程, 不加睡眠系统实在是太容易假死了.

this_thread::sleep_for(1s); // 睡眠1s

计时

auto start = std::chrono::system_clock::now();
// 待计时的程序
auto end = std::chrono::system_clock::now();
auto duration = duration_cast<std::chrono::microseconds>(end - start);
cout << "Time spent: "
     << double(duration.count()) * std::chrono::microseconds::period::num /
            std::chrono::microseconds::period::den
     << "s" << endl;

线程基础

头文件thread

查看硬件支持

我的是8核CPU.

#include <iostream>
#include <thread>

int main(int argc, char const *argv[]) {
    // static method
    std::cout << std::thread::hardware_concurrency(); // 8
    return 0;
}

如果是1核, 那就只能实现并发而不能实现并行了.

创建与合并(join)

构造函数: 直接传入函数名(函数指针), 以及对应的参数(如果有), 需要注意线程的join(), 否则主线程不会等待子线程结束.

void fun() { cout << "Hello t1!\n"; }

void t1() {
    thread t1(&fun); // 传入函数指针
    if (t1.joinable()) cout << "t1 is joinable\n", t1.join();
    // 令主线程等待子线程
    // t1 is joinable
    // Hello t1!
}

其他创建方法:

  1. 传入函数对象: 临时对象, 即右值
  2. 传入函数对象: 具名对象, 即左值
  3. 传入lambda表达式

void t2() {
    thread t2([] { cout << "Hello t2!\n"; }); // lambda 表达式
    t2.join();                                // Hello t2!
}

struct Foo {
    void operator()() const { cout << "Hello t3!\n"; }
};

void t3() { // 传入临时函数对象
    // 二义性(烦人的分析机制), 参见Effective STL,
    // 即`只要C++语句有可能被解释成函数声明, 编译器就肯定将其解释为函数声明`
    // thread t3((foo())); // 由于存在函数指针二义性, 这里必须用圆括号包裹
    thread t3{Foo()}; // 同理, 这里用一致性初始化{}, 推荐这种方法
    t3.join();        // Hello t3!
}

struct Foo1 {
    void operator()() const { cout << "Hello t4!\n"; }
};

void t4() {
    Foo1 f;
    thread t4(f);
    t4.join(); // Hello t4!
}

void t5() {
    auto t5 = thread([] { cout << "Hello t5!\n"; });
    t5.join(); // Hello t5!
}

事实上使用join()方法等待线程是一刀切式的, 即要么不等待, 要么一直等待, 之后会采用期值(future)或者条件变量(condition_variable)来做.

并且线程只能被join一次.

int main() {
    //
    thread t1([] { cout << "AA\n"; });
    t1.join();                     //"AA"
    cout << t1.joinable() << endl; // 0
    t1.join(); // libc++abi: terminating with uncaught exception of type
               // std::__1::system_error: thread::join failed: Invalid argument
}

线程分离: detach

分离的线程不受主线程(即main函数)的管理, 而是由C++runtime库管理(成为daemon守护/后台进程).

但是分离线程之后就无法等待线程结束了

void t1() {
    thread t([] {
        cout << "detached thread\n";
        this_thread::sleep_for(1s);
    });
    t.detach();
    assert(!t.joinable());
    this_thread::sleep_for(1s);
    cout << "Main thread\n";
}


int main(int argc, char const* argv[]) {
    auto start = system_clock::now();
    t1();
    auto end = system_clock::now();
    auto duration = duration_cast<microseconds>(end - start);
    cout << "Time spent: "
         << double(duration.count()) * microseconds::period::num /
                microseconds::period::den
         << "s" << endl;
    // detached thread
    // Main thread
    // Time spent: 1.00508s
    return 0;
}

可见主线程和分离的线程(几乎)同时结束. 耗时1s.

上面代码中, 如果用join而不是detach, 那么用时就是2s, 大家可以测试一下.

获取id

两种获取方法:

  1. 直接对thread对象调用成员函数.get_id();
  2. 通过在对应线程中(即传入线程的函数中)调用this_thread::get_id().
int main() {
    cout << "null thread id: " << thread().get_id() << endl;
    cout << "null thread id: " << thread::id() << endl; // static func
    thread t1([] {
        cout << "Hello t1!\n";
        cout << "t1 thread id(use this_thread::get_id): "
             << this_thread::get_id() << endl;
    });
    cout << "main thread id: " << this_thread::get_id() << endl;
    cout << "t1 id(use t1.get_id): " << t1.get_id() << endl;
    t1.join();
    // null thread id: 0x0
    // null thread id: 0x0
    // main thread id: 0x1046d0580
    // t1 id(use t1.get_id): 0x16bc43000
    // Hello t1!
    // t1 thread id(use this_thread::get_id): 0x16bc43000
}

线程实战

参数传递的小问题

case 1: 常量引用

线程具有内部存储空间, 参数会按照默认方式先复制到该处, 新创建的线程才能直接访问它们.

然后, 这些副本被当成临时变量, 以右值形式传给新线程上的函数或者可调用对象.

即便函数的相关参数是引用, 上述过程依然会发生.

void oops() {
    //
    auto f = [](int i, string const& s) { cout << i << s << endl; };
    char buf[1024]; // 局部变量(自动变量 )
    snprintf(buf, 10, "%i", 100);
    // thread t(f, 3, buf); // buf 可能已销毁
    // 直接传入 buf 可能会出现安全性问题, 原因是参数传递本意是将 buf 隐式转换为
    // String, 再将其作为函数参数, 但转换不一定能及时开始(由于 thread
    // 的工作机制, 其构造函数需要原样复制所有传入的参数)
    thread t(f, 3, string(buf)); // 这样可以解决, 直接在传入之前进行构造
    t.detach();
}

自动变量: 代码块内声明或者定义的局部变量, 位于程序的栈区.

case 2: 非常量引用

// 传入一个非常量引用
class Widget {};
void oops_again() {
    auto f = [](int id, Widget& w) {};
    Widget w1;
    // 此时传入的 w1 是右值形式, move-only 型别, 因为非常量引用不能向其传递右值
    // thread t(f, 10, w1);
    thread t(f, 10, std::ref(w1));
    t.join();
}

针对非常量引用, 由于这种形参不能接受右值变量, 所以一定要加上std::ref修饰(配接器)

case 3: 成员函数

class X {
public:
    void do_something() { cout << "do_something\n"; }
};
void t2() {
    X my_x;
    // 向某个类的成员函数设定为线程函数, 需要传入函数指针, 指向该成员函数
    thread t(&X::do_something, &my_x);
    // 若考虑到对象指针, 成员函数的第一个形参实际上是其第二个实参
    // 向线程函数 传入的第三个参数就是成员函数的第一个参数
}

针对成员函数的参数传递, 需要考虑形参的顺序(将成员的地址作为成员函数的第一个参数, 然后才传入成员函数的参数)

case 4: 智能指针的控制权转移

void process(unique_ptr<X>){} // X 定义在 case 3
void t3(){
    unique_ptr<X> p(new X);
    p->do_something();
    thread t(process, std::move(p));// 通过 move 移交智能指针所指对象的控制权
}

通过 std::move() 移交控制权

移动语义支持

通过移动语义, thread可以实现控制权移交.

void f() { cout << "f()\n"; }
void g() { cout << "g()\n"; }

void test1() {
    thread t1(f);         // t1:f
    t1.join();
    thread t2 = move(t1); // t2:f
    t1 = thread(g);       // t1:g
    t1.join();
    thread t3;
    t3 = move(t2); // t1:g t2:∅ t3:f
    // 运行f的线程归属权转移到t1, 该线程最初由t1启动, 但是在转移时,
    // t1已经关联到g的线程, 因此terminate()会被调用, 终止程序.
    t1 = move(t3); // 终止整个程序
    // f()
    // g()
}

void f3(thread t) {}
void g3() {
    // 线程归属权可以转移到函数内部, 函数能够接收thread实例作为按右值传递的参数.
    f3(thread(f));
    thread t(f);
    f3(std::move(t));
}

std::move() 仅仅将左值强制类型转换为右值, 但是不进行其他操作, 真正移交控制权的时刻是 t2 的move构造调用时(初始化)

并行版的accumulate

template <typename Iterator, typename T>
struct accmuluate_block {
    void operator()(Iterator first, Iterator last, T& result) {
        result = accumulate(first, last, result);
    }
};

template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
    // 设置常量
    unsigned long const length = distance(first, last);
    if (!length) return init; // 如果计算区间为空, 返回初值
    unsigned long const min_per_thread = 25; // 每一个线程计算的数量
    unsigned long const max_threads =        // 最大线程数
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardeare_threads = thread::hardware_concurrency(); // 8
    unsigned long const num_threads = // 实际线程数
        min(hardeare_threads != 0 ? hardeare_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    // 存放计算结果,
    vector<T> results(num_threads);
    // 设置线程存储
    vector<thread> threads(num_threads - 1);
    Iterator block_start = first;

    for (unsigned long i{}; i < num_threads - 1; ++i) {
        Iterator block_end = block_start;
        advance(block_end, block_size);
        threads[i] = thread(accmuluate_block<Iterator, T>(), block_start,
                            block_end, ref(results[i])); // 这里使用ref适配器
        block_start = block_end;
    }
    accmuluate_block<Iterator, T>()(block_start, last,
                                    results[num_threads - 1]);

    for (auto& entry : threads) entry.join();
    // 汇总每一个线程分块的结果, 累加得到最终结果, 所以结果需要满足结合律
    // (double/float不满足, 所以可能与串行版accumulate结果有出入)
    return accumulate(results.begin(), results.end(), init);
}


vector<int> get_vec() { // 生成测试数据
    vector<int> v;
    for (int i{}; i < 10000000; ++i) v.emplace_back(i);
    return v;
}


void t1() {
    auto v = get_vec();
    auto start = system_clock::now();
    int ans = parallel_accumulate(v.begin(), v.end(), 0);
    // int ans = accumulate(v.begin(), v.end(), 0);
    auto end = system_clock::now();
    auto duration = duration_cast<microseconds>(end - start);
    cout << "Time spent: "
         << double(duration.count()) * microseconds::period::num /
                microseconds::period::den
         << "s" << endl;

    cout << ans;

    // with parallel:
    // Time spent: 0.014697s
    // -2014260032

    // without parallel:
    // Time spent: 0.083763s
    // -2014260032
}

确实是快了将近8倍…