信号量(Semaphore)是 Dijkstra 引入的同步原语,可用作锁和条件变量。
信号量的定义
信号量是一个拥有整数值的对象,通过两个函数操作:sem_wait() 和 sem_post()。
#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1); // 第二个参数0表示同一进程多线程共享,第三个参数1是初始值int sem_wait(sem_t *s) {
// 将信号量 s 的值减 1
// 如果 s 的值为负数,则等待
}
int sem_post(sem_t *s) {
// 将信号量 s 的值加 1
// 如果有等待的线程,唤醒其中一个
}二值信号量(锁)
sem_t m;
sem_init(&m, 0, 1); // 初始化为 1
sem_wait(&m);
// 临界区
sem_post(&m);双线程竞争锁的场景追踪:
| 信号量的值 | 线程 0 | 线程 1 |
|---|---|---|
| 1 | ||
| 1 | 调用 sem_wait() | |
| 0 | sem_wait() 返回 | |
| 0 | (进入临界区) | |
| 0 | 调用 sem_wait() | |
| -1 | sem 减 1,睡眠等待 | |
| -1 | 调用 sem_post() | |
| 0 | 增加 sem,唤醒线程 1 | |
| 0 | sem_post() 返回 | 就绪并返回 |
| 0 | (进入临界区) | |
| 1 | 调用 sem_post() 并返回 |
信号量用作条件变量
信号量可用于等待某一条件成立(如父线程等待子线程结束)。此时信号量初始值应设为 0。
sem_t s;
void *child(void *arg) {
printf("child\n");
sem_post(&s); // 子线程完成,发信号
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0); // 初始值为 0
printf("parent: begin\n");
pthread_t c;
pthread_create(&c, NULL, child, NULL);
sem_wait(&s); // 等待子线程
printf("parent: end\n");
return 0;
}- 场景 1(父线程先运行):父线程调用
sem_wait(),信号量减为 -1 并睡眠。子线程运行后调用sem_post(),信号量增为 0 并唤醒父线程。 - 场景 2(子线程先运行):子线程调用
sem_post(),信号量增为 1。父线程后续调用sem_wait()时,信号量减为 0 并直接返回,无需等待。
生产者/消费者(有界缓冲区)问题
使用 empty 和 full 两个信号量分别表示空缓冲区和满缓冲区的数量。
初次尝试(无互斥,存在竞态条件)
int buffer[MAX];
int fill = 0, use = 0;
void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
}
int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
return tmp;
}
sem_t empty;
sem_t full;
void *producer(void *arg) {
for (int i = 0; i < loops; i++) {
sem_wait(&empty);
put(i);
sem_post(&full);
}
}
void *consumer(void *arg) {
int tmp = 0;
while (tmp != -1) {
sem_wait(&full);
tmp = get();
sem_post(&empty);
printf("%d\n", tmp);
}
}
int main() {
sem_init(&empty, 0, MAX); // 初始有 MAX 个空缓冲区
sem_init(&full, 0, 0); // 初始有 0 个满缓冲区
}当 MAX > 1 且存在多个生产者时,并发调用 put() 会导致数据覆盖。需要增加互斥锁。
增加互斥量(死锁版本)
如果在 put()/get() 外层直接加锁:
sem_wait(&mutex);
sem_wait(&empty);
put(i);
sem_post(&full);
sem_post(&mutex);死锁原因:消费者先获取 mutex,然后等待 full(睡眠并持有 mutex)。生产者想要生产数据唤醒消费者,但由于 mutex 被消费者持有而阻塞。形成循环等待。
最终可行方案(缩小锁的作用域)
仅在操作缓冲区的临界区加锁。
sem_t empty, full, mutex;
void *producer(void *arg) {
for (int i = 0; i < loops; i++) {
sem_wait(&empty);
sem_wait(&mutex); // 仅保护临界区
put(i);
sem_post(&mutex);
sem_post(&full);
}
}
void *consumer(void *arg) {
for (int i = 0; i < loops; i++) {
sem_wait(&full);
sem_wait(&mutex); // 仅保护临界区
int tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("%d\n", tmp);
}
}
int main() {
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1); // 互斥量初始为 1
}读者—写者锁
针对并发数据结构(如链表),允许多个读者并发访问,但写者必须独占访问。
typedef struct _rwlock_t {
sem_t lock; // 保护 readers 计数器
sem_t writelock; // 允许一个写者或多个读者
int readers; // 临界区内的读者数量
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers++;
if (rw->readers == 1)
sem_wait(&rw->writelock); // 第一个读者获取写锁
sem_post(&rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers--;
if (rw->readers == 0)
sem_post(&rw->writelock); // 最后一个读者释放写锁
sem_post(&rw->lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->writelock);
}
void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->writelock);
}- 机制:第一个读者获取
writelock,阻止写者进入;后续读者直接进入。最后一个退出的读者释放writelock,允许写者进入。 - 缺陷:存在公平性问题,读者容易饿死写者。
哲学家就餐问题
5 位哲学家围坐圆桌,每两人之间有一把餐叉。哲学家必须同时拿到左右两把餐叉才能就餐。
辅助函数:
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }有问题的解决方案(死锁)
sem_t forks[5]; // 初始化为 1
void getforks(int p) {
sem_wait(&forks[left(p)]);
sem_wait(&forks[right(p)]);
}
void putforks(int p) {
sem_post(&forks[left(p)]);
sem_post(&forks[right(p)]);
}死锁原因:当所有哲学家同时拿起左手边的餐叉时,所有餐叉被占有,所有人都在等待右手边的餐叉。
解决方案:破除依赖
改变最后一个哲学家的取餐叉顺序,打破循环等待。
void getforks(int p) {
if (p == 4) {
sem_wait(&forks[right(p)]); // 先拿右手边
sem_wait(&forks[left(p)]);
} else {
sem_wait(&forks[left(p)]); // 先拿左手边
sem_wait(&forks[right(p)]);
}
}如何实现信号量
使用底层的同步原语(锁和条件变量)实现信号量(Zemaphore)。
typedef struct _Zem_t {
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} Zem_t;
void Zem_init(Zem_t *s, int value) {
s->value = value;
Cond_init(&s->cond);
Mutex_init(&s->lock);
}
void Zem_wait(Zem_t *s) {
Mutex_lock(&s->lock);
while (s->value <= 0)
Cond_wait(&s->cond, &s->lock);
s->value--;
Mutex_unlock(&s->lock);
}
void Zem_post(Zem_t *s) {
Mutex_lock(&s->lock);
s->value++;
Cond_signal(&s->cond);
Mutex_unlock(&s->lock);
}- 细微区别:该实现与 Dijkstra 定义的区别在于,当信号量为负数时,其值不反映等待的线程数(值永远不会小于 0)。这更易于实现且符合 Linux 现有的行为。