Skip to content

同步与并发

为什么驱动需要同步

Linux 内核是高度并发的环境,驱动代码可能同时被以下路径访问:

  • 多个 CPU 核心上的进程同时调用驱动接口
  • 中断处理程序与进程上下文并发执行
  • 内核抢占导致同一 CPU 上的任务切换
  • SMP(对称多处理)系统上的真正并行

不加保护的共享数据会导致竞态条件(Race Condition),引发数据损坏、系统崩溃。

原子操作

最轻量的同步原语,适合简单计数器:

c
#include <linux/atomic.h>

atomic_t ref_count = ATOMIC_INIT(0);

atomic_inc(&ref_count);           /* +1 */
atomic_dec(&ref_count);           /* -1 */
int val = atomic_read(&ref_count);/* 读取 */
atomic_set(&ref_count, 5);        /* 设置 */

/* 原子加减并返回结果 */
int new_val = atomic_add_return(3, &ref_count);

/* 比较并交换(CAS) */
int old = atomic_cmpxchg(&ref_count, expected, new_val);

/* 64 位版本 */
atomic64_t counter = ATOMIC64_INIT(0);

自旋锁(Spinlock)

适用于中断上下文持锁时间极短的场景,等待时 CPU 忙等(spin):

c
#include <linux/spinlock.h>

spinlock_t my_lock;
spin_lock_init(&my_lock);

/* 进程上下文 */
spin_lock(&my_lock);
/* 临界区 */
spin_unlock(&my_lock);

/* 中断上下文:禁用本地中断,防止死锁 */
unsigned long flags;
spin_lock_irqsave(&my_lock, flags);
/* 临界区 */
spin_unlock_irqrestore(&my_lock, flags);

/* 仅禁用中断(不保存状态,适合已知中断状态的场景) */
spin_lock_irq(&my_lock);
spin_unlock_irq(&my_lock);

黄金规则:持有自旋锁期间,不能睡眠,不能调用任何可能睡眠的函数(如 kmalloc(GFP_KERNEL)copy_from_user())。

读写自旋锁

允许多个读者并发,写者独占:

c
#include <linux/rwlock.h>

rwlock_t rw_lock;
rwlock_init(&rw_lock);

/* 读者 */
read_lock(&rw_lock);
/* 读操作 */
read_unlock(&rw_lock);

/* 写者 */
write_lock_irqsave(&rw_lock, flags);
/* 写操作 */
write_unlock_irqrestore(&rw_lock, flags);

互斥锁(Mutex)

适用于进程上下文,等待时进程睡眠(不浪费 CPU):

c
#include <linux/mutex.h>

struct mutex my_mutex;
mutex_init(&my_mutex);

/* 加锁(可被信号中断则用 mutex_lock_interruptible) */
mutex_lock(&my_mutex);
/* 临界区(可以睡眠) */
mutex_unlock(&my_mutex);

/* 可被信号中断的加锁 */
if (mutex_lock_interruptible(&my_mutex))
    return -ERESTARTSYS;

/* 尝试加锁(非阻塞) */
if (mutex_trylock(&my_mutex)) {
    /* 成功获取锁 */
    mutex_unlock(&my_mutex);
}

Mutex vs Spinlock 选择:

场景选择
中断上下文Spinlock(mutex 不可用)
持锁时间 < 几十微秒Spinlock
持锁时间较长 / 需要睡眠Mutex
需要读写分离rwlock / rwsem

信号量(Semaphore)

计数型同步原语,允许多个并发访问者:

c
#include <linux/semaphore.h>

struct semaphore sem;
sema_init(&sem, 1);   /* 初始值 1 = 二值信号量(等同 mutex) */

down(&sem);           /* P 操作,计数 -1,为 0 时阻塞 */
/* 临界区 */
up(&sem);             /* V 操作,计数 +1 */

/* 可中断版本 */
if (down_interruptible(&sem))
    return -ERESTARTSYS;

现代内核推荐优先使用 mutex,信号量主要用于需要计数语义的场景。

RCU(Read-Copy-Update)

适合读多写少的场景,读者完全无锁,性能极高:

c
#include <linux/rcupdate.h>

/* 读者 */
rcu_read_lock();
struct my_data *p = rcu_dereference(global_ptr);
/* 使用 p,不能睡眠 */
rcu_read_unlock();

/* 写者:分配新对象,替换指针,等待旧读者完成 */
struct my_data *new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
/* 填充 new_data */
struct my_data *old_data = rcu_dereference_protected(global_ptr, lockdep_is_held(&my_lock));
rcu_assign_pointer(global_ptr, new_data);
synchronize_rcu();    /* 等待所有已有读者退出 RCU 临界区 */
kfree(old_data);

RCU 广泛用于内核网络路由表、进程列表等高频读取的数据结构。

完成量(Completion)

用于等待某个事件完成,比信号量语义更清晰:

c
#include <linux/completion.h>

struct completion data_ready;
init_completion(&data_ready);

/* 等待方(进程上下文) */
wait_for_completion(&data_ready);
/* 或带超时 */
if (!wait_for_completion_timeout(&data_ready, msecs_to_jiffies(1000)))
    return -ETIMEDOUT;

/* 触发方(可在中断中调用) */
complete(&data_ready);
complete_all(&data_ready);   /* 唤醒所有等待者 */

典型用法:驱动初始化时等待硬件就绪,中断处理函数中 complete()

等待队列(Wait Queue)

更灵活的睡眠等待机制,支持条件等待:

c
#include <linux/wait.h>

DECLARE_WAIT_QUEUE_HEAD(my_wq);
/* 或动态初始化 */
wait_queue_head_t my_wq;
init_waitqueue_head(&my_wq);

/* 等待条件成立(进程上下文) */
wait_event(my_wq, condition);
wait_event_interruptible(my_wq, condition);
wait_event_timeout(my_wq, condition, timeout);

/* 唤醒 */
wake_up(&my_wq);
wake_up_interruptible(&my_wq);

字符设备驱动的 read() 通常这样实现:

c
static ssize_t my_read(struct file *f, char __user *buf, size_t len, loff_t *off)
{
    if (wait_event_interruptible(my_wq, data_available))
        return -ERESTARTSYS;
    /* 拷贝数据到用户空间 */
    return copy_to_user(buf, kbuf, len) ? -EFAULT : len;
}

每 CPU 变量

消除 CPU 间竞争的终极方案——每个 CPU 有独立副本:

c
#include <linux/percpu.h>

DEFINE_PER_CPU(int, my_counter);

/* 访问(需禁止抢占) */
int val = get_cpu_var(my_counter);
get_cpu_var(my_counter)++;
put_cpu_var(my_counter);

/* 或使用 this_cpu 系列(已在禁止抢占的上下文中) */
this_cpu_inc(my_counter);

lockdep:锁依赖检测

内核内置的死锁检测工具(CONFIG_LOCKDEP=y),运行时检测:

  • 锁的获取顺序是否一致(防止 ABBA 死锁)
  • 是否在中断上下文中使用了不安全的锁
  • 是否重复加锁
bash
# 死锁时 dmesg 会打印完整的锁依赖链
dmesg | grep -A 50 "possible circular locking"

同步原语选择速查

需要在中断上下文使用?
    ├── 是 → spinlock_irqsave / atomic
    └── 否 → 持锁时间短且不睡眠?
                ├── 是 → spinlock
                └── 否 → 读多写少?
                            ├── 是 → RCU / rwsem
                            └── 否 → mutex

褚成志的笔记