第 29 章 基于锁的并发数据结构

本章讨论如何在常见数据结构中添加锁,使其成为线程安全(thread safe)的并发数据结构,同时兼顾性能。

并发计数器 (Concurrent Counters)

简单但无法扩展的计数器

最简单的并发计数器实现是在每次操作(增加、减少、读取)时加一把互斥锁。

typedef struct counter_t {
    int value;
    pthread_mutex_t lock;
} counter_t;
 
void init(counter_t *c) {
    c->value = 0;
    pthread_mutex_init(&c->lock, NULL);
}
 
void increment(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    c->value++;
    pthread_mutex_unlock(&c->lock);
}
 
void decrement(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    c->value--;
    pthread_mutex_unlock(&c->lock);
}
 
int get(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    int rc = c->value;
    pthread_mutex_unlock(&c->lock);
    return rc;
}

问题:这种方案在多核 CPU 上扩展性极差。随着线程数增加,锁竞争导致 CPU 大量时间消耗在上下文切换上,性能急剧下降。

可扩展的计数器:懒惰计数器 (Sloppy Counter)

懒惰计数器通过局部计数器全局计数器解决扩展性问题:

  • 每个 CPU 核心有一个局部计数器和对应的局部锁。
  • 有一个全局计数器和全局锁。
  • 更新逻辑:线程更新局部计数器(获取局部锁)。
  • 同步逻辑:当局部计数器值达到阈值 (Sloppiness) 时,获取全局锁,将局部值累加到全局计数器,并重置局部计数器。
  • 读取逻辑:获取全局锁,读取全局计数器值(可能是不精确的)。

阈值 的权衡

  • 小:趋近于非扩展计数器,性能差,精度高。
  • 大:扩展性强,性能好,但全局值滞后(精度低)。
typedef struct counter_t {
    int global;             // 全局计数
    pthread_mutex_t glock;  // 全局锁
    int local[NUMCPUS];     // 局部计数 (每个 CPU)
    pthread_mutex_t llock[NUMCPUS]; // ... 及其锁
    int threshold;          // 更新阈值
} counter_t;
 
void init(counter_t *c, int threshold) {
    c->threshold = threshold;
    c->global = 0;
    pthread_mutex_init(&c->glock, NULL);
    for (int i = 0; i < NUMCPUS; i++) {
        c->local[i] = 0;
        pthread_mutex_init(&c->llock[i], NULL);
    }
}
 
void update(counter_t *c, int threadID, int amt) {
    pthread_mutex_lock(&c->llock[threadID]);
    c->local[threadID] += amt;
    if (c->local[threadID] >= c->threshold) { // 传输到全局
        pthread_mutex_lock(&c->glock);
        c->global += c->local[threadID];
        pthread_mutex_unlock(&c->glock);
        c->local[threadID] = 0;
    }
    pthread_mutex_unlock(&c->llock[threadID]);
}
 
int get(counter_t *c) {
    pthread_mutex_lock(&c->glock);
    int val = c->global;
    pthread_mutex_unlock(&c->glock);
    return val; // 只是近似值!
}

并发链表 (Concurrent Linked Lists)

优化锁的范围

基本思路是在插入和查找操作中加锁。为了提高效率和异常安全性(如 malloc 失败),应尽量缩小锁的范围,只包裹临界区

void List_Insert(list_t *L, int key) {
    // 内存分配不需要同步
    node_t *new = malloc(sizeof(node_t));
    if (new == NULL) {
        perror("malloc");
        return;
    }
    new->key = key;
 
    // 只对临界区加锁
    pthread_mutex_lock(&L->lock);
    new->next = L->head;
    L->head = new;
    pthread_mutex_unlock(&L->lock);
}
 
int List_Lookup(list_t *L, int key) {
    int rv = -1;
    pthread_mutex_lock(&L->lock);
    node_t *curr = L->head;
    while (curr) {
        if (curr->key == key) {
            rv = 0;
            break;
        }
        curr = curr->next;
    }
    pthread_mutex_unlock(&L->lock);
    return rv; 
}

过手锁 (Hand-over-hand Locking)

  • 原理:每个节点一把锁。遍历时,先获取下一个节点的锁,再释放当前节点的锁。
  • 缺点:频繁获取/释放锁的开销巨大,通常比单锁方案更慢。

并发队列 (Concurrent Queues)

Michael 和 Scott 设计的并发队列使用了两个锁:

  1. headLock:保护队头,用于出队 (dequeue)。
  2. tailLock:保护队尾,用于入队 (enqueue)。
  3. 假节点 (Dummy Node):初始化时分配一个假节点,分离开头尾操作,使得入队和出队可以并发执行。
typedef struct node_t {
    int value;
    struct node_t *next;
} node_t;
 
typedef struct queue_t {
    node_t *head;
    node_t *tail;
    pthread_mutex_t headLock;
    pthread_mutex_t tailLock;
} queue_t;
 
void Queue_Init(queue_t *q) {
    node_t *tmp = malloc(sizeof(node_t));
    tmp->next = NULL;
    q->head = q->tail = tmp; // 假节点 (Dummy node)
    pthread_mutex_init(&q->headLock, NULL);
    pthread_mutex_init(&q->tailLock, NULL);
}
 
void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;
 
    pthread_mutex_lock(&q->tailLock);
    q->tail->next = tmp;
    q->tail = tmp;
    pthread_mutex_unlock(&q->tailLock);
}
 
int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->headLock);
    node_t *tmp = q->head;
    node_t *newHead = tmp->next;
    if (newHead == NULL) {
        pthread_mutex_unlock(&q->headLock);
        return -1; // 队列为空
    }
    *value = newHead->value;
    q->head = newHead;
    pthread_mutex_unlock(&q->headLock);
    free(tmp);
    return 0;
}

并发散列表 (Concurrent Hash Tables)

并发散列表通过分段锁实现高性能:

  • 每个散列桶(Bucket,即链表)有一把锁。
  • 操作不同桶时互不干扰,支持高并发。
#define BUCKETS (101)
 
typedef struct hash_t {
    list_t lists[BUCKETS]; // list_t 包含一个锁
} hash_t;
 
void Hash_Init(hash_t *H) {
    for (int i = 0; i < BUCKETS; i++) {
        List_Init(&H->lists[i]);
    }
}
 
int Hash_Insert(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return List_Insert(&H->lists[bucket], key);
}
 
int Hash_Lookup(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return List_Lookup(&H->lists[bucket], key);
}

总结与建议

  1. 避免不成熟的优化 (Knuth 定律):先使用简单的粗粒度锁(大锁)。只有在确认存在性能瓶颈时,才进行细粒度锁或无锁优化。
  2. 控制流与锁:注意函数返回或异常退出时必须释放锁。
  3. 并发不一定更快:如果锁的开销超过了并行带来的收益,性能反而会下降。