第 31 章 信号量

信号量Semaphore)是 Dijkstra 引入的同步原语,可用作锁和条件变量。

信号量的定义

信号量是一个拥有整数值的对象,通过两个函数操作:sem_wait()sem_post()

#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1); // 第二个参数0表示同一进程多线程共享,第三个参数1是初始值
int sem_wait(sem_t *s) {
    // 将信号量 s 的值减 1
    // 如果 s 的值为负数,则等待
}
 
int sem_post(sem_t *s) {
    // 将信号量 s 的值加 1
    // 如果有等待的线程,唤醒其中一个
}
  • sem_wait():值 时立即返回;否则挂起调用线程。
  • sem_post():直接增加信号量的值,并唤醒一个等待线程。
  • 负值含义:当信号量的值为负数时,其绝对值等于等待线程的个数。

二值信号量(锁)

信号量初始值设为 1,即可作为锁(二值信号量)使用。

sem_t m;
sem_init(&m, 0, 1); // 初始化为 1
 
sem_wait(&m);
// 临界区
sem_post(&m);

双线程竞争锁的场景追踪

信号量的值线程 0线程 1
1
1调用 sem_wait()
0sem_wait() 返回
0(进入临界区
0调用 sem_wait()
-1sem 减 1,睡眠等待
-1调用 sem_post()
0增加 sem,唤醒线程 1
0sem_post() 返回就绪并返回
0(进入临界区
1调用 sem_post() 并返回

信号量用作条件变量

信号量可用于等待某一条件成立(如父线程等待子线程结束)。此时信号量初始值应设为 0

sem_t s;
 
void *child(void *arg) {
    printf("child\n");
    sem_post(&s); // 子线程完成,发信号
    return NULL;
}
 
int main(int argc, char *argv[]) {
    sem_init(&s, 0, 0); // 初始值为 0
    printf("parent: begin\n");
    pthread_t c;
    pthread_create(&c, NULL, child, NULL);
    sem_wait(&s); // 等待子线程
    printf("parent: end\n");
    return 0;
}
  • 场景 1(父线程先运行):父线程调用 sem_wait()信号量减为 -1 并睡眠。子线程运行后调用 sem_post()信号量增为 0 并唤醒父线程。
  • 场景 2(子线程先运行):子线程调用 sem_post()信号量增为 1。父线程后续调用 sem_wait() 时,信号量减为 0 并直接返回,无需等待。

生产者/消费者(有界缓冲区)问题

使用 emptyfull 两个信号量分别表示空缓冲区和满缓冲区的数量。

初次尝试(无互斥,存在竞态条件)

int buffer[MAX];
int fill = 0, use = 0;
 
void put(int value) {
    buffer[fill] = value;
    fill = (fill + 1) % MAX;
}
 
int get() {
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    return tmp;
}
 
sem_t empty;
sem_t full;
 
void *producer(void *arg) {
    for (int i = 0; i < loops; i++) {
        sem_wait(&empty);
        put(i);
        sem_post(&full);
    }
}
 
void *consumer(void *arg) {
    int tmp = 0;
    while (tmp != -1) {
        sem_wait(&full);
        tmp = get();
        sem_post(&empty);
        printf("%d\n", tmp);
    }
}
 
int main() {
    sem_init(&empty, 0, MAX); // 初始有 MAX 个空缓冲区
    sem_init(&full, 0, 0);    // 初始有 0 个满缓冲区
}

MAX > 1 且存在多个生产者时,并发调用 put() 会导致数据覆盖。需要增加互斥锁。

增加互斥量(死锁版本)

如果在 put()/get() 外层直接加锁:

sem_wait(&mutex);
sem_wait(&empty);
put(i);
sem_post(&full);
sem_post(&mutex);

死锁原因:消费者先获取 mutex,然后等待 full(睡眠并持有 mutex)。生产者想要生产数据唤醒消费者,但由于 mutex 被消费者持有而阻塞。形成循环等待。

最终可行方案(缩小锁的作用域)

仅在操作缓冲区的临界区加锁。

sem_t empty, full, mutex;
 
void *producer(void *arg) {
    for (int i = 0; i < loops; i++) {
        sem_wait(&empty);
        sem_wait(&mutex); // 仅保护临界区
        put(i);
        sem_post(&mutex);
        sem_post(&full);
    }
}
 
void *consumer(void *arg) {
    for (int i = 0; i < loops; i++) {
        sem_wait(&full);
        sem_wait(&mutex); // 仅保护临界区
        int tmp = get();
        sem_post(&mutex);
        sem_post(&empty);
        printf("%d\n", tmp);
    }
}
 
int main() {
    sem_init(&empty, 0, MAX);
    sem_init(&full, 0, 0);
    sem_init(&mutex, 0, 1); // 互斥量初始为 1
}

读者—写者锁

针对并发数据结构(如链表),允许多个读者并发访问,但写者必须独占访问。

typedef struct _rwlock_t {
    sem_t lock;      // 保护 readers 计数器
    sem_t writelock; // 允许一个写者或多个读者
    int readers;     // 临界区内的读者数量
} rwlock_t;
 
void rwlock_init(rwlock_t *rw) {
    rw->readers = 0;
    sem_init(&rw->lock, 0, 1);
    sem_init(&rw->writelock, 0, 1);
}
 
void rwlock_acquire_readlock(rwlock_t *rw) {
    sem_wait(&rw->lock);
    rw->readers++;
    if (rw->readers == 1)
        sem_wait(&rw->writelock); // 第一个读者获取写锁
    sem_post(&rw->lock);
}
 
void rwlock_release_readlock(rwlock_t *rw) {
    sem_wait(&rw->lock);
    rw->readers--;
    if (rw->readers == 0)
        sem_post(&rw->writelock); // 最后一个读者释放写锁
    sem_post(&rw->lock);
}
 
void rwlock_acquire_writelock(rwlock_t *rw) {
    sem_wait(&rw->writelock);
}
 
void rwlock_release_writelock(rwlock_t *rw) {
    sem_post(&rw->writelock);
}
  • 机制:第一个读者获取 writelock,阻止写者进入;后续读者直接进入。最后一个退出的读者释放 writelock,允许写者进入。
  • 缺陷:存在公平性问题,读者容易饿死写者。

哲学家就餐问题

5 位哲学家围坐圆桌,每两人之间有一把餐叉。哲学家必须同时拿到左右两把餐叉才能就餐。

辅助函数:

int left(int p)  { return p; }
int right(int p) { return (p + 1) % 5; }

有问题的解决方案(死锁)

sem_t forks[5]; // 初始化为 1
 
void getforks(int p) {
    sem_wait(&forks[left(p)]);
    sem_wait(&forks[right(p)]);
}
 
void putforks(int p) {
    sem_post(&forks[left(p)]);
    sem_post(&forks[right(p)]);
}

死锁原因:当所有哲学家同时拿起左手边的餐叉时,所有餐叉被占有,所有人都在等待右手边的餐叉。

解决方案:破除依赖

改变最后一个哲学家的取餐叉顺序,打破循环等待。

void getforks(int p) {
    if (p == 4) {
        sem_wait(&forks[right(p)]); // 先拿右手边
        sem_wait(&forks[left(p)]);
    } else {
        sem_wait(&forks[left(p)]);  // 先拿左手边
        sem_wait(&forks[right(p)]);
    }
}

如何实现信号量

使用底层的同步原语(锁和条件变量)实现信号量(Zemaphore)。

typedef struct _Zem_t {
    int value;
    pthread_cond_t cond;
    pthread_mutex_t lock;
} Zem_t;
 
void Zem_init(Zem_t *s, int value) {
    s->value = value;
    Cond_init(&s->cond);
    Mutex_init(&s->lock);
}
 
void Zem_wait(Zem_t *s) {
    Mutex_lock(&s->lock);
    while (s->value <= 0)
        Cond_wait(&s->cond, &s->lock);
    s->value--;
    Mutex_unlock(&s->lock);
}
 
void Zem_post(Zem_t *s) {
    Mutex_lock(&s->lock);
    s->value++;
    Cond_signal(&s->cond);
    Mutex_unlock(&s->lock);
}
  • 细微区别:该实现与 Dijkstra 定义的区别在于,当信号量为负数时,其值不反映等待的线程数(值永远不会小于 0)。这更易于实现且符合 Linux 现有的行为。