写在前面
在C++ 中实现多线程还是很容易的, 不像C的pthreads接口, 下面来总结一下C++多线程的一些基本操作, 包括线程的创建, 合并, 分离, 获取ID等操作, 主要参考了C++并发编程实战(第二版)的第一二章, 这本书应该是C++并发必看的经典了.
另外参考:
一些有用的程序
用于辅助
#include <iostream>
#include <cassert> // 断言
#include <chrono> // 计时
#include <thread> // 线程
using namespace std;
using namespace std::chrono; // 计时
using namespace std::literals; // 秒数字面量, C++14
睡眠
测试多线程, 不加睡眠系统实在是太容易假死了.
this_thread::sleep_for(1s); // 睡眠1s
计时
auto start = std::chrono::system_clock::now();
// 待计时的程序
auto end = std::chrono::system_clock::now();
auto duration = duration_cast<std::chrono::microseconds>(end - start);
cout << "Time spent: "
<< double(duration.count()) * std::chrono::microseconds::period::num /
std::chrono::microseconds::period::den
<< "s" << endl;
线程基础
头文件
thread
查看硬件支持
我的是8核CPU.
#include <iostream>
#include <thread>
int main(int argc, char const *argv[]) {
// static method
std::cout << std::thread::hardware_concurrency(); // 8
return 0;
}
如果是1核, 那就只能实现并发而不能实现并行了.
创建与合并(join)
构造函数: 直接传入函数名(函数指针), 以及对应的参数(如果有), 需要注意线程的join(), 否则主线程不会等待子线程结束.
void fun() { cout << "Hello t1!\n"; }
void t1() {
thread t1(&fun); // 传入函数指针
if (t1.joinable()) cout << "t1 is joinable\n", t1.join();
// 令主线程等待子线程
// t1 is joinable
// Hello t1!
}
其他创建方法:
- 传入函数对象: 临时对象, 即右值
- 传入函数对象: 具名对象, 即左值
- 传入lambda表达式
void t2() {
thread t2([] { cout << "Hello t2!\n"; }); // lambda 表达式
t2.join(); // Hello t2!
}
struct Foo {
void operator()() const { cout << "Hello t3!\n"; }
};
void t3() { // 传入临时函数对象
// 二义性(烦人的分析机制), 参见Effective STL,
// 即`只要C++语句有可能被解释成函数声明, 编译器就肯定将其解释为函数声明`
// thread t3((foo())); // 由于存在函数指针二义性, 这里必须用圆括号包裹
thread t3{Foo()}; // 同理, 这里用一致性初始化{}, 推荐这种方法
t3.join(); // Hello t3!
}
struct Foo1 {
void operator()() const { cout << "Hello t4!\n"; }
};
void t4() {
Foo1 f;
thread t4(f);
t4.join(); // Hello t4!
}
void t5() {
auto t5 = thread([] { cout << "Hello t5!\n"; });
t5.join(); // Hello t5!
}
事实上使用join()方法等待线程是一刀切
式的, 即要么不等待, 要么一直等待, 之后会采用期值(future)或者条件变量(condition_variable)来做.
并且线程只能被join一次.
int main() {
//
thread t1([] { cout << "AA\n"; });
t1.join(); //"AA"
cout << t1.joinable() << endl; // 0
t1.join(); // libc++abi: terminating with uncaught exception of type
// std::__1::system_error: thread::join failed: Invalid argument
}
线程分离: detach
分离的线程不受主线程(即main函数)的管理, 而是由C++runtime库管理(成为daemon守护/后台进程).
但是分离线程之后就无法等待线程结束了
void t1() {
thread t([] {
cout << "detached thread\n";
this_thread::sleep_for(1s);
});
t.detach();
assert(!t.joinable());
this_thread::sleep_for(1s);
cout << "Main thread\n";
}
int main(int argc, char const* argv[]) {
auto start = system_clock::now();
t1();
auto end = system_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "Time spent: "
<< double(duration.count()) * microseconds::period::num /
microseconds::period::den
<< "s" << endl;
// detached thread
// Main thread
// Time spent: 1.00508s
return 0;
}
可见主线程和分离的线程(几乎)同时结束. 耗时1s.
上面代码中, 如果用join而不是detach, 那么用时就是2s, 大家可以测试一下.
获取id
两种获取方法:
- 直接对thread对象调用成员函数
.get_id()
; - 通过在对应线程中(即传入线程的函数中)调用
this_thread::get_id()
.
int main() {
cout << "null thread id: " << thread().get_id() << endl;
cout << "null thread id: " << thread::id() << endl; // static func
thread t1([] {
cout << "Hello t1!\n";
cout << "t1 thread id(use this_thread::get_id): "
<< this_thread::get_id() << endl;
});
cout << "main thread id: " << this_thread::get_id() << endl;
cout << "t1 id(use t1.get_id): " << t1.get_id() << endl;
t1.join();
// null thread id: 0x0
// null thread id: 0x0
// main thread id: 0x1046d0580
// t1 id(use t1.get_id): 0x16bc43000
// Hello t1!
// t1 thread id(use this_thread::get_id): 0x16bc43000
}
线程实战
参数传递的小问题
case 1: 常量引用
线程具有内部存储空间, 参数会按照默认方式先复制到该处, 新创建的线程才能直接访问它们.
然后, 这些副本被当成临时变量, 以右值形式传给新线程上的函数或者可调用对象.
即便函数的相关参数是引用, 上述过程依然会发生.
void oops() {
//
auto f = [](int i, string const& s) { cout << i << s << endl; };
char buf[1024]; // 局部变量(自动变量 )
snprintf(buf, 10, "%i", 100);
// thread t(f, 3, buf); // buf 可能已销毁
// 直接传入 buf 可能会出现安全性问题, 原因是参数传递本意是将 buf 隐式转换为
// String, 再将其作为函数参数, 但转换不一定能及时开始(由于 thread
// 的工作机制, 其构造函数需要原样复制所有传入的参数)
thread t(f, 3, string(buf)); // 这样可以解决, 直接在传入之前进行构造
t.detach();
}
自动变量: 代码块内声明或者定义的局部变量, 位于程序的栈区.
case 2: 非常量引用
// 传入一个非常量引用
class Widget {};
void oops_again() {
auto f = [](int id, Widget& w) {};
Widget w1;
// 此时传入的 w1 是右值形式, move-only 型别, 因为非常量引用不能向其传递右值
// thread t(f, 10, w1);
thread t(f, 10, std::ref(w1));
t.join();
}
针对非常量引用, 由于这种形参不能接受右值变量, 所以一定要加上std::ref
修饰(配接器)
case 3: 成员函数
class X {
public:
void do_something() { cout << "do_something\n"; }
};
void t2() {
X my_x;
// 向某个类的成员函数设定为线程函数, 需要传入函数指针, 指向该成员函数
thread t(&X::do_something, &my_x);
// 若考虑到对象指针, 成员函数的第一个形参实际上是其第二个实参
// 向线程函数 传入的第三个参数就是成员函数的第一个参数
}
针对成员函数的参数传递, 需要考虑形参的顺序(将成员的地址作为成员函数的第一个参数, 然后才传入成员函数的参数)
case 4: 智能指针的控制权转移
void process(unique_ptr<X>){} // X 定义在 case 3
void t3(){
unique_ptr<X> p(new X);
p->do_something();
thread t(process, std::move(p));// 通过 move 移交智能指针所指对象的控制权
}
通过 std::move() 移交控制权
移动语义支持
通过移动语义, thread可以实现控制权移交.
void f() { cout << "f()\n"; }
void g() { cout << "g()\n"; }
void test1() {
thread t1(f); // t1:f
t1.join();
thread t2 = move(t1); // t2:f
t1 = thread(g); // t1:g
t1.join();
thread t3;
t3 = move(t2); // t1:g t2:∅ t3:f
// 运行f的线程归属权转移到t1, 该线程最初由t1启动, 但是在转移时,
// t1已经关联到g的线程, 因此terminate()会被调用, 终止程序.
t1 = move(t3); // 终止整个程序
// f()
// g()
}
void f3(thread t) {}
void g3() {
// 线程归属权可以转移到函数内部, 函数能够接收thread实例作为按右值传递的参数.
f3(thread(f));
thread t(f);
f3(std::move(t));
}
std::move() 仅仅将左值强制类型转换为右值, 但是不进行其他操作, 真正移交控制权的时刻是 t2 的move构造调用时(初始化)
并行版的accumulate
template <typename Iterator, typename T>
struct accmuluate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = accumulate(first, last, result);
}
};
template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {
// 设置常量
unsigned long const length = distance(first, last);
if (!length) return init; // 如果计算区间为空, 返回初值
unsigned long const min_per_thread = 25; // 每一个线程计算的数量
unsigned long const max_threads = // 最大线程数
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardeare_threads = thread::hardware_concurrency(); // 8
unsigned long const num_threads = // 实际线程数
min(hardeare_threads != 0 ? hardeare_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
// 存放计算结果,
vector<T> results(num_threads);
// 设置线程存储
vector<thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i{}; i < num_threads - 1; ++i) {
Iterator block_end = block_start;
advance(block_end, block_size);
threads[i] = thread(accmuluate_block<Iterator, T>(), block_start,
block_end, ref(results[i])); // 这里使用ref适配器
block_start = block_end;
}
accmuluate_block<Iterator, T>()(block_start, last,
results[num_threads - 1]);
for (auto& entry : threads) entry.join();
// 汇总每一个线程分块的结果, 累加得到最终结果, 所以结果需要满足结合律
// (double/float不满足, 所以可能与串行版accumulate结果有出入)
return accumulate(results.begin(), results.end(), init);
}
vector<int> get_vec() { // 生成测试数据
vector<int> v;
for (int i{}; i < 10000000; ++i) v.emplace_back(i);
return v;
}
void t1() {
auto v = get_vec();
auto start = system_clock::now();
int ans = parallel_accumulate(v.begin(), v.end(), 0);
// int ans = accumulate(v.begin(), v.end(), 0);
auto end = system_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "Time spent: "
<< double(duration.count()) * microseconds::period::num /
microseconds::period::den
<< "s" << endl;
cout << ans;
// with parallel:
// Time spent: 0.014697s
// -2014260032
// without parallel:
// Time spent: 0.083763s
// -2014260032
}
确实是快了将近8倍…