- 浏览: 16021262 次
- 性别:
- 来自: 济南
最新评论
-
wu1236:
ef0793cd94337324b6fefc4c9474af5 ...
Android ApiDemos示例解析(87):Media->MediaPlayer -
77219634:
0127bf2236bee4dd1f632ce430f1af1 ...
本博客文章都为转载,没有任何版权! -
77219634:
0127bf2236bee4dd1f632ce430f1af1 ...
VPLEX - EMC的RAC -
77219634:
0127bf2236bee4dd1f632ce430f1af1 ...
qTip2 Show -
77219634:
0127bf2236bee4dd1f632ce430f1af1 ...
SecureCRT中文乱码、复制粘贴乱码解决办法(修改版)
LINUX内核信号量设计与实现 |
taoistf just for fun |
taoistf@gmail.com 2008/08/18 |
一 LINUX内核信号量简介 |
为了同步对内核共享资源的访问,内核提供了 down函数和 up函数用于获取和释放资源。down和up所保护的访问资源的内核代码区域,就构成一个临界区。在等待获取资源进入临界区的过程中,代表进程运行的内核控制路径可以睡眠。 |
我们从 LINUX内核信号量最直观的设计/实现出发,通过一步步改进,揭示在 x86平台上完整的信号量设计/实现,然后探讨在不同平台上通用的信号量设计/实现。 |
二 LINUX内核信号量的初步设计与实现 |
1 数据结构 |
我们首先分析信号量 semphore应具备的数据结构。它需要一个计数 count,表示能进入临界区的进程个数。conut一般初始化为 1,表示信号量是互斥信号量,一次只允许一个进程进入临界区。它也能被初始化为其他正值,表示可有多个进程同时进入临界区。 |
当进程不能进入临界区时,它必须在信号量上睡眠,因此需要一个表示 “等待队列头 ”的字段 wait。在 SMP中,等待队列需要自旋锁来保护,因此 wait结构中应含有自旋锁 lock,和指向等待队列链表的指针 task_list。而插入等待队列的 “等待元素 ”,其字段中应含有指向进程结构的指针 task,及能够链入等待队列的指针 task_list。 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
具体表示如下: |
struct wait_queue_t{ |
struct task_struct * task; |
struct list_head task_list; }; /*等待元素结构 */ |
struct wait_queue_head_t{ |
spinlock_t lock; |
struct list_head task_list; }; /*等待队列头结构 */ |
struct semphore{ |
int count; |
wait_queue_head_t wait; } /*信号量结构 */ |
2 算法 |
当进程需要获取信号量时,它调用down函数进入临界区。 down将semphore的count字段减1,若count非负,则可进入临界区;否则,加入睡眠队列。当进程离开临界区时,它调用up函数。 up将semphore的count字段加 1,若count非正,表示有进程睡眠,则将进程从wait等待队列中移走并唤醒它。 |
这里有一个问题:当up唤醒等待队列中睡眠进程时,是唤醒所有等待进程,还是只唤醒一个?如果唤醒所有进程,它们醒来后,就会去竞争一个获得信号量的机会,竞争失败的进程只能再度睡眠,这就使得系统有许多无谓的 schedule切换,降低了系统性能。所以, up只唤醒一个进程。为了保持 FIFO的唤醒顺序,down会将新进程以独占方式(表示唤醒时只唤醒一个)加在等待队列尾部,up则去唤醒等待队列头部的第一个独占进程。 |
3 实现 |
由于信号量的实现效率与系统性能息息相关,为了达到高效的信号量实现,不同的 cpu系统根据各自特点,提供了不同版本。我们以 x86平台为例,介绍信号量的实现与优化。 |
我们要注意一点:由于 x86平台对原子指令有很好的支持,而并发的多个进程可能 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
会同时修改或读取 count值,所以对 count的修改与读取都要使用原子指令。为了达到高效, down函数与 up函数都使用汇编语言实现。 down函数特别要保证:进程能以最快的速度,判断出能否进入临界区。这样才能保证高效的系统性能。 |
按上述算法,其实现过程是很直接的。设sem是类型为 struct semphore的信号量地址。 down: |
movl $sem, %ecx lock;decl (%ecx); jns 1f /* 如非负,则往前跳到标号 1处进入临界区 */ /*调用者堆栈保存约定 */ pushl %eax |
pushl %edx pushl %ecx call __down /*如为负,调用 __down加入睡眠队列 */ popl %ecx popl %edx popl %eax |
1: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; /*申明“等待元素结构 ”,并将结构中的进程指针初始化为 tsk */ DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; /*将wait表示的“等待元素 ”以独占方式加在 sem->wait表示的等待队列尾部 */ add_wait_queue_exclusive(&sem->wait, &wait); schedule(); |
} |
up: movl $sem, %ecx lock;incl (%ecx); jg1f /* 如为正,则往前跳到标号 1处执行 up后第一条指令 */ /*调用者堆栈保存约定 */ pushl %eax |
pushl %edx pushl %ecx call __up /*如为非正,调用 __up唤醒睡眠进程 */ popl %ecx popl %edx popl %eax |
1: |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
void __up(struct semaphore * sem) |
{ |
/*在sem->wait表示的等待队列中,获取其第一个 “等待元素 ”wait */ |
struct wait_queue_t *wait = list_entry(&sem->wait.task_list,struct wait_queue_t , |
task_list); |
remove_wait_queue(&sem->wait, &wait); /* 在等待队列中删除 “等待元素 ” */ |
wake_up(sem->wait); /*唤醒等待队列中的第一个 “等待元素 ”所表示的独占进程 */ } |
三一步步改进信号量的设计与实现 |
前面提到过,信号量的实现效率与系统性能息息相关。下面,我们围绕如何提高性能,对上述信号量的设计/实现进行改进。 |
1 分析上述实现中的性能问题 |
我们设想这样的情景:当 wait等待队列不空时, up出现,它唤醒 wait队列的第一个进程prev。与此同时,有一个进程 next(通过内核控制路径)调用 down试图进入临界区。但prev原来调用 down试图进入临界区时对 count减1,在未能进入临界区而睡眠时并未恢复,直到prev被唤醒并调度执行进入临界区,而且执行完临界区代码,再调用up时才会对count增1。所以,进程 next只可能在 prev之后,进入临界区。也就是说:进程完全是按照发出down调用的次序,以 FIFO方式进入临界区,这非常公平。 |
但我们要注意一点:当 up出现时,说明可有一个进程进入临界区。如果 prev的优先级较低,它被唤醒后,很可能长时间都不能获得调度,在这段时间内,尽管允许进程进入临界区,而且进程next又试图进入临界区,但任何试图进入临界区的进程发只能睡眠! |
我们从系统性能的角度来考察是否允许next在prev之前进入临界区。对内核信号量的争用,影响系统性能的关键就是进程上下文切换schedule的代价。如果不允许 next在prev之前进入临界区,则 next必须被shedule切换掉。而允许 next在prev之前进入临界区,则 prev“至多 ”再次被 shedule。 |
而在以下两种情形下, prev无须被再次 shedule切换掉。 a在单 cpu上, up唤醒prev后,由于进程 next在prev之前获得调度运行, prev无须再次进行shedule切换。 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
b在多 cpu上,如果 next在prev获得调度之前,执行完临界区代码,则也可减少一次对prev的shedule切换。 |
所以,如果允许进程 next在up后,但在 prev获得调度前进入临界区,能减少系统进行 schedule切换的次数,则系统的性能就会获得提高,系统会更有效率。其弊端是:后发出down调用的进程可能先获取信号量,一定程度上违背了公平原则。 |
如果允许next提前进入临界区,则原来的代码必须进行修改:当进程不能获取信号量而睡眠时,count要增1,抵消调用 down时的无条件减1。当进程睡眠醒来时,相当于进程调用 down之前的状态:可能与新进入的调用 down的进程竞争信号量。它再次尝试进入临界区的方式也应该与 down一致: count先减1,再判断能否进入临界区。 |
因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); |
for (;;) { /* 被唤醒后,还需再次判断能否进入临界区 */ atomic_inc(&sem->count); /* 睡眠前恢复 count */ tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); |
schedule(); |
tsk->state = TASK_UNINTERRUPTIBLE; atomic_dec(&sem->count); /* 尝试进入临界区前,先减 1 */ if(atomic_read((&sem->count) >= 0)/* 重新判断能否进入临界区 */ |
break; } tsk->state = TASK_RUNNING; |
} |
在上述实现中,进程不一定按 FIFO进入临界区。而在进程尝试进入临界区时,总是无条件先减1,直到不能进入临界区时,才恢复 count值。这种对 count“优先减 1”的处理方法,使得在一些局部的时段, count的值会比“能进入临界区的”进程个数要小。这种实现上的特点,使得它能确保同时进入临界区的进程不会超过 count初值所限定的数目,但可能导致本来能进入临界区的进程无法进入临界区,特别是导致唤醒事件的丢失或延迟,这是我们在下面要尤其关注的。 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
2 引入全局 spinlock保护对 count的修改与读取 |
分析一下,上述实现其实存在错误。考察下述情景:假设进程 A已进入临界区,进程B在等待队列。 A调用 up增加 sem->count,唤醒进程 B,同时进程 C也调用 __down试图进入临界区。假设事件发生的时序如下(表示形式:指令[所属进程顺序 ]): lock;decl(%ecx)[C1] lock;incl(%ecx)[A2] atomic_dec(&sem->count)[B3] if(atomic_read((&sem->count) >= 0)[B4] atomic_inc(&sem->count)[B5] atomic_inc(&sem->count)[C6]注:上述%ecx的值就是 &sem->count |
显然,如果按照这个时序,不管进程 B还是C都会睡眠。只有下一个 up才能将它们唤醒,这就相当于丢失了一次唤醒事件。出现这个问题的原因是:进程 B对sem->count进行修改与判断时,进程 C对sem->count的修改与判断也同时进行。要解决这个问题,必须把对sem->count的修改与判断放在一个自旋锁保护的“原子块”中,使得在同一个原子块中,对sem->count的判断与修改是一致的,而后一个原子块只能看到一致结果。我们引入专门用于信号量的全局自旋锁 semaphore_lock,因此 __down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); spin_lock(&semaphore_lock); /* 原子化 sem->count的修改与判断 */ for (;;) { |
atomic_inc(&sem->count); spin_unlock(&semaphore_lock); |
tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); schedule(); |
tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock);/* 原子化 sem->count的修改与判断 */ atomic_dec(&sem->count); if(atomic_read((&sem->count) >= 0) /* 重新判断能否进入临界区 */ |
break; } tsk->state = TASK_RUNNING; |
} |
但上述实现仍有问题。我们还是沿用同样的情景,假设进程B比进程 C先获得 semaphore_lock,则原来的时序也依然有效: |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
lock;decl(%ecx)[C1] lock;incl(%ecx)[A2] atomic_dec(&sem->count)[B3] if(atomic_read((&sem->count) >= 0)[B4] atomic_inc(&sem->count)[B5] atomic_inc(&sem->count)[C6] |
不管进程B还是 C仍旧会睡眠。不过,现在的原因是非常明显的:进程 C获得 semaphore_lock时,sem->count已经发生变化,但进程 C并未读取判断sem->count,而是直接睡眠。只要将对 sem->count的判断放在 schedule之前,进程 C就可以进入临界区。这种改变使得任何情形下执行的原子块都含有对sem->count的读取判断,因而就能“看到“前一个原子块对sem->count的一致修改。因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); spin_lock(&semaphore_lock); /* 原子化 sem->count的修改与判断 */ for (;;) { |
if(atomic_read((&sem->count) >= 0) /* 将判断放在 schedule之前 */ |
break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock); |
tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); schedule(); |
tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock);/* 原子化 sem->count的修改与判断 */ atomic_dec(&sem->count); |
} tsk->state = TASK_RUNNING; } |
在下面的讨论中,我们还会将自旋锁所保护的代码区域称作一个“原子块”。 |
为了对上述实现有一个更好的直观理解,我们来做一个小测试: |
假设count的初值为 1。进程 A已在临界区,进程 B在等待队列,C,D,E同时调用 __down试图进入临界区(即同时调用 lock;decl(%ecx))。然后, C发现不能进入临界区,首先获取自旋锁,此时进程 A调用up唤醒进程 B,它们按B,D,E的顺序获取自旋锁。那么,B,C,D,E中的哪一个进程将进入临界区? |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
3 无竞争地进入睡眠 |
上述实现仍在错误。考察下述情景:假设进程A已进入临界区,进程 B调用 __down试图进入临界区,同时, A调用 up增加 sem->count,唤醒睡眠进程。假设事件发生的时序如下: |
lock;decl((%ecx) [B1] if(atomic_read((&sem->count) >= 0)[B2] lock;incl (%ecx)[A3] wake_up(sem->wait)[A4] add_wait_queue_exclusive(&sem->wait, &wait)[B5] 注:上述%ecx的值就是 &sem->count |
结果是:进程 B无法进入临界区而睡眠,但进程 A却没有唤醒它!也是相当于丢失了一次唤醒事件。引起这个问题的原因,是因为进程 B先判断 sem->count,如果为负,然后加入睡眠队列。而进程 A的唤醒事件在进程 B“判断 sem->count是否为负 ”和“加入睡眠队列”之间到来。如果将原代码改成:先加入等待队列,然后再判断 sem->count,则不会出现上述竞态条件,问题迎刃而解。 |
在LDD“中断处理”一章“无竞争地睡眠”一节也讲到了类似内容,将其称作“半睡眠状态时的检查”。在内核中,引入wait_event,wait_event_interruptible,就是为了解决由sleep_on,sleep_on_interrupitle引发的同样的唤醒丢失问题,而 down进入睡眠的方式与wait_event一致。 |
因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; |
spin_lock(&semaphore_lock); |
for (;;) { /* 将加入等待队列操作放在判断之前 */ add_wait_queue_exclusive(&sem->wait, &wait); if (atomic_read((&sem->count) >= 0) |
break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock); atomic_dec(&sem->count); |
} spin_unlock(&semaphore_lock); |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
remove_wait_queue(&sem->wait, &wait); /* 如能进入临界区,从等待队列中删除 */ tsk->state = TASK_RUNNING; } |
考察这个实现,需要注意一点,add_wait_queue_exclusive函数需要获取 sem->wait.lock自旋锁,它本身又在 semaphore_lock自旋锁保护之内。如果获取 sem->wait.lock自旋锁需要较长时间,就会使得由 semaphore_lock自旋锁所保护临界区代码,执行时间过长,其他等待获得自旋锁的进程也会等待很长时间。另一方面,获取不同自旋锁的方式一般是:获取一把锁,释放后再获取另一把锁,以避免发生死锁。所以,需要在上述实现中将add_wait_queue_exclusive函数移到 spin_lock(&semaphore_lock)之前,也即for循环之外。 |
此时,会出现另一个问题:如果进程不能进入临界区而睡眠,当它被唤醒时,同时被__up从等待队列中删除,但由于add_wait_queue_exclusive在for循环之外,它不会重新加入等待队列!此时,需要我们改进__up:__up唤醒进程时,并不把它从等待队列中删除。这种改进还有一个额外的功效,能避免进程在尝试进入临界区的过程中,反复加入等待队列和从等待队列中删除,从而提高了性能。(但也会引发新的问题,我们在下一个步骤中就会看到) |
因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; /*将加入等待队列操作放在获取 semaphore_lock之前 */ add_wait_queue_exclusive(&sem->wait, &wait); spin_lock(&semaphore_lock); for (;;) { |
if (atomic_read((&sem->count) >= 0) |
break; atomic_inc(&sem->count); spin_unlock(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock(&semaphore_lock); atomic_dec(&sem->count); |
} spin_unlock(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; |
} |
而__up将被改写为: |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
void __up(struct semaphore * sem) { |
wake_up(sem->wait); /*去掉从等待队列中删除进程的操作 */ } |
4进入临界区前,弥补可能丢失的唤醒事件 |
上述实现仍在问题。在上述实现中,进程一开始加入等待队列,被唤醒时仍然在等待队列里。直到获取sem,进入临界区之前,才从等待队列里删除。前面提到过, sem->count的初值一般为 1,表示sem为互斥信号量。但 sem的设计允许count的初值为任何正值,表示可有 count个进程同时进入临界区。设想这样的情形: count的初值为2。进程A,B都在临界区里,进程 C,D在睡眠队列里等待进入临界区,进程 C在睡眠队列头部,进程A,B几乎同时调用up唤醒睡眠进程。 |
假设事件发生的时序如下: |
wake_up(sem->wait)[A1] wake_up(sem->wait)[B2] remove_wait_queue(&sem->wait, &wait)[C3] |
这样,虽然有两次唤醒操作,但针对的都是进程 C,进程 D依然处于睡眠状态,相当于又丢失了一次唤醒事件。只有在进程C被调度执行并进入临界区,执行完临界区代码,然后调用up,进程 D才能被唤醒。所以,严格来说,唤醒事件并未丢失,而是被严重延迟。引起这个问题的原因是:一个唤醒操作只能唤醒睡眠队列第一个进程,而且并未将其从等待队列中删除,使其仍然处在睡眠队列头,下一个唤醒操作可能又针对同一个进程。为了解决这个问题,在进程进入临界区之前,需再次调用 wake_up(sem->wait),以弥补可能延迟的唤醒事件。 |
需要额外注意的一点是,在linux中,还提供了一种 down操作的非阻塞版本 down_trylock:当不能获取信号量时,进程不睡眠而直接退出,它一般用在不能睡眠的中断服务例程和可延迟函数中。因此在上述实现中,如果在spin_lock保护的原子块中发生中断,而中断中又调用 down_trylock,down_trylock也会调用 spin_lock,就会导致死锁。因此,需将上述实现中涉及到spin_lock的地方,换成其带中断控制的版本。 |
因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ |
struct task_struct *tsk = current; |
DECLARE_WAITQUEUE(wait, tsk); |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); |
spin_lock_irq(&semaphore_lock); /* 加上 “禁止中断 ”功能 */ for (;;) { if (atomic_read(&sem->count) >= 0) |
break; atomic_inc(&sem->count); spin_unlock_irq(&semaphore_lock); /* 加上“允许中断 ”功能 */ schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock); /*加上“禁止中断 ”功能 */ atomic_dec(&sem->count); |
} spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait); /*弥补可能丢失的唤醒事件 */ |
} |
从直观上看,当存在“可能丢失的唤醒事件”时, sem->count的值应该大于0,那么,在__down最后对 wake_up函数的调用能否改成条件判断,即: if (atomic_read(&sem->count) > 0) |
wake_up(&sem->wait)当条件判断不满足时,就能减少对wake_up的调用,从而优化了程序性能。 |
仔细一想,其实这是有问题的!设想这样的情形: count的初值为2。进程 A正准备进入临界区,但尚未从等待队列中删除,依然位于等待队列头部。进程 B已在临界区里,进程C在等待队列中,位于进程 A之后。当进程 A执行到 __down的尾部 spin_unlock_irq(&semaphore_lock)与 remove_wait_queue(&sem->wait, &wait)之间时候,进程B调用up,与此同时,有一个进程 D调用down试图进入临界区。假设事件发生时序如下: |
spin_unlock_irq(&semaphore_lock)[A1] lock;decl (%ecx) [D2] spin_lock_irq(&semaphore_lock) [D3] if atomic_read(&sem->count)[D4] lock;incl (%ecx) [B5] wake_up(&sem->wait)[B6] remove_wait_queue(&sem->wait, &wait)[A7] if (atomic_read(&sem->count) > 0)[A8] atomic_inc(&sem->count)[D9] |
按照这个时序,虽然进程 B调用了 up,但它不能唤醒进程 C,也不能使进程 D进入临界区,而 [A8]处的判断,使得并不重新调用 wake_up,因此,唤醒事件仍然丢失了!问题的根源在于: [A8]处的判断不在semaphore_lock的保护之下,这种判断是不可靠的,因而是无意义的。 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
5 引入 sleepers,尽量减少原子指令 |
在上述实现中,用到了很多原子指令。而原子指令是很耗时的指令,并且在几乎所有系统中,它相当于memory barrier,使得编译器和 cpu都不能改变它前后的读写内存指令的顺序,因此也无法进行优化。因此,从程序性能的角度来讲,应该尽量避免使用原子指令。我们回忆一下:为避免 “只有先调用 down的进程才能先进入临界区 ”导致的系统性能损失,当进程无法进入临界区睡眠和被唤醒时,引入原子指令对 count计数进行直接修改,使新调用 down的进程有可能先进入临界区。 |
在保证同样功能的前提下,要减少原子指令,关键是:当进程无法进入临界区而睡眠时,及当进程被唤醒时,尽量不要直接修改count的值,而且尽可能合并 “修改与读取判断count的值 ”为一个原子指令。所以,我们可考虑一个两阶段修改 count的方案:当进程无法进入临界区而睡眠时,不修改 count,而是由下一个调用 down的进程在count中将此 1加上。 |
具体实现中,我们在 semphore结构中引入新的字段 sleepers,当进程能够进入临界区时,设置其值为0;当进程需要睡眠时,设置其值为 1。设想前一个进程已经设置 sleepers,则当本进程在 __down中读取count时(不管是第一次进入 for循环,还是从睡眠中醒来),需将sleepers加上,它表示:如果前一个进程已经进入临界区,则count不变;如果前一个进程睡眠,则需在 count中将 1补上。如果本进程不能进入临界区而睡眠,则当本进程醒来时,它仍相当于进程调用 down之前的状态,与原来的处理一样:使 count先减 1,再判断能否进入临界区。 |
我们可以考虑将__down中for循环改成如下代码: |
spin_lock_irq(&semaphore_lock); /* 加上“禁止中断 ”功能 */ |
for (;;) { |
int sleepers = sem->sleepers; |
if (!atomic_add_negative(sleepers, &sem->count)) { |
sem->sleepers = 0; /* 本进程能进入临界区,后一个进程不用增加 count */ |
break; |
} |
sem->sleepers = 1; /*本进程不能进入临界区,后一个进程需要增加 count */ |
spin_unlock_irq(&semaphore_lock); |
schedule(); |
tsk->state = TASK_UNINTERRUPTIBLE; |
spin_lock_irq(&semaphore_lock); |
atomic_dec(&sem->count); /* 尝试进入临界区前,先减 1 */ } |
我们在这里要注意一点:对函数atomic_add_negative的调用与对sleepers的赋值 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
(sem->sleepers = 0 or 1)一定要放在 semaphore_lock自旋锁的保护之下,这与步骤 2中要将atomic_read与atomic_inc放在自旋锁内是一个道理,否则会引发相似的并发问题。 |
接下来思考:能否将atomic_dec(&sem->count)的功能,考虑由 atomic_add_negative(sleepers, &sem->count)来代替,这样就又能减少一条原子指令。我们注意到一点:进程只有在睡眠醒来后才需要对count减1,而在第一次进入 for循环时,并不需要减 1。因此,我们考虑将 atomic_add_negative函数中的 sleepers参数替换为sleepers-1,这样,就使得 count无条件减 1。但在进入 for循环时,使 sleepers增1,以抵消原子指令无条件减1的影响。 |
因此__down将被改写为: |
void __down(struct semaphore * sem) |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); |
spin_lock_irq(&semaphore_lock); sem->sleepers++; /* 抵消原子指令中无条件减 1的影响 */ for (;;) { |
int sleepers = sem->sleepers; /* 相当于先赋值: count = count + sleepers, 然后使count无条件减 1 */ if (!atomic_add_negative(sleepers - 1, &sem->count)) { |
sem->sleepers = 0; |
break; } sem->sleepers = 1; spin_unlock_irq(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock); |
} spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait); |
}以上实现对应linux2.4版本信号量实现。 |
性能分析:对比上一个实现,本实现减少了两条原子指令,但在进程不能进入临界区睡眠时,并没有及时减1,而是靠下一个试图进入临界区的进程将 1补上。这样也会引发一些性能问题,考虑这样的情形:当前一个进程睡眠后,up出现,唤醒睡眠 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
队列头的进程A,在进程 A被调度执行之前,如果有新进程 B调用 down试图进入临界区,此时,在上一个实现中,进程 B通过 down的第一条指令,就会马上进入临界区。而在本实现中,进程 B需要进入 __down的for循环,通过原子指令 atomic_add_negative的判断后,才能进入临界区,进程B就会多执行一些指令。但要注意的是: up出现后,进程B只有在特定时间段内出现,才会出现上述问题。但由于进程调用 down试图进入临界区的时机完全是随机的,所以进程B在特定时间段内出现的概率几乎为 0,而原子指令的减少所带来的程序性能优化,则是完全确定的。因此,整体来看,上述实现仍然提高了程序性能。 |
6 去掉全局自旋锁 |
在上述实现中,我们用来保护原子块的自旋锁是一个全局自旋锁。也就是说,不同的信号量使用的是完全相同的一个锁,这会使得不同的信号量之间也会存在竞争。如果希望每个信号量都使用一个特定的自旋锁,最简单的方法是在信号量结构中增加一个字段lock作为自旋锁,则原实现中所有的 semaphore_lock都可替换成sem.lock。 |
假设原实现中所有的semaphore_lock都已替换成 sem.lock。我们注意到,在 add_wait_queue_exclusive和remove_wait_queue函数中,因为要对等待队列进行修改操作,必须先获取sem->wait结构中用来保护等待队列的自旋锁 lock,修改完睡眠队列,然后释放自旋锁;在 wake_up(&sem->wait)函数中要扫描等待队列,也必须先获取wait.lcok自旋锁。所以,在实现中两个自旋锁的使用是息息相关的:总是先释放一个自旋锁,然后马上去获取另外一个自旋锁。还要注意到一点:对这两个自旋锁的使用只存在于__down与__up之中,没有其他内核函数会使用它们。(当然, __down_interuptible也会使用这两个自旋锁,但它本质上与 _down在并发处理方式上是完全一致的,只要把__down考虑清楚了,它自然就没有问题了),因此,对这两个自旋锁的竞争只存在于__down与__down,__down与__up,__up与__up之间。 |
因此,我们有一个直观的想法:能否在 __down中将这两个自旋锁合并?使其所保护的原子指令块合二为一。因为wait.lock是与特定信号量相关的局部自旋锁,如果合并成wait.lock,还能避免引入 sem.lock而增加信号量空间。我们先分析一下,如果合并成wait.lock,上述实现中的代码应如何变化?先将 sem.lock简单地替换为 wait.lock。于是在__down中就会出现刚释放 wait.lock,紧接着又去获取 wait.lock的情形,这是一种无谓的锁争用,需要我们引入 add_wait_queue_exclusive,remove_wait_queue和wake_up的locked版本:他们都假设相应函数在调用之前,已经获取自旋锁,因此它们本身都不带有获取自旋锁的操作。 |
__down可被改写为: |
void __down(struct semaphore * sem) |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
{ struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&sem->wait.lcok); |
/*不带获取等待队列自旋锁操作 */ add_wait_queue_exclusive_locked(&sem->wait, &wait); sem->sleepers++; for (;;) { |
int sleepers = sem->sleepers; |
if (!atomic_add_negative(sleepers - 1, &sem->count)) { sem->sleepers = 0; break; |
} sem->sleepers = 1; spin_unlock_irq(&sem->wait.lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&sem->wait.lock); |
} /*不带获取等待队列自旋锁操作 */ remove_wait_queue_locked(&sem->wait, &wait); wake_up_locked(&sem->wait); /* 不带获取等待队列自旋锁操作 */ spin_unlock_irq(&sem->wait.lock); tsk->state = TASK_RUNNING; |
} |
以上实现对应linux2.6版本中在 2.6.16之前的信号量实现。 |
我们接着分析上述修改对自旋锁竞争的影响。这种修改不会对__up之间竞争自旋锁有任何影响,我们只用考虑对__down与__down之间, __down与__up之间竞争自旋锁的影响。 |
先考虑__down与__down竞争自旋锁的情形。使用两个自旋锁时, __down在每次尝试进入临界区时至少需获取两次自旋锁(只在睡眠醒来又不能进入临界区时,只获取一次自旋锁),现在改成了一个自旋锁,在所有情形下,只需获取一次自旋锁即可完成尝试进入临界区的功能,因此,减少了 __down与__down竞争自旋锁的频率。 |
再者,需要注意的是:有两个自旋锁时, __down中两个自旋锁各自保护的原子块区域,比只有一个自旋锁时所保护的原子块区域要小,因此 __down之间竞争某一个锁时,时间可能会减少,但 __down只有依次获得这两个锁,执行完这两个原子块区域才能尝试进入临界区,所以的总的执行时间不可能减少。因此,整体来看,改成一个自旋锁wait.lock,提高了 __down之间竞争自旋锁的性能。 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
再考虑__down与__up竞争自旋锁的情形。我们以进程能进入临界区的情形为例来说明,其他情形是一样的。 |
在使用两个自旋锁时,__down在进入临界区时候,会执行 add_wait_queue_exclusive, remove_wait_queue,wake_up三个函数,它们执行时都必须先获取 wait.lock自旋锁,因此都可能与__up形成三次竞争。 |
在只使用wait.lock时, __down在执行过程中,只须获取一次 wait.lock自旋锁,也就只有一次可能与__up形成竞争。我们要注意到一点,虽然减少了可能的竞争次数,但如果__down中wait.lcok保护的原子块执行时间明显增长,就会使得 __up等待自旋锁的时间明显增长,还是会影响系统性能。所以,我们考察一下 __down中原子块的内容: |
1 add_wait_queue_exclusive_locked 2对count的修改判断操作 3 remove_wait_queue_locked 4 wake_up_locked而add_wait_queue_exclusive_locked,remove_wait_queue_locked两个函数都是很简单的一个链表增/删操作,“对 count的处理”也是很简单的修改判断操作,只有 wake_up_locked比较复杂:它读取睡眠队列头元素,如被唤醒进程不在运行队列中,获取运行队列自旋锁,将其加入运行队列,并尝试在其他 cpu上重新调度它。因此,wait.lock自旋锁所保护的原子块的大小只相当于比 wake_up_locked有很小增长,加上获取 wait.lock自旋锁的时间,其执行时间只比 wake_up函数执行时间有很小的增长。因此,__down与__up的竞争类似于 wake_up函数与 __up的竞争,显然,它比使用两个自旋锁时有效得多。 |
综上所述,去掉全局自旋锁,只用一个wait.lock,提高了系统性能。 |
还有一个类似于步骤4的小问题:能否将 __down结尾处的wake_up_locked调用改成条件判断,即: |
if (atomic_read(&sem->count) > 0) wake_up_locked(&sem->wait);我感觉似乎是可行的,因为整个判断都处于 wait.lock的保护之下。但内核不这么写,问题在哪呢? |
四通用信号量 |
在信号量的设计中,其 count字段可初始化为任何非负值。在信号量的实际使用中,其初值一般都为1,即作为互斥信号量来使用。在 2.6.16中,内核引入了 mutex,其功 |
Generated by Foxit PDF Creator © Foxit Software |
http://www.foxitsoftware.com For evaluation only. |
能相当于count初值为 1的信号量,因为不用考虑多个进程同时进入临界区的情形,就可更加优化代码的性能。对性能敏感的内核代码如果要使用信号量,一般都会使用mutex,因而通常的信号量实现就不必过于苛求性能。 |
我们在上面的讨论中,都是从优化性能的角度来改进信号量的设计 /实现,例如通过原子指令直接判断能否进入临界区,进程睡眠时对count的增加,sleepers字段的引入等等。对性能的关注,还使得不同 cpu平台中的信号量实现代码会有很大不同。(我们上面讨论的信号量实现,其实是基于 x86平台的)比喻说,不同的平台,对原子指令的支持就不一样,有的平台就无法支持__down中的原子指令 atomic_add_negative的实现。如果不过于关注性能,就无须使用原子指令,也无须根据特定平台对代码进行优化,那么,信号量代码就能统一,形成通用的信号量实现。 |
通用的信号量实现,关键就在于对 count字段的处理。由于不能使用原子指令,在任何时候读取或修改count时,都需要放在自旋锁 wait.lock的保护之下。按照这个思路,我们考虑如此使用count和等待队列:当进程能进入临界区,我们对 count减1。当进程不能进入临界区,不修改 count,只是将进程加入睡眠队列。当调用 up时,如果睡眠队列为空,对 count增1,否则,不修改 count,只是唤醒睡眠队列头的进程。这样, count永远为非负值,而且,进程永远按照 “先调用 down先进入临界区 ”的公平原则顺序进入临界区。 |
因此,信号量的实现可 发表评论 |
相关推荐
本文深入浅出的讲解了Linux内核信号量的设计与实现,非常值得一读。
LINUX内核 信号量的设计与实现。学习内核的好工具
Linux内核信号量分析.pdf
Linux内核信号量分析
仓库管理问题,Linux0.11 信号量的实现与应用。
优先级继承运用于Linux内核信号量的研究与实现.pdf
Linux从2.0版本开始增加对于SMP的支持,最初的实现较为简单,以后随着版本更新,SMP的实现也逐步趋于完善。本文主要对Linux 2.6版本的内核信号量机制进行深入分析。
Linux内核中信号量机制的研究与实现.pdf
6.8 信号量 第7章基于socket的进程间通信 7.1系统调用socket() 7.2函数sys—socket()——创建插口 7.3函数sys—bind()——指定插口地址 7.4函数sys—listen()——设定server插口 7.5函数sys—accept()——接受...
内核机制与模块 107 9.1 内核机制 107 9.1.1 Bottom Half控制 107 9.1.2 任务队列 108 9.1.3 定时器 109 9.1.4 等待队列 110 9.1.5 自旋锁 110 9.1.6 信号量 110 9.2 模块 111 9.2.1 ...
Linux内核是Linux操作系统中最核心的部分,用于实现对硬件部件的编程控制和接口操作。《Linux2.6内核标准教程》深入、系统地讲解了 Linux内核的工作原理,对Linux内核的核心组件逐一进行深入讲解。 全书共8章,首先...
深入分析Linux内核源码 前言 第一章 走进linux 1.1 GNU与Linux的成长 1.2 Linux的开发模式和运作机制 1.3走进Linux内核 1.3.1 Linux内核的特征 1.3.2 Linux内核版本的变化 1.4 分析Linux内核的意义 ...
Linux内核是Linux操作系统中最核心的部分,用于实现对硬件部件的编程控制和接口操作。《Linux2.6内核标准教程》深入、系统地讲解了 Linux内核的工作原理,对Linux内核的核心组件逐一进行深入讲解。 全书共8章,首先...
6.8 信号量 第7章 基于socket的进程间通信 7.1 系统调用socket() 7.2 函数sys—socket()——创建插口 7.3 函数sys—bind()——指定插口地址 7.4 函数sys—listen()——设定server插口 7.5 函数sys—...
Linux内核是Linux操作系统中最核心的部分,用于实现对硬件部件的编程控制和接口操作。《Linux2.6内核标准教程》深入、系统地讲解了 Linux内核的工作原理,对Linux内核的核心组件逐一进行深入讲解。 全书共8章,首先...
Linux内核 前言 第1章 硬件基础与软件基础 6 1.1 硬件基础 6 1.1.1 CPU 7 1.1.2 存储器 8 1.1.3 总线 8 1.1.4 控制器和外设 8 1.1.5 地址空间 9 1.1.6 时钟 9 1.2 软件基础 9 1.2.1 计算机语言 9 1.2.2 什么是操作...