本章讨论如何在常见数据结构中添加锁,使其成为线程安全(thread safe)的并发数据结构,同时兼顾性能。
并发计数器 (Concurrent Counters)
简单但无法扩展的计数器
最简单的并发计数器实现是在每次操作(增加、减少、读取)时加一把互斥锁。
typedef struct counter_t {
int value;
pthread_mutex_t lock;
} counter_t;
void init(counter_t *c) {
c->value = 0;
pthread_mutex_init(&c->lock, NULL);
}
void increment(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value++;
pthread_mutex_unlock(&c->lock);
}
void decrement(counter_t *c) {
pthread_mutex_lock(&c->lock);
c->value--;
pthread_mutex_unlock(&c->lock);
}
int get(counter_t *c) {
pthread_mutex_lock(&c->lock);
int rc = c->value;
pthread_mutex_unlock(&c->lock);
return rc;
}问题:这种方案在多核 CPU 上扩展性极差。随着线程数增加,锁竞争导致 CPU 大量时间消耗在上下文切换上,性能急剧下降。
可扩展的计数器:懒惰计数器 (Sloppy Counter)
懒惰计数器通过局部计数器和全局计数器解决扩展性问题:
- 每个 CPU 核心有一个局部计数器和对应的局部锁。
- 有一个全局计数器和全局锁。
- 更新逻辑:线程更新局部计数器(获取局部锁)。
- 同步逻辑:当局部计数器值达到阈值 (Sloppiness) 时,获取全局锁,将局部值累加到全局计数器,并重置局部计数器。
- 读取逻辑:获取全局锁,读取全局计数器值(可能是不精确的)。
阈值 的权衡:
- 小:趋近于非扩展计数器,性能差,精度高。
- 大:扩展性强,性能好,但全局值滞后(精度低)。
typedef struct counter_t {
int global; // 全局计数
pthread_mutex_t glock; // 全局锁
int local[NUMCPUS]; // 局部计数 (每个 CPU)
pthread_mutex_t llock[NUMCPUS]; // ... 及其锁
int threshold; // 更新阈值
} counter_t;
void init(counter_t *c, int threshold) {
c->threshold = threshold;
c->global = 0;
pthread_mutex_init(&c->glock, NULL);
for (int i = 0; i < NUMCPUS; i++) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}
void update(counter_t *c, int threadID, int amt) {
pthread_mutex_lock(&c->llock[threadID]);
c->local[threadID] += amt;
if (c->local[threadID] >= c->threshold) { // 传输到全局
pthread_mutex_lock(&c->glock);
c->global += c->local[threadID];
pthread_mutex_unlock(&c->glock);
c->local[threadID] = 0;
}
pthread_mutex_unlock(&c->llock[threadID]);
}
int get(counter_t *c) {
pthread_mutex_lock(&c->glock);
int val = c->global;
pthread_mutex_unlock(&c->glock);
return val; // 只是近似值!
}并发链表 (Concurrent Linked Lists)
优化锁的范围
基本思路是在插入和查找操作中加锁。为了提高效率和异常安全性(如 malloc 失败),应尽量缩小锁的范围,只包裹临界区。
void List_Insert(list_t *L, int key) {
// 内存分配不需要同步
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return;
}
new->key = key;
// 只对临界区加锁
pthread_mutex_lock(&L->lock);
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
}
int List_Lookup(list_t *L, int key) {
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
rv = 0;
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return rv;
}过手锁 (Hand-over-hand Locking)
- 原理:每个节点一把锁。遍历时,先获取下一个节点的锁,再释放当前节点的锁。
- 缺点:频繁获取/释放锁的开销巨大,通常比单锁方案更慢。
并发队列 (Concurrent Queues)
Michael 和 Scott 设计的并发队列使用了两个锁:
- headLock:保护队头,用于出队 (dequeue)。
- tailLock:保护队尾,用于入队 (enqueue)。
- 假节点 (Dummy Node):初始化时分配一个假节点,分离开头尾操作,使得入队和出队可以并发执行。
typedef struct node_t {
int value;
struct node_t *next;
} node_t;
typedef struct queue_t {
node_t *head;
node_t *tail;
pthread_mutex_t headLock;
pthread_mutex_t tailLock;
} queue_t;
void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
q->head = q->tail = tmp; // 假节点 (Dummy node)
pthread_mutex_init(&q->headLock, NULL);
pthread_mutex_init(&q->tailLock, NULL);
}
void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;
pthread_mutex_lock(&q->tailLock);
q->tail->next = tmp;
q->tail = tmp;
pthread_mutex_unlock(&q->tailLock);
}
int Queue_Dequeue(queue_t *q, int *value) {
pthread_mutex_lock(&q->headLock);
node_t *tmp = q->head;
node_t *newHead = tmp->next;
if (newHead == NULL) {
pthread_mutex_unlock(&q->headLock);
return -1; // 队列为空
}
*value = newHead->value;
q->head = newHead;
pthread_mutex_unlock(&q->headLock);
free(tmp);
return 0;
}并发散列表 (Concurrent Hash Tables)
并发散列表通过分段锁实现高性能:
- 每个散列桶(Bucket,即链表)有一把锁。
- 操作不同桶时互不干扰,支持高并发。
#define BUCKETS (101)
typedef struct hash_t {
list_t lists[BUCKETS]; // list_t 包含一个锁
} hash_t;
void Hash_Init(hash_t *H) {
for (int i = 0; i < BUCKETS; i++) {
List_Init(&H->lists[i]);
}
}
int Hash_Insert(hash_t *H, int key) {
int bucket = key % BUCKETS;
return List_Insert(&H->lists[bucket], key);
}
int Hash_Lookup(hash_t *H, int key) {
int bucket = key % BUCKETS;
return List_Lookup(&H->lists[bucket], key);
}总结与建议
- 避免不成熟的优化 (Knuth 定律):先使用简单的粗粒度锁(大锁)。只有在确认存在性能瓶颈时,才进行细粒度锁或无锁优化。
- 控制流与锁:注意函数返回或异常退出时必须释放锁。
- 并发不一定更快:如果锁的开销超过了并行带来的收益,性能反而会下降。