写在前面
全部代码: ostep_cases;在 Linux+MacOS 下运行通过.
下面的代码都是基于 C 的, 当然为了不分文件测试我用了 C++ 的命名空间, 实际使用的话可以用类封装一下 API, 使用 RAII 惯用法, 避免忘记释放锁, 这部分内容在之后的文章中给出.
线程(thread)
基本概念与术语
- 临界区: 访问共享资源的一段代码, 资源通常是一个变量或者数据结构
- 竞态条件(race condition): 出现在多个执行线程大致同时进入临界区时, 都试图更新共享的数据结构, 会导致异常的结果.
- 不确定性: 程序由一个或多个竞态条件组成, 程序的输出因运行而异, 具体取决于哪些线程在何时运行. 这就导致了不确定的结果
- 互斥原语: 为了避免竞态条件导致的不确定性, 引入互斥原语, 可以保证只有一个线程进入临界区, 从而避免出现竞态条件并产生确定的输出.
线程抽象
线程: 为单个运行进程提供的新抽象, CPU 调度的基本单位
.
一个程序只有一个执行点(程序计数器, 存放要执行的指令), 但多线程程序会有多个执行点(每个程序计数器都用来取指令和执行).
每一个线程类似于独立的进程, 唯一的区别是线程间共享(进程的)地址空间, 从而能访问相同的数据.
多线程运行时候也存在上下文的切换, 其中的信息保存在 TCB:即线程控制块中.
线程间上下文切换不需要切换当前使用的页表, 即地址空间不变.
多个线程独立运行, 并且都有独立的栈空间, 即线程本地存储. (thread-local)
POSIX 线程 API: Pthread 库
链接选项: -lpthread
或者-pthread
创建
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
传入的函数需要是参数为 void*
, 返回值为 void*
的函数.
attr 一般来说传入 NULL 即可.
等待
int pthread_detach(pthread_t thread); // 分离线程
int pthread_join(pthread_t thread, void **retval); // 合并线程
锁
这部分 API 的 man 文档在 Ubuntu 中不是默认安装的, 需要通过
sudo apt install glibc-doc
安装.
pthread_mutex_t fastmutex = PTHREAD_MUTEX_INITIALIZER; // 静态初始化(编译期指定)
// 动态初始化(运行时分配), 需要配合 destroy 使用
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// 上锁解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 静态初始化
// 动态初始化
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);
// 通知一个等待线程
int pthread_cond_signal(pthread_cond_t *cond);
// 通知全部等待线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 执行等待, 使线程休眠, 等待接收其他线程发出的信号后启动
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); // 注意这里需要传入一个与此条件相关的锁,
// 超时等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
wait 函数为什么需要锁作为第二参数呢?
因为等待调用除了让调用线程进入休眠状态, 还会让调用者睡眠时释放锁, 如果不释放, 别的线程就无法获得互斥锁从而开始执行.
信号量
仅在 Ubuntu 下测试, MacOS 下需要自己用条件变量和锁实现(应该是可以用sem_open
和sem_close
实现, 原有 API deprecated)
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
API 编程原则
-
保持简洁
-
减少线程交互
-
初始化锁和条件变量
-
检查返回值(判断错误/ 异常)
-
注意传给线程的参数和返回值
-
等待线程在 while 循环中重新检查条件, 而不是 if 语句: 虽然增加了开销, 但是提高了安全性
有一些 pthread 实现可能会错误地唤醒等待线程, 在这种情况下, 如果没有重新检查, 等待的线程会继续认为条件已经改变. 因此将唤醒视为某种事物可能已经发生改变的暗示, 而不是绝对的事实, 更安全.
-
记住每个线程都有自己的栈空间: 线程局部变量是每线程私有的, 其他线程不可访问.
线程间共享数据, 值要在堆区(动态内存)或者其他全局可访问的位置完成.
-
线程间通过条件变量发送信号: 而不是简单的标记变量+自旋锁
-
RTFM(Read The F*** Manual)
其他原则
- 只在需要的时候才并发, 尽可能不用并发, 过早优化是最糟糕的.
锁
基本思想
当一个线程在临界区, 锁(Linux 下即互斥量 mutex, mutable exclusion)能够阻止其他线程进入知道本线程离开临界区.
Pthread 实现
POSIX 将锁称为互斥量 mutex, 因为它被用来提供现成之间的互斥, 即当一个线程在临界区, 它能阻止其他线程进入临界区, 直到本线程离开临界区.
// static init mutex
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
// do something
i = i + 1;
pthread_mutex_unlock(&lock);
这里给一份 C++实现: (Linux 下的实现还是调用了 Pthread 接口)
std::mutex mtx;
{
lock_guard<std::mutex> lk(mtx);
i = i + 1;
}
锁是一个变量, 这个变量(简称锁)保存了锁在某一个时刻的状态, 要么可用(表示没有线程持有锁, 临界区安全)要么不可用(表示某线程持有了锁, 线程处于临界区)
执行过程
一开始调用 lock()尝试获取锁, 如果没有其他线程持有锁(可用状态), 该线程会获得锁, 进入临界区.
当该线程(锁的持有者)持有锁后进入临界区之后, 其他锁调用 lock()时, 因为锁被该线程持有, 调用就不会返回, 使得其他线程无法进入临界区.
当锁的持有线程调用了 unlock(), 锁重新成为可用状态, 此时:
- 如果没有其他等待线程(阻塞等待获取锁), 该锁就重新成为可用状态.
- 如果有等待线程, 其中一个最终会注意到(或者收到通知)锁状态发生改变, 于是立即获取该锁, 进入临界区执行操作.
锁为程序员提供了最小程度的调度控制.
视线程为程序员创建的实体, 但是被操作系统调度, 具体方式由操作系统选择. 是锁让程序员获得了一些控制权.
通过给临界区加锁, 可以保证临界区内只有一个线程活跃.
结果
锁让原本由操作系统调度的混乱状态变得更为可控.
设计良好的锁应具备的条件
- 提供互斥: 能否阻止多个线程进入临界区, 基本条件.
- 公平性: 是否每一个竞争线程都有公平的机会抢到锁
- 性能: 指标是使用锁之后 增加的时间开销
早期尝试: 关闭中断
在临界区关闭中断, 这样临界区的代码就不会被中断, 从而保证原子化执行, 但这仅限于单处理器系统.
优点
- 简单, 直观
缺点
- 仅支持单处理器系统
- 要求允许所有调用线程执行特权操作(中断的开启关闭都是内核级操作), 可能存在一个瑞程序开始时就调用 lock, 独占处理器, 如果此时程序死循环, 系统就无法重新获得控制, 只能重启.
- 关闭中断会导致中断丢失
- 效率低下
TAS: 测试并设置指令(原子交换)
测试并设置指令(test-and-set instruction), 也称原子交换(atomic exchange)
原理
首先实现一个不依赖 TAS 的锁, 用一个变量来标志锁是否被某些线程占用.
- 第一个线程进入临界区, 调用 lock() , 检查标志是否为 1, 然后设置为 1, 表明线程持有该锁.
- 结束临界区时, 调用 unlock(), 清除标志, 表示锁未被持有.
第一次实现: 代码
typedef struct lock_t {
int flg;
} lock_t;
// 初始时为 0 表示未持有, 1 表示持有
void init(lock_t* mutex) { mutex->flg = 0; }
void lock(lock_t* mutex) {
// 持有, 一直等待
while (mutex->flg == 1) // TEST flg
; // spin wait
mutex->flg = 1; // SET
}
void unlock(lock_t* mutex) { mutex->flg = 0; }
第一个线程持锁后(调用 lock())进入临界区, flg 值为 1, 此时另一个线程调用 lock()后自旋(spin)等待, 直到第一个线程unlock()
看似没问题, 但是如果执行序列如下: (其中 interrupt 表示系统中断, 可以认为是时间片到期轮转)
Thread 1 | Thread 2 |
---|---|
call lock() while(flag == 1) interrupt: switch to Thread 2 |
|
call lock() while (flag == 1) flaga == 1; interrupt: switch to Thread 1 |
|
flag = 1; // set flag to 1 (too) |
此时虽然上了锁, 但是两个线程都可以进入临界区(因为 flag 都为 1)
所以该实现存在正确性问题.
第二次实现: 代码
注意, 这里的代码是伪码, 即在真实条件下虽然可以运行但是并不能有效锁住互斥, 因为不保证原子性.
int TestAndSet(int *old_ptr, int new_val) {
int old = *old_ptr; // 取出旧值
*old_ptr = new_val; // 设置新值
return old; // 返回旧值
}
typedef struct lock_t {
int flg;
} lock_t;
// 初始时为 0 表示未持有, 1 表示持有
void init(lock_t* lock) { lock->flg = 0; }
void lock(lock_t* lock) {
// 持有, 一直等待
while (TestAndSet(&lock->flg, 1) == 1) // TEST flg
; // spin wait
lock->flg = 1; // SET
}
void unlock(lock_t* lock) { lock->flg = 0; }
这次使用了一个新的方法测试并设置
(以下简称TAS
), 下面分析可行性:
可行性
- case1: 假设有一个现成在运行, 调用 lock(),没有其他线程持有锁, 所以 flg=0, 调用
TAS(flg, 1)
, flg 被设置为 1, 返回旧值 0, 此时 lock() 方法中 while 循环跳出, 此时获取了锁, 又一次设置 flg 为 1, 标志锁被持有. 线程离开临界区, 调用 unlock()清理 flg, 置为 0. - case2: 某线程已经持有了锁, 目前 flg=1, 本线程此时调用lock() 尝试获取锁, 在循环内调用
TAS(flg, 1)
, 由于此时 flg 本来就是 1, 所以循环一直不跳出, 即一直处于自旋状态. 当上一线程结束执行, 调用 unlock()释放了锁之后, flg 此时为 0, 当前线程在 TAS 执行中返回了 0, 跳出循环, 重新设置 flg=1, 持有锁, 开始进入临界区执行操作.
上面的 flg 设置操作都是原子的(atomic), 这是锁可行性的基础.
将测试(测试旧值), 和设置(设置新值)合并为一个原子操作, 这就保证了只有一个线程能获取锁, 这就实现了一个有效的互斥原语.
公平性
可能存在争用, 导致某一线程饿死.
性能
单 CPU 情况下 =, 自旋锁开销很大
比较并交换(CAS)
int CompareAndSwap(int *ptr, int expected, int new_val) {
int actual = *ptr;
if (actual == expected)
*ptr = new_val;
return actual;
}
同时, 上面的代码可以直接拿来用:
int TestAndSet(int *old_ptr, int new_val) {
int old = *old_ptr; // 取出旧值
*old_ptr = new_val; // 设置新值
return old; // 返回旧值
}
typedef struct lock_t {
int flg;
} lock_t;
// 初始时为 0 表示未持有, 1 表示持有
void init(lock_t* lock) { lock->flg = 0; }
void lock(lock_t* lock) {
// 持有, 一直等待
while (CompareAndSwap(&lock->flg, 0, 1) == 1) // TEST flg
; // spin wait
lock->flg = 1; // SET
}
void unlock(lock_t* lock) { lock->flg = 0; }
可以使用的实现: (x86_64)
#include <cstdio>
#include <pthread.h>
int TestAndSet(int* old_ptr, int new_val) {
int old = *old_ptr; // 取出旧值
*old_ptr = new_val; // 设置新值
return old; // 返回旧值
}
// 伪码
// int CompareAndSwap(int* ptr, int expected, int new_val) {
// int actual = *ptr;
// if (actual == expected) //
// *ptr = new_val;
// return actual;
// }
char CompareAndSwap(int* ptr, int old_val, int new_val) {
unsigned char ans;
__asm__ __volatile__(
"lock\n"
"cmpxchgl %2, %1\n"
"sete %0\n"
: "=q"(ans), "=m"(*ptr)
: "r"(new_val), "m"(*ptr), "a"(old_val)
: "memory");
return ans;
}
typedef struct lock_t {
int flg;
} lock_t;
// 初始时为 0 表示未持有, 1 表示持有
void init(lock_t* lock) { lock->flg = 0; }
void lock(lock_t* lock) {
// 持有, 一直等待
// while (TestAndSet(&lock->flg, 1) == 1) // TEST flg
while (CompareAndSwap(&lock->flg, 0, 1) == 1) // TEST flg
; // spin wait
lock->flg = 1; // SET
}
void unlock(lock_t* lock) { lock->flg = 0; }
int i{};
lock_t lk;
void* f(void*) {
int num = 100000;
for (int n{}; n < num; ++n) {
lock(&lk);
i = i + 1;
unlock(&lk);
}
return NULL;
}
int main(int argc, char* argv[]) { // 测试
init(&lk);
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, f, NULL);
pthread_create(&tid2, NULL, f, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
printf("%d\n", i); // 200000
return 0;
}
临界区的实现
链接加载
条件式存储
获取并增加
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) { //
FetchAndAdd(&lock->turn);
}
使用队列: 休眠代替自旋
一般的调度会导致一直自旋或者立即让出 CPU, 此时锁的实现就需要依赖于 OS 的控制.
并发数据结构(using lock)
计数器
先来看一个朴素版本(不保证线程安全/同步机制)
typedef struct counter_t {
int val;
} counter_t;
void init(counter_t* c) { c->val = 0; }
void inc(counter_t* c) { c->val++; }
void dec(counter_t* c) { c->val--; }
int get(counter_t* c) { return c->val; }
下面是基于锁的最基本的线程安全计数器:
typedef struct counter_t {
int val;
pthread_mutex_t lock;
} counter_t;
void init(counter_t* c) {
c->val = 0;
pthread_mutex_init(&c->lock, NULL);
}
void inc(counter_t* c) {
pthread_mutex_lock(&c->lock);
c->val++;
pthread_mutex_unlock(&c->lock);
}
void dec(counter_t* c) {
pthread_mutex_lock(&c->lock);
c->val--;
pthread_mutex_unlock(&c->lock);
}
int get(counter_t* c) {
pthread_mutex_lock(&c->lock);
int tmp= c->val;
pthread_mutex_unlock(&c->lock);
return tmp;
}
虽然保证了安全性, 但是性能不好.
懒惰计数器: 通过多核分别上局部锁实现:
#include <cstdio>
#include <pthread.h>
namespace lazy {
const int NUMCPUS = 8;
typedef struct counter_t {
int global; // 全局计数器
pthread_mutex_t glock;
int local[NUMCPUS];
pthread_mutex_t llock[NUMCPUS];
int threshold;
} counter_t;
void init(counter_t* c, int threshold) {
c->threshold = threshold;
c->global = 0;
pthread_mutex_init(&c->glock, NULL);
for (int i = 0; i < NUMCPUS; ++i) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}
void update(counter_t* c, int threadID, int amt) {
pthread_mutex_lock(&c->llock[threadID]);
c->local[threadID] += amt;
if (c->local[threadID] >= c->threshold) {
pthread_mutex_lock(&c->glock);
c->global += c->local[threadID];
pthread_mutex_unlock(&c->glock);
c->local[threadID] = 0;
}
pthread_mutex_unlock(&c->llock[threadID]);
}
int get(counter_t* c) {
pthread_mutex_lock(&c->glock);
int tmp = c->global;
pthread_mutex_unlock(&c->glock);
return tmp;
}
} // namespace lazy
using namespace lazy;
counter_t cnt;
void* f(void*) {
for (int i{}; i < 10000000; ++i) //
update(&cnt, 1, 1);
return NULL;
}
void t1() {
init(&cnt, 1000);
pthread_t tids[NUMCPUS];
for (int i = 0; i < NUMCPUS; ++i) {
pthread_create(&tids[i], NULL, f, NULL);
pthread_join(tids[i], NULL);
}
printf("%d\n", get(&cnt));
// 80000000
}
int main(int argc, char* argv[]) {
t1();
return 0;
}
在我的 8 核 M1 芯片 Mac 上跑, 没有问题.
并发链表
typedef struct node_t {
int key;
struct node_t *next;
} node_t;
typedef struct list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;
void list_init(list_t *L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}
int list_insert(list_t *L, int key) {
node_t *new_node = (node_t *)malloc(sizeof(node_t));
if (new_node == NULL) {
perror("malloc"); // malloc 如果失败返回空指针, 此时必须释放锁
return -1;
}
pthread_mutex_lock(&L->lock); // 仅在临界区加锁
new_node->key = key;
new_node->next = L->head;
L->head = new_node;
pthread_mutex_unlock(&L->lock);
return 0;
}
int list_lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *cur = L->head;
while (cur) {
if (cur->key == key) {
rv = 0;
break;
}
cur = cur->next;
}
pthread_mutex_unlock(&L->lock);
return rv; // both success and failure
}
list_t *ll = new list_t;
void *f_insert(void *) {
for (int i{}; i < 100; ++i) { //
list_insert(ll, i);
}
return NULL;
}
测试代码:
void t1() {
using namespace improve_linkedlist;
list_init(ll);
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, f_insert, NULL);
pthread_create(&tid2, NULL, f_insert, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
for (auto cur = ll->head; cur; cur = cur->next) //
printf("%d\t", cur->key);
}
并发队列
这个设计很巧妙, 通过上面的链表实现, 并且添加一个哑结点.
- 初始时头尾指针都指向哑结点
- 入队(enqueue)时候从哑结点后面插入(尾插), 重新设置尾结点为新节点
- 出队(dequeue)有点技巧, 直接弹出头结点(哑结点)之后, 需要将新的头结点标记为哑结点.
#include <cassert>
#include <pthread.h>
#include <cstdlib>
#include <cstdio>
typedef struct __node_t {
int val;
struct __node_t* next;
} node_t;
typedef struct queue_t {
node_t *head, *tail;
pthread_mutex_t headLock, tailLock;
} queue_t;
void queue_init(queue_t* q) {
node_t* tmp = (node_t*)malloc(sizeof(node_t));
tmp->next = NULL;
q->head = q->tail = tmp; // 中节点, 分割实际入队和出队的序列
pthread_mutex_init(&q->headLock, NULL);
pthread_mutex_init(&q->tailLock, NULL);
}
void enqueue(queue_t* q, int val) {
node_t* dummy = (node_t*)malloc(sizeof(node_t));
assert(dummy != NULL);
dummy->val = val;
dummy->next = NULL;
pthread_mutex_lock(&q->tailLock); // 队尾插入
q->tail->next = dummy;
q->tail = dummy;
pthread_mutex_unlock(&q->tailLock); // 队尾插入
}
int dequeue(queue_t* q, int* val) {
pthread_mutex_lock(&q->headLock); // 队头删除
node_t* tmp = q->head;
node_t* newHead = tmp->next;
if (newHead == NULL) {
pthread_mutex_unlock(&q->headLock);
free(tmp); //
return -1; // queue empty
}
*val = newHead->val;
q->head = newHead;
pthread_mutex_unlock(&q->headLock);
free(tmp);
return 0;
}
void t1() { // basic test
queue_t* q = (queue_t*)malloc(sizeof(queue_t));
queue_init(q);
for (int i{}; i < 10; ++i) { //
enqueue(q, i);
}
for (int i{}; i < 5; ++i) { //
int tmp;
dequeue(q, &tmp);
printf("%d\t", tmp);
}
// 0 1 2 3 4
putchar('\n');
for (int i{}; i < 10; ++i) { //
enqueue(q, i);
}
for (int i{}; i < 10; ++i) { //
int tmp;
dequeue(q, &tmp);
printf("%d\t", tmp);
}
putchar('\n');
// 5 6 7 8 9 0 1 2 3 4
}
int main(int argc, char* argv[]) {
t1();
return 0;
}
并发散列表
#include <pthread.h>
#include <cstdio>
#include <cstdlib>
const int BUCKETS = 101; // 桶大小
typedef struct node_t {
int key;
struct node_t *next;
} node_t;
typedef struct list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;
void list_init(list_t *L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}
int list_insert(list_t *L, int key) {
node_t *new_node = (node_t *)malloc(sizeof(node_t));
if (new_node == NULL) {
perror("malloc"); // malloc 如果失败返回空指针, 此时必须释放锁
return -1; // failure
}
pthread_mutex_lock(&L->lock); // 仅在临界区加锁
new_node->key = key;
new_node->next = L->head;
L->head = new_node;
pthread_mutex_unlock(&L->lock);
return 0; // success
}
int list_lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *cur = L->head;
while (cur) {
if (cur->key == key) {
rv = 0;
break;
}
cur = cur->next;
}
pthread_mutex_unlock(&L->lock);
return rv; // both success and failure
}
typedef struct __hash_t {
list_t lists[BUCKETS]; // 拉链法
} hash_t;
void hash_init(hash_t *h) {
for (int i = 0; i < BUCKETS; ++i) {
list_init(&h->lists[i]);
}
}
int hash_insert(hash_t *h, int key) {
int bucket = key % BUCKETS;
return list_insert(&h->lists[bucket], key);
}
int hash_lookup(hash_t *h, int key) {
int bucket = key % BUCKETS;
return list_lookup(&h->lists[bucket], key);
}
未测试.
条件变量
是一个显式队列, 当某些执行状态(条件)不满足时, 线程把自己加入队列, 等待该条件(wait)成立. 另外的某一个线程中, 当它改变了上述状态, 使之条件成立, 就可以唤醒一个或者多个等待线程(通过在该条件上发信号,signal), 以让它们继续执行.
基本使用
int done = 0;
// static init
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit() {
pthread_mutex_lock(&m);
done = 1;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);
}
void *child(void *arg) {
printf("child\n");
sleep(1);
thr_exit();
return NULL;
}
void thr_join() {
pthread_mutex_lock(&m);
// if (done == 0) // also ok, but `while` is best
while (done == 0) //
pthread_cond_wait(&c, &m);
pthread_mutex_unlock(&m);
}
上述的等待调用(wait)将锁作为第二参数, 而信号调用(signal)仅需要一个参数(cond), 造成这种差异的原因是:
等待调用除了使调用线程进入睡眠状态之外, 还会让调用者睡眠时释放锁.
如果调用者睡眠时不释放锁, 其他线程则无法获取锁并将其唤醒.
但是, 在被唤醒之后, 返回之前, pthread_cond_wait()
会重新获取该锁, 从而确保在等待序列开始时获取锁以及结束时释放锁之间的任何时间内, 等待线程都持有锁.
生产者消费者模型(条件变量版)
又称为有界缓冲区问题.
问题描述
假设有一个或者多个生产者线程和一个或多个消费者线程, 生产者把生成的数据放入缓冲区, 消费者从缓冲区中取走数据.
实现
#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <cstdio>
namespace version1_2_3 {
int buf; // 缓冲区
int cnt; // 数据
const int LOOP_NUMS = 5;
void put(int val) {
assert(cnt == 0);
cnt = 1;
buf = val;
}
int get() {
assert(cnt == 1);
cnt = 0;
return buf;
}
pthread_cond_t empty;
pthread_cond_t full;
pthread_mutex_t mutex;
void *producer(void *arg) {
int loops = LOOP_NUMS;
for (int i = 0; i < loops; ++i) {
pthread_mutex_lock(&mutex);
while (cnt == 1) // 满了, 等待变空
pthread_cond_wait(&empty, &mutex);
put(i);
printf("%d put \n", i);
pthread_cond_signal(&full);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void *consumer(void *arg) {
int loops = LOOP_NUMS;
for (int i = 0; i < loops; ++i) {
pthread_mutex_lock(&mutex);
while (cnt == 0) // 空了, 等待变满
pthread_cond_wait(&full, &mutex);
int tmp = get();
printf("%d gotten \n", tmp);
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
} // namespace version1_2_3
void t1() {
using namespace version1_2_3;
// version 2: 将 cond_wait 的 if 改为 while, 但是会导致三个线程都睡眠
// 原因在于仅有一个条件变量, 使得消费者唤醒了消费者
// 使用第二个条件变量解决这个问题
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, producer, NULL);
pthread_create(&tid2, NULL, consumer, NULL);
pthread_create(&tid3, NULL, consumer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
}
namespace version_final {
//
const int MAX = 5;
const int LOOP_NUMS = 10;
int buf[MAX];
int fill = 0;
int use = 0;
int cnt = 0;
void put(int val) {
buf[fill] = val;
fill = (fill + 1) % MAX;
++cnt;
}
int get() {
int tmp = buf[use];
use = (use + 1) % MAX;
--cnt;
return tmp;
}
pthread_cond_t empty;
pthread_cond_t full;
pthread_mutex_t mutex;
void *producer(void *arg) {
int loops = LOOP_NUMS;
for (int i = 0; i < loops; ++i) {
pthread_mutex_lock(&mutex);
while (cnt == MAX) // 满了, 等待变空
pthread_cond_wait(&empty, &mutex);
put(i);
printf("%d put \n", i);
pthread_cond_signal(&full);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
void *consumer(void *arg) {
int loops = LOOP_NUMS;
for (int i = 0; i < loops; ++i) {
pthread_mutex_lock(&mutex);
while (cnt == 0) // 空了, 等待变满
pthread_cond_wait(&full, &mutex);
int tmp = get();
usleep(30000);
printf("%d gotten \n", tmp);
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
} // namespace version_final
void t2() {
using namespace version_final;
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, producer, NULL);
pthread_create(&tid2, NULL, consumer, NULL);
// pthread_create(&tid3, NULL, consumer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
// pthread_join(tid3, NULL);
// 1 生产者 2 消费者还是会忙等
}
int main(int argc, char *argv[]) {
// t1();
t2();
return 0;
}
高级使用: 状态机
参考了南大 OS 课程
合成水分子问题:
static enum state { A = 1, B, C, D, E } tmp;
vector<tuple<int, char, int>> rules = {
{A, 'H', B}, //
{B, 'O', C}, //
{C, 'H', A}, //
{B, 'H', D}, //
{D, 'O', A}, //
{A, 'O', E}, //
{E, 'H', C}, //
};
class H2O {
int next(char c) {
enum Items { from = 0, ch, to };
for (int i = 0; i < rules.size(); ++i) {
auto rule = rules[i];
if (get<from>(rule) == cur && get<ch>(rule) == c) {
return get<to>(rule);
}
}
return 0;
}
int can_print(char ch) { return next(ch) != 0 && quota > 0; }
void H2O_before(char ch) {
std::unique_lock<std::mutex> lk(mtx);
while (!can_print(ch)) //
cv.wait(lk);
--quota;
}
void H2O_after(char ch) {
lock_guard<std::mutex> lk(mtx);
++quota;
cur = next(ch);
cv.notify_all();
}
int cur, quota;
std::mutex mtx;
std::condition_variable cv;
public:
H2O() : cur(A), quota(1) {}
void hydrogen(function<void()> releaseHydrogen) {
H2O_before('H');
releaseHydrogen();
H2O_after('H');
}
void oxygen(function<void()> releaseOxygen) {
H2O_before('O');
releaseOxygen();
H2O_after('O');
}
};
信号量 semaphore
可以理解为条件变量的一种泛化, 但是这种泛化并不是必须的/通用的.
理解信号量
可以理解为一个袋子, 里面有若干个球(资源, 信号量值)
- P 操作(wait): 试着从袋子中拿出一个球,
- 如果成功了, 就离开(函数返回)
- 失败了(袋子为空), 等待(while 循环)
- V 操作(post): 往袋子中放入一个球,
- 如果有人在等待球, 就可以拿走刚放进去的球并返回
- 放球-拿球这个过程实现了 线程同步
作为锁: X=1
下面的测试以及作为条件变量的测试都是在 Ubuntu 跑的, MacOS 上信号量 API 已经 deprecated 了.
#include <semaphore.h>
#include <pthread.h>
#include <cstdio>
namespace sem_as_lock {
sem_t s;
int cnt = 0;
void *f(void *) {
sem_wait(&s);
for (int i = 0; i < 1000000; ++i) { //
cnt = cnt + 1;
}
sem_post(&s);
return NULL;
}
} // namespace sem_as_lock
void t1() {
using namespace sem_as_lock;
sem_init(&s, 0, 1);
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, f, NULL);
pthread_create(&tid2, NULL, f, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
printf("cnt=%d\n", cnt); // 2000000
}
int main(int argc, char *argv[]) {
t1();
return 0;
}
作为条件变量: X=0
等待子线程完成之后才继续运行父线程
#include <semaphore.h>
#include <pthread.h>
#include <cstdio>
namespace sem_as_cond_variable {
sem_t s;
void *child(void *arg) {
printf("child\n");
sem_post(&s); // signal : child done
return NULL;
}
} // namespace sem_as_cond_variable
void t2() {
using namespace sem_as_cond_variable;
sem_init(&s, 0, 0);
printf("parent: begin\n");
pthread_t tid;
pthread_create(&tid, NULL, child, NULL);
sem_wait(&s);
printf("parent: end\n");
// test in ubuntu: x87
// parent: begin
// child
// parent: end
}
int main(int argc, char *argv[]) {
t2();
return 0;
}
通过锁和条件变量实现信号量
有了上面两部分内容, 可以发现, 信号量可以体现锁的性质(持有并等待)和条件变量的性质(等待条件发生), 于是就可以使用锁和条件变量实现一个信号量, 事实上在 MacOS 下只能这样来做, 因为信号量 API 处于deprecated
状态, 应该是可以用sem_open
等接口的, 这里就不增加学习成本了.
这部分是一个基本的实现, 通过预定义宏的方式提供 Mac 下的semaphore接口.
要完成这个简单信号量的实现, 首先要理解信号量做了什么, 初始化的值会影响信号量实际的行为. 下面这两个函数就是 pthread 提供的 API, 就像 lock()和 unlock() 一样, 都需要成对出现.
- sem_wait(): 首先判断信号量值(下面简称值)是否大于等于 1, 如果是, 减去 1, 直接返回; 如果小于等于 0, 这时候就进入等待状态, 直到值重新大等 1, 然后减去 1, 返回.
- sem_post(): 直接将值增加 1, 并且发出信号告诉忙等线程目前的值. 返回.
并且, 信号量内部对值的更改应该都是原子操作, 这在实现上可以用锁来完成.
typedef struct __Zem_t {
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} Zem_t;
void Zem_init(Zem_t *z, int value) {
z->value = value;
pthread_cond_init(&z->cond, NULL);
pthread_mutex_init(&z->lock, NULL); // 动态初始化, 最后需要用 destroy 释放
}
void Zem_wait(Zem_t *z) {
// 如果 val >= 1, -1, 返回
// 否则线程挂起, 直到之后的 post 操作
pthread_mutex_lock(&z->lock);
while (z->value <= 0) //
pthread_cond_wait(&z->cond, &z->lock);
--z->value;
pthread_mutex_unlock(&z->lock);
}
void Zem_post(Zem_t *z) {
// 并不等待某些条件满足, 仅增加信号量的值, 如果有等待线程, 唤醒其中一个
pthread_mutex_lock(&z->lock);
++z->value;
pthread_cond_signal(&z->cond); // 唤醒
pthread_mutex_unlock(&z->lock);
}
#ifdef __APPLE__ // just for APPLE
typedef Zem_t sem_t;
#define sem_wait(s) Zem_wait(s)
#define sem_post(s) Zem_post(s)
// placeholder for API compatibliity
#define sem_init(s, placeholder, v) Zem_init(s, v)
#endif
实际使用可以放在头文件中, 然后用条件宏判断不同平台以决定使用哪个头文件.
这里要注意, 这个自己实现的信号量与 pthread 版本并不是完全一致的, pthread 库的信号量(Dijkstra 提出的信号量)会将信号量值减到负数, 这个负数的值(绝对值)就是等待线程的个数, 而这里的实现并不会让值减到负数.
生产者消费者模型(信号量版)
namespace version3 { // ok
const int MAX = 10;
const int LOOP_NUMS = 100;
int buf[MAX];
int fill = 0, use = 0;
void put(int val) {
buf[fill] = val;
fill = (fill + 1) % MAX;
}
int get() {
int tmp = buf[use];
use = (use + 1) % MAX;
return tmp;
}
sem_t empty, full;
sem_t mutex;
void* producer(void* arg) {
for (int i = 0; i < LOOP_NUMS; ++i) {
sem_wait(&empty);
sem_wait(&mutex);
put(i);
usleep(5000);
sem_post(&mutex);
sem_post(&full);
}
// end case
for (int i = 0; i < 2; i++) { // 2 消费者
sem_wait(&empty);
sem_wait(&mutex);
put(-1);
sem_post(&mutex);
sem_post(&full);
}
return NULL;
}
void* consumer(void* arg) {
int tmp = 0;
while (-1 != tmp) {
sem_wait(&full);
sem_wait(&mutex);
tmp = get();
usleep(5000);
sem_post(&mutex);
sem_post(&empty);
printf("%d\n", tmp);
}
return NULL;
}
} // namespace version3
void t1() {
using namespace version3;
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, producer, NULL);
pthread_create(&tid2, NULL, consumer, NULL);
pthread_create(&tid3, NULL, consumer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
}
读写锁(信号量的使用)
会造成写者饥饿, 特别是大量读者占有锁的情况下, 解决方案是 在写者等待时, 避免更多的读者进入. 或许可以通过增加一个threshold
实现.
读写锁性能并不高, 因为存在公平性问题.
哲学家就餐问题
这里不想自己写 API 了, 直接用力扣现成的.
#include <semaphore.h>
class DiningPhilosophers {
sem_t forks[5];
public:
DiningPhilosophers() {
for (int i{}; i < 5; ++i) sem_init(&forks[i], 0, 1); // as lock
}
void wantsToEat(int philosopher, function<void()> pickLeftFork,
function<void()> pickRightFork, function<void()> eat,
function<void()> putLeftFork,
function<void()> putRightFork) {
int l = philosopher, r = (philosopher + 1) % 5;
if (4 == philosopher) {
sem_wait(&forks[r]); // r
sem_wait(&forks[l]); // l
pickRightFork();
pickLeftFork();
} else {
sem_wait(&forks[l]); // l
sem_wait(&forks[r]); // r
pickLeftFork();
pickRightFork();
}
eat();
putLeftFork();
putRightFork();
sem_post(&forks[l]);
sem_post(&forks[r]);
}
};
使用 C++自带的锁也可以解决, 如下: (慢了不少)
class DiningPhilosophers {
mutex m[5];
public:
DiningPhilosophers() {}
void wantsToEat(int philosopher, function<void()> pickLeftFork,
function<void()> pickRightFork, function<void()> eat,
function<void()> putLeftFork,
function<void()> putRightFork) {
int l = philosopher, r = (philosopher + 1) % 5;
if (philosopher & 1) {
m[l].lock();
m[r].lock();
pickRightFork();
pickLeftFork();
} else {
m[r].lock();
m[l].lock();
pickLeftFork();
pickRightFork();
}
eat();
putLeftFork();
putRightFork();
m[l].unlock();
m[r].unlock();
}
};
使用 pthread 的 mutex(更慢…)
#include <pthread.h>
class DiningPhilosophers {
pthread_mutex_t forks[5];
public:
DiningPhilosophers() {
for (int i{}; i < 5; ++i) forks[i] = PTHREAD_MUTEX_INITIALIZER;
}
void wantsToEat(int philosopher, function<void()> pickLeftFork,
function<void()> pickRightFork, function<void()> eat,
function<void()> putLeftFork,
function<void()> putRightFork) {
int l = philosopher, r = (philosopher + 1) % 5;
if (philosopher & 1) { // 奇偶
pthread_mutex_lock(&forks[r]); // r
pthread_mutex_lock(&forks[l]); // l
pickRightFork();
pickLeftFork();
} else {
pthread_mutex_lock(&forks[l]); // l
pthread_mutex_lock(&forks[r]); // r
pickLeftFork();
pickRightFork();
}
eat();
putLeftFork();
putRightFork();
pthread_mutex_unlock(&forks[l]);
pthread_mutex_unlock(&forks[r]);
}
};
常见并发问题
非死锁缺陷
- 违反原子性缺陷
- 违反顺序缺陷
死锁缺陷
条件
- 互斥条件
- 持有并等待条件
- 环路等待条件
- 不可剥夺条件(非抢占条件)
解决
- 避免互斥
- 原子式抢占锁
- 合理调度, 按顺序严格执行(全序)
- 非抢占
- 检查和恢复
基于事务的并发
本质上是 I/O 多路复用与异步 I/O 操作的应用, 但是 Linux 对异步 I/O 的支持并不好, (事实上这也是大多数并发服务器不使用 proactor的原因)