1.glibc读写锁的原理
加读锁: int __pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); (nptl/pthread_rwlock_rdlock.c) 加写锁: int __pthread_rwlock_wrlock(pthread_rwlock_t* rwlock) (nptl/pthread_rwlock_wrlock.c) 解锁: int __pthread_rwlock_unlock(pthread_rwlock_t* rwlock) (nptl/pthread_rwlock_wrlock.c)
默认读写锁是读优先的,如果有写者在等待锁,不会阻塞读者。 如果配置为写优先,如果有写者在等待锁,所有的后续的读者加锁都会阻塞。 但是当占有读者锁的读者数为0时,当有写者在等待时,会优先唤醒写者。 当配置为读者优先时,会出现写者饿死的情况 当配置为写者优先时,会出现读者饿死的情况 读写锁适用于读多写少的场景
1)加读锁源码
int __pthread_rwlock_rdlock(pthread_rwlock_t* rwlock) { ...//进入临界区 lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); while (1) { if (rwlock->__data.__writer == 0 && (!rwlock->__data.__nr_writers_queued || PTHREAD_RWLOCK_PREFER_READER_P (rwlock))) {//当写者数为0,或者有写者在等待锁,但是配置的是读优先,则可以抢占,否则阻塞读者 if (__builtin_expect (++rwlock->__data.__nr_readers == 0, 0)) {//读者数+1,当超过上限溢出时,则返回EAGAIN --rwlock->__data.__nr_readers; result = EAGAIN; break; } else LIBC_PROBE (rdlock_acquire_read, 1, rwlock); break; } .... if (__buildtin_expect (++rwlock->__data.__nr_readers_queued == 0, 0)) //排队读者数+1 {//读者等待数+1,当溢出时,则返回EAGAIN --rwlock->__data.__nr_readers_queued; result = EAGAIN; break; } .... lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); lll_futex_wait(&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.shared); //等待唤醒 lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); --rwlock->__data.__nr_readers_queued; //排队读者数-1 }//end while lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); }
2)加写锁源码
int __pthread_rwlock_wrlock (pthread_rwlock_t* rwlock) { ..... lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); while (1) { if (rwlock->__data.__writer == 0 && rwlock->__data.__nr_readers == 0) {//写者数为0,读者数为0,加写锁成功 LIBC_PROBE (wrlock_acquire_write, 1, rwlock); break; } ..... if (++rwlock->__data.__nr_writers_queued == 0) {//写者等待数+1,当溢出时,则返回EAGAIN --rwlock->__data.__nr_writers_queued; result = EAGAIN; break; } lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); lll_futex_wait(&rwlock->__data.__readers_wakeup, waitval, rwlock->__data.shared); //等待唤醒 lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); --rwlock->__data.__nr_writers_queued; //排队写者数-1 } lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); }
3)解锁源码
int __pthread_rwlock_unlock (pthread_rwlock_t* rwlock) { ...... lll_lock (rwlock->__data.__lock, rwlock->__data.__shared); if (rwlock->__data.__writer) //写者数不为0 rwlock->__data.__writer = 0; else //读者数不为0 --rwlock->__data.__nr_readers; if (rwlock->__data.__nr_readers == 0) //没有读者 { if (rwlock->__data.__nr_writers_queued) //有写者在等待 { ++rwlock->__data.__writer_wakeup; //唤醒写者 lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); lll_futex_wake (&rwlock->__data.__writer_wakeup, 1, rwlock->__data.__shared); return 0; } else if (rwlock->__data.__nr_readers_queued) //有读者等待 { ++rwlock->__data.__readers_wakeup; //唤醒读者 lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); lll_futex_wake (&rwlock->__data.__readers_wakeup, INT_MAX, rwlock->__data.__shared); return 0; } } lll_unlock (rwlock->__data.__lock, rwlock->__data.__shared); }
2.内核读写自旋锁
linux内核的读写自旋锁也是读优先的。 以2.6.18内核的实现为例
1)加读锁: int __lockfunc generic__raw_read_trylock(raw_rwlock_t* lock) |—>__raw_read_lock(lock) |—>__build_read_lock(rw, "__read_lock_failed"); |—>__build_read_lock_const(rw, "__read_lock_failed") 当lock初始化时,初始化为RW_LOCK_BIAS(0×01000000),所以内核读写锁最多支持001000000个读者) __build_read_lock_const实现:
#define __build_read_lock_const(rw, helper) \ asm volatile(LOCK_PREFIX " subl $1,%0\n\t" \ //锁总线,rw-1 "jns 1f\n" \ 如果非负,加读锁成功,跳转到1(因为加写锁时,会直接rw - RW_LOCK_BIAS,所以再减1会变为负数) "pushl %%eax\n\t" \ "leal %0,%%eax\n\t" \ "call " helper "\n\t" \ 如果为负,加读锁失败,跳转到__read_lock_failed "popl %%eax\n\t" \ "1:\n" \ :"+m" (*(volatile int *)rw) : : "memory")
__read_lock_failed实现
asm(".section .sched.text\n"".align 4\n"".globl __read_lock_failed\n""__read_lock_failed:\n\t" LOCK_PREFIX "incl (%eax)\n""1: rep; nop\n\t" "cmpl $1,(%eax)\n\t" //循环等待,直到rw变为0x01000000 "js 1b\n\t" LOCK_PREFIX "decl (%eax)\n\t" "js __read_lock_failed\n\t" "ret");
2)解读锁:_read_unlock()|—>__raw_read_unlock()
static inline void __raw_read_unlock(raw_rwlock_t *rw){ asm volatile(LOCK_PREFIX "incl %0" :"+m" (rw->lock) : : "memory");}
3)加写锁:__write_trylock(rwlock_t* lock)|—>__raw_write_lock(lock)
|—>__build_write_lock(rw, "__write_lock_failed")
|—>__build_write_lock_const(rw, "__writer_lock_failed");
__build_write_lock_const实现
#define __build_write_lock_const(rw, helper) \ asm volatile(LOCK_PREFIX " subl $" RW_LOCK_BIAS_STR ",%0\n\t" \ //rw - RW_LOCK_BIAS "jz 1f\n" \ //等于0,加锁成功 "pushl %%eax\n\t" \ "leal %0,%%eax\n\t" \ "call " helper "\n\t" \ 不成功,跳转__writer_lock_failed "popl %%eax\n\t" \ "1:\n" \ :"+m" (*(volatile int *)rw) : : "memory")
__write_lock_failed实现
asm(".section .sched.text\n"".align 4\n"".globl __write_lock_failed\n""__write_lock_failed:\n\t" LOCK_PREFIX "addl $" RW_LOCK_BIAS_STR ",(%eax)\n""1: rep; nop\n\t" "cmpl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" "jne 1b\n\t" LOCK_PREFIX "subl $" RW_LOCK_BIAS_STR ",(%eax)\n\t" //直到rw-RW_LOCK_BIAS_STR=0,加锁成功,否则循环行等待 "jnz __write_lock_failed\n\t" "ret");
4)解写锁:
_write_unlock()
|–>__raw_write_unlock(lock)
static inline void __raw_write_unlock(raw_rwlock_t *rw){ asm volatile(LOCK_PREFIX "addl $" RW_LOCK_BIAS_STR ", %0" : "+m" (rw->lock) : : "memory"); //rw + 1}
3.参考文章:1)linux下写优先的读写锁的设计:http://www.ibm.com/developerworks/cn/linux/l-rwlock_writing/