"> "> 操作系统-并发 | Yufei Luo's Blog

操作系统-并发

并发

定义

如果一个逻辑流的执行在时间上与另一个流重叠,称为并发流,这两个流被称为并发地运行。多个流并发执行的一般现象被简称为并发。在多线程的并发执行中,由于这些线程会共享地址空间,这就带来了一些必须要处理的问题。

备注—并发(concurrent)与并行(parallel):

在计算机系统中,只要两个任务在时间上重叠就被称为并发,这一概念并不关心任务如何具体地被执行,与执行这些任务的处理器核数或者是计算机数无关。并发主要关注的是任务的抽象调度,相当于是站在任务安排者视角看到的东西。

并行是并发的一个真子集,指的是两个流并发地运行在不同的处理器核或者计算机上。并行关注的是任务的实际执行,相当于是站在任务执行者视角的概念。

并发带来的问题

临界区

由于线程之间共享地址空间,因此可能会出现多个线程访问共享资源的情况,共享资源通常是一个变量或者数据结构。临界区(critical section)指的是访问共享资源的代码片段。

竞态条件

当多个执行线程大致同时进入临界区时,如果它们都试图更新共享的数据结构,则会导致非期望的结果,而这种结果通常也是我们不希望得到的。这种情况被称为竞态条件(race condition)。

不确定性

如果程序中包含了一个或者多个竞态条件,那么程序的输出则会因运行而异,具体取决于哪些线程在何时运行。这样会导致结果出现不确定性(indeterminate),然而我们通常期望程序能给出一个确定的结果。

为了避免程序的不确定性,代码需要使用互斥(mutual exclusion)。通过这种方式,便可以保证只有一个线程进入临界区,从而避免出现竞态条件,产生确定的程序输出。

因此,我们需要硬件提供一些有用的指令,可以在这些指令之上构建一个通用的同步原语(synchronization primitive)集合,从而以同步和受控的方式访问临界区。

概述

锁就是一个变量(因此需要声明一个锁变量才能使用它),这个锁变量保存了锁在某一时刻的状态:可用(没有线程持有锁)或占用(一个线程持有锁)。锁可以被使用在代码块的临界区周围,只有当某个线程占用锁时才能执行临界区的代码,并在临界区执行完成之后释放锁,供其它线程使用。通过这种方式,锁便为程序员提供了最小程度的调度控制,让程序员获得一些控制权,从而使得操作系统的调度变得可控。

在POSIX库中,锁被称为互斥量,因为它被用来提供线程之间的互斥。对于锁的获取和释放有一组函数:lock()unlock()函数。调用lock()函数尝试获取锁,如果没有其他线程持有锁,那么该线程会获得锁,进入临界区。这个线程也被称为锁的持有者。如果另外一个进程对相同的锁变量调用lock(),因为锁被另一个线程持有,这一调用便不会返回。锁的持有者调用unlock()函数之后,便失去对锁的所有权。如果没有其它等待线程,则锁的状态变为可用;如果有等待线程,则其中一个等待线程会收到锁状态的变化,获取锁并进入临界区。

在实际编程中,一般倾向于使用不同的锁来保护不同的数据和结构,从而允许更多的线程进入临界区。这种细粒度的方案可以增加并发。同时也要注意,如果在程序执行的过程中会频繁地获取锁、释放锁,那么高并发就没有什么意义。

API

POSIX线程库提供了一些通过锁来提供互斥进入临界区的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//锁的初始化
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER; //第一种方法,将锁设置为默认值
int rc=pthread_mutex_init(&lock,NULL); //第二种方法,调用函数,第一个参数是锁本身的地址,它是一个指向pthread_mutex_t类型的指针;第二个参数是一组可选属性,NULL表示使用默认值。函数执行成功则返回0

//加锁解锁函数
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);

//获取锁,通常应该避免使用,但是对于避免死锁来说会很有用
int pthread_mutex_trylock(pthread_mutex_t* mutex); //尝试获取锁,如果锁已被占用则会失败
int pthread_mutex_timedlock(pthread_mutex_t* mutex, struct timespec* abs_timeout); //在超时或者获取锁之后返回

//销毁锁
pthread_mutex_destroy()

锁的实现

评价标准

  • 提供互斥:最基本的评价标准,判断锁是否有效,以及能否阻止多个线程同时进入临界区;
  • 公平性:是否能保证每一个竞争线程有公平的机会抢到锁,不会有竞争锁的线程会饿死,一直无法获得锁的情况;
  • 性能:使用锁之后的额外时间开销,需要考虑没有竞争、一个CPU上多个线程竞争、以及多个CPU多个线程竞争等不同使用场景。

控制中断

最早提供的互斥解决方案之一是在临界区关闭中断,这种方案是为单处理器开发的,伪代码如下:

1
2
3
4
5
6
void lock(){
DisableInterrupts();
}
void unlock(){
EnableInterrupts();
}

这种方法虽然简单,但是却有很多缺点。首先是这种方法要求我们允许调用线程执行特权操作,也就是说信任这种机制不会被滥用,但是这样常常会有麻烦;第二是这种方法不支持多处理器;而且关闭中断可能也会导致严重的系统问题;最后还有效率低的问题,因为打开与关闭中断的代码执行地很慢。

因此,只有在很有限的情况之下才能用关闭中断来实现互斥原语。例如某些情况下操作系统本身会采用屏蔽中断的方式,保证访问自己数据结构的原子性,或是避免某些复杂的中断处理情况。

自旋锁

自旋锁是最简单的一种锁,它一直自旋直到锁可用。在单处理器上,需要抢占式的调度器才能使用自旋锁,否则会因为自旋线程永远不会放弃CPU而导致无法使用。

自旋锁只允许一个线程进入临界区,因此它的正确性可以保证。但是自旋锁不提供任何公平性的保证,可能会导致饿死的情况出现;同时在单CPU上性能开销很大,因为处于自旋等待状态的线程会浪费掉它使用的CPU周期(但是多CPU上的性能不错)。

在不同的操作系统中,自旋锁的实现方式也各有不同。

不同硬件原语下的实现

为了实现自旋锁,需要硬件提供一些硬件原语的支持,这些硬件原语为一系列的原子指令。在不同的硬件原语下,自旋锁也有着不同的实现方式。

备注—原子指令:指令执行时会像期望那样全部执行完,不能在指令中间中断。当发生中断时,原子指令要么根本没有运行,要么运行完成,没有中间状态。

测试并设置

一些硬件提供了测试并设置指令,它返回old_ptr指向的旧值,同时将指针指向的值更新为new的新值。而且这一过程被原子地(Atomically)执行。它的伪代码如下:

1
2
3
4
5
int TestAndSet(int* old_ptr, int new){
int old=*old_ptr;
*old_ptr=new;
return old;
}

使用这一指令,自旋锁便可以用如下的伪代码来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct lock_t{
int flag;
} lock_t;

void init(lock_t* lock){
lock->flag=0; //0代表没有线程持有锁,1代表锁已被持有
}

void lock(lock_t* lock){
while(TestAndSet(&lock->flag,1)==1){
; //自旋并等待,在这期间什么也不做
}
}

void unlock(lock_t* lock){
lock->flag=0;
}
比较并交换

一些系统提供了比较并交换的硬件原语,它的基本思想是检测ptr指向的值是否与expected相等,如果是则更新ptr所指的值为新值,否则什么也不做,并且无论哪种情况都返回程序结束时ptr指向的那个值。它的伪代码如下:

1
2
3
4
5
6
int CompareAndSwap(int* ptr, int expected, int new){
int actual=*ptr;
if(actual==expected)
*ptr=new;
return actual;
}

只需要将测试并设置中的加锁函数稍作修改,便可以用比较并交换指令实现一个自旋锁:

1
2
3
4
5
void lock(lock_t* lock){
while(CompareAndSwap(&lock->flag,0,1)==1){
; //自旋并等待,在这期间什么也不做
}
}
链接的加载与条件式存储

一些平台提供了链接的加载和条件式存储这一对指令,可以配合使用来实现自旋锁。链接的加载指令从内存中取出一个值存入一个寄存器,而条件式存储指令是当上一次加载的地址在期间都没有更新时才会成功。这一对指令的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
int LoadLinked(int* ptr){
return *ptr;
}

int StoreConditional(int* ptr, int value){
if(/*no one has updated *ptr since the LoadLinked to this address*/){
*ptr=value;
return 1; //success
}
else{
return 0; //failed to update;
}
}

通过这一对指令,自旋锁的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void lock(lock_t* lock){
while(1){
while(LoadLinked(&lock->flag)==1){
; //spin until it's zero
}
if(StoreConditional(&lock->flag,1)==1){
return;
}
}
}

void unlock(lock_t* lock){
lock->flag=0;
}

##### 获取并增加

这一指令可以原子地返回特定地址的旧值,并且让该值自增1。这一指令的伪代码如下:

1
2
3
4
5
int FetchAndAdd(int* ptr){
int old=*ptr;
*ptr=old+1;
return old;
}

使用这一指令所实现的自旋锁如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef struct lock_t{
int ticket;
int turn;
} lock_t;

void lock_init(lock_t* lock){
lock->ticket=0;
lock->turn=0;
}

void lock(lock_t* lock){
int myturn=FetchAndAdd(&lock->ticket);
while(lock->turn!=myturn)
; //spin
}

void unlock(lock_t* lock){
FetchAndAdd(&lock->turn);
}

这一方法可以保证所有线程都能抢到锁,因为一个线程执行时一定会修改并获得一个ticket值,因此它最终一定会被调度。

解决自旋过多

基于硬件的锁简单且有效,但是自旋会浪费CPU的时间片。比如有N个线程去竞争一个锁,那么就会浪费掉N-1个时间片用于自旋并等待一个线程释放掉锁。如果要让锁不会不必要地自旋,需要操作系统的支持。

放弃CPU

一个简单的办法是在要自旋的时候放弃CPU的使用权,从而允许其他线程的执行。下面的伪代码以硬件原语测试并设置为例,其它的硬件原语实现使用类似的办法进行修改:

1
2
3
4
void lock(){
while(TestAndSet(&flag,1)==1)
yield(); //放弃CPU
}

在单CPU上,如果运行的线程数比较少,这种方法十分有效。但是在多个线程反复竞争一把锁的条件下,这一办法仍然成本很高,因为上下文切换的成本仍然存在。而且这种办法也没有考虑到饿死的问题,可能一个线程会一直处于让出的循环。

使用队列

在前面讨论的办法中,存在的真正问题是存在太多的偶然性。因此,我们需要显式地施加某种控制,决定锁释放时谁可以抢到。这需要操作系统的更多支持,并需要一个队列来保存等待锁的线程。使用这种方法的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typedef struct lock_t{
int flag;
int guard;
queue_t* q;
} lock_t;

void lock_init(lock_t* m){
m->flag=0;
m->guard=0;
queue_init(m->q);
}

void lock(lock_t* m){
while(TestAndSet(&m->guard,1)==1)
;
if(m->flag==0){
m->flag=1; //获得锁
m->guard=0;
}
else{
queue_add(m->q,gettid());
setpark(); //让调用线程休眠,同时避免唤醒/等待竞争的出现
m->guard=0;
}
}

void unlock(lock_t* m){
while(TestAndSet(&m->guard,1)==1)
; //
if(queue_empty(m->q))
m->flag=0;
else
unpark(queue_remove(m->q)); //唤醒threadID标识的线程
m->guard=0;
}
两阶段锁

两阶段锁的程序执行也如字面一样分为两个阶段:在第一阶段,会先自旋一段时间,程序希望可以获取锁;如果第一阶段没有获得锁,那么便会执行第二阶段,调用者进入睡眠状态直到锁可用。

条件变量

定义

在很多情况下,线程需要检查某一条件满足之后,才会继续运行。这可以通过使用一个共享变量,然后让该线程去不断重复检查来实现。但是这种方法效率低下,浪费CPU的时间。一个更好的办法是让这一线程处于休眠状态,直到等待的条件满足,这可以通过条件变量来实现。

条件变量是一个显式队列,当某些执行状态(即条件)不满足时,线程可以把自己加入队列,等待该条件。当另外某个线程改变了这些状态时,就可以唤醒一个或者多个等待进程(通过在该条件上发信号),让它们继续执行。要使用条件变量,需要额外有一个相关的锁。

API

条件变量的相关API如下:

1
2
3
4
5
//声明
pthread_cond_t c=PTHREAD_COND_INITIALIZER;
//操作
pthread_cond_wait(pthread_cond_t* c,pthread_mutex_t* m); //线程休眠,当调用wait时,需要保证互斥量m处于上锁状态。函数会释放锁,并让调用线程休眠。而在线程被唤醒之后,仍需要重新获取锁。
pthread_cond_signal(pthread_cond_t* c); //唤醒等待在某个条件变量上的睡眠线程。虽然API没有要求,但是在调用这一函数时最好持有锁,以保证程序正确

生产者/消费者(有界缓冲区)问题

问题描述

假设有一个或者多个生产者线程,以及一个或者多个消费者线程。生产者将生成的数据项放入到缓冲区,消费者从缓冲区取走数据项,以某种方式消费。因为有界缓冲区是共享资源,因此必须通过同步机制来访问它,以免产生竞争条件。

很多实际的系统都有这种场景:例如多线程的网络服务器中,一个生产者将HTTP请求放入到工作队列中,消费线程从队列中取走请求并处理;在使用Linux的管道连接操作时,也使用到了有界缓冲区。

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
int buffer[MAX];
int fill_ptr=0;
int use_ptr=0;
int count=0;

void put(int value){
buffer[fill_ptr]=value;
fill_ptr=(fill_ptr+1)%MAX;
count++;
}

int get(){
int tmp=buffer[use_ptr];
use_ptr=(use_ptr+1)%MAX;
count--;
return tmp;
}

cond_t empty,fill; //使用两个信号的目的是使得信号更具有指向性,消费者只唤醒生产者,生产者只唤醒消费者。否则可能会出现生产者唤醒生产者,或是消费者唤醒消费者的情况
mutex_t mutex;

void* producer(void* arg){
int i;
for(i=0;i<loops;i++){
Pthread_mutex_lock(&mutex);
while(count==MAX) //使用while而不是if,是为了保证线程再被唤醒之后,再一次检查条件是否仍旧满足。因为信号使用的是Mesa语义,也就是发信号给线程的时候只是唤醒它们,暗示状态发生了变化,但是并不会保证在它运行之前状态一直是期望的情况。(与之相反的叫做Hoare语义,保证被唤醒线程立刻执行)
Pthread_cond_wait(&empty,&mutex);
put(i);
Pthread_cond_signal(&fill);
Pthread_mutex_unlock(&mutex);
}
}

void* consumer(void* arg){
int i;
for(i=0;i<loops;i++){
Pthread_mutex_lock(&mutex);
while(count==0)
Pthread_cond_wait(&fill,&mutex);
int tmp=get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n",tmp);
}
}

覆盖条件

下面是一个多线程分配库的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int bytesLeft=MAX_HEAP_SIZE;

cond_t c;
mutex_t m;

void* allocate(int size){
Pthread_mutex_lock(&m);
while(bytesLeft<size)
Pthread_cont_wait(&c,&m);
void* ptr=... //从堆中获取内存
bytesLeft-=size;
Pthread_mutex_unlock(&m);
return ptr;
}

void free(void* ptr,int size){
Pthread_mutex_lock(&m);
bytesLeft+=size;
Pthread_cond_broadcast(&c); //注意此处用的不是Pthread_cond_signal函数
Pthread_mutex_unlock(&m);
}

在上面的代码中,使用Pthread_cond_broadcast()函数来唤醒所有的等待线程,因为此时无法确定哪一个内存申请程序所请求的空间可以被分配。这种条件变量被叫做覆盖条件,因为它可以覆盖所有需要唤醒线程的场景,但是这样会导致太多线程被唤醒,从而影响性能。

信号量

定义

信号量是有一个整数值的对象,可以用两个函数来操作它,在POSIX标准中为sem_wait()sem_post()。信号量可以被用作锁或者条件变量,它的初始值能够决定其行为。所以首先要初始化信号量,才能调用其它函数与之交互。初始化过程如下:

1
2
3
#include<semaphore.h>
sem_t s;
sem_init(&s,0,1); //第二个参数为0代表信号量在同一个进程的多个线程间共享,如果使用其他值可以用于跨不同进程的同步访问;第三个参数用于初始化信号量的值

在初始化之后,可以调用sem_wait()sem_post()进行交互。sem_wait()会将信号量减1,如果这一步操作之后信号量变为负值,则让调用线程挂起,直到之后的一个post操作。而sem_post()会直接增加信号量的值,如果有等待线程则唤醒其中的一个。当信号量的值为负数时,这个值的绝对值就是等待线程的个数。

信号量的实现

下面的代码是信号量的一个简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typedef struct _My_sem_t{
int value;
pthread_cond_t cond;
pthread_mutex_t lock;
} My_sem_t;

void My_sem_t_init(My_sem_t* s,int value){
s->value=value;
Cond_init(&s->cond);
Mutex_init(&s->lock);
}

void My_sem_wait(My_sem_t* s){
Mutex_lock(&s->lock);
while(s->value<=0){
Cond_wait(&s->cond,&s->lock);
}
s->value--;
Mutex_unlock(&s->lock);
}

void My_sem_post(My_sem_t* s){
Mutex_lock(&s->lock);
s->value++;
Cond_signal(&s->cond);
Mutex_unlock(&s->lock);
}

信号量的使用

二值信号量作为锁

将信号量的初值设置为1,便可以将信号量用作锁来使用。此时,便可以用sem_wait()sem_post()将临界区包裹起来,伪代码如下:

1
2
3
4
5
6
sem_t m;
sem_init(&m,0,1);

sem_wait(&m);
//临界区代码
sem_post(&m);

例:

假设有两个线程的场景。一开始线程0调用了sem_wait(),它把信号量的值减为0。由于信号量小于0时才会等待,故线程0从函数返回之后便进入临界区。如果没有其它线程尝试获取锁,线程0在执行完临界区的代码并调用sem_post()时,信号量会被重置为1。

如果在线程0调用sem_post()之前,线程1调用sem_wait()尝试进入临界区,此时线程1会把信号量减为-1,然后等待。线程0继续运行,最后调用sem_post()将信号量增加到0。此时线程1便可以获取锁,并且当它执行结束的时候再次增加信号量的值,将其恢复为1。

信号量用作条件变量

信号量也可以用在一个线程暂停执行,等待某一条件成立的场景。在这种场景下,通常一个线程等待条件成立,另一个线程修改条件并发送信号给等待线程,从而唤醒等待线程。因为等待线程在等待某些条件发生变化,因此此时的信号量作为条件变量。在这种情况下,信号量的初始值应该设置为0。

例:

假设一个父线程创建一个子线程,并等待它结束再继续执行。此时有两种情况:

第一种是父线程创建了子线程,但是子线程并没有运行。这种情况下,父进程调用sem_wait()会先于子进程调用sem_post()。此时,父线程会首先把信号量减为-1,然后睡眠等待。子线程运行结束的时候调用sem_post(),信号量增加为0。此时,父线程被唤醒,从sem_wait()中返回,执行剩下的程序。

第二种是子线程在父线程调用sem_wait()之前就已经执行结束。在这种情况下,子线程首先调用sem_post()将信号量从0变为1。当父线程执行到sem_wait()时,信号量的值为1,父线程将信号量减为0,然后从函数中返回继续执行剩下的程序,无需等待。

生产者/消费者问题

使用信号量来解决生产者/消费者问题的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
sem_t empty;
sem_t full;
sem_t mutex;

void* producer(void* arg){
int i;
for(i=0;i<loops;i++){
sem_wait(&empty); //这一操作需要放在锁之前,否则会出现死锁的情况
sem_wait(&mutex); //放入队列或是从队列取出的操作必须是互斥的,否则可能会造成数据丢失
put(i);
sem_post(&mutex);
sem_post(&full);//这一操作需要放在锁之后,否则会出现死锁的情况
}
}

void* consumer(void* arg){
int i;
for(i=0;i<loops;i++){
sem_wait(&full);
sem_wait(&mutex);
int tmp=get();
sem_post(&mutex);
sem_post(&empty);
printf("%d\n",tmp);
}
}

int main(int argc,char* argv[]){
//……
sem_init(&empty,0,MAX); //MAX表示队列的上限
sem_init(&full,0,0); //信号量full为0的时候表示队列满
sem_init(&mutex,0,1); //互斥锁
//……
}

读者—写者锁

考虑到不同的数据结构访问可能需要不同类型的锁,人们发明了读者—写者锁。

如果某个线程要更新数据结构,需要调用rwlock_acquire_writelock()获得写锁,调用rwlock_release_writelock()来释放锁。内部通过一个writelock的信号量保证只有一个写者可以获得锁进入临界区,从而更新数据结构。

而在获取读锁时,读者首先要获取lock,然后增加reader变量,追踪目前有多少个读者在访问该数据结构。在函数rwlock_acquire_readlock()中,当第一个读者获取读锁的时候,也会同时获得写锁。一旦一个读者获取了读锁之后,其他的读者也可以获取读锁,但是想要获取写锁的线程就必须要等到所有的读者都结束。最后一个退出的读者会放弃写锁,从而让等待的写者可以获取写锁。

一个简单的读者—写者锁的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
typedef struct _rwlock_t{
sem_t lock;
sem_t writelock;
int readers;
} rwlock_t;

void rwlock_init(rwlock_t* rw){
rw->readers=0;
sem_init(&rw->lock,0,1);
sem_init(&rw->writelock,0,1);
}

void rwlock_acquire_readlock(rwlock_t* rw){
sem_wait(&rw->lock);
rw->readers++;
if(rw->readers==1){
sem_wait(&rw->writelock);
}
sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t* rw){
sem_wait(&rw->lock);
rw->readers--;
if(rw->readers==0){
sem_post(&rw->writelock);
}
sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t* rw){
sem_wait(&rw->writelock);
}

void rwlock_release_writelock(rwlock_t* rw){
sem_post(&rw->writelock);
}

哲学家就餐问题

这一问题的基本情况是:假定有N位哲学家围着一个圆桌,每两位哲学家中间有一把餐叉(共N把)。哲学家有时候需要思考,此时不需要餐叉;有时又需要就餐,而只有同时拿到左手和右手边的餐叉才能吃到东西。

每个哲学家的基本循环可以用如下的伪代码描述:

1
2
3
4
5
6
while(1){
think();
getforks();
eat();
putforks();
}

这一问题的关键是如何实现getforks()putforks()函数,保证没有死锁,没有哲学家饿死,并且尽可能有高的并发度。因此,每个餐叉需要用一个信号量来表示其是否被使用:sem_t forks[N],每个信号量都被初始化为1。

下面是一个有问题的解决方案:

1
2
3
4
5
6
7
8
9
void getforks(){
sem_wait(forks[p]);
sem_wait(forks[(p+1)%N]);
}

void putforks(){
sem_post(forks[p]);
sem_post(forks[(p+1)%N]);
}

上面的方案中,每个哲学家都首先获取左边的餐叉,然后获取右边的,在释放时也按照同样的顺序。但是这种方式会造成死锁,假设每个哲学家都拿取了左边的餐叉,此时所有的餐叉都被占有,所有的哲学家都被阻塞着,都在等待另一个哲学家占有的餐叉。解决这一问题最简单的办法,是修改某个或者某些(但一定不是全部)哲学家取餐叉的顺序,即改为先右后左,这样便会打破等待循环。

常见问题

非死锁缺陷

违反原子性缺陷

这种缺陷的定义是:违反了多次内存访问中预期的可串行性。也就是说,代码段本意是原子性的,但是在执行中却没有强制地实现原子性。通过给共享变量的访问加锁,便可以修复这一问题。

违反顺序缺陷

违反顺序缺陷的定义是:两个内存访问的预期顺序被打破。即A本来应该在B之前执行,但是实际运行中却不是这个顺序。使用条件变量来强制执行顺序即可修复这一缺陷。

死锁缺陷

发生原因

当线程1持有锁L1,正在等待另外一个锁L2;而线程2持有锁L2,正在等待锁L1释放时,便会因为两个线程互相等待而产生死锁,导致程序无法执行下去。在实际编程中,一个大型代码库里的组件之间通常会有复杂的依赖,这便会容易因循环依赖导致死锁;同时,软件开发中的封装思想与锁也不是很契合,因此一些看起来没有关系的接口也可能会导致死锁。

产生条件

  • 互斥:线程对于所需资源进行互斥地访问;
  • 持有并等待:线程持有资源并且又在等待其他的资源;
  • 非抢占:线程获得的资源不能被抢占;
  • 循环等待:线程之间存在一个环路,环路上每个线程都持有一个资源,而这个资源又是下一个线程需要申请的。

死锁的处理

预防

避免循环等待

最实用的预防技术是让代码不会产生循环等待。一个直接的办法是获取锁的时候提供一个全序,这也就是说,假如系统有两个锁L1和L2,每次都先申请L1再申请L2,这样便可以避免死锁。但是更复杂的系统中通常有许多锁,因此也常常使用偏序。

避免持有并等待

死锁的这一条件可以通过原子地抢锁来预防。在抢锁的过程中,不会发生不合适的线程切换,从而避免了死锁。但是这种方案不适用于封装,因为这一方案需要准确地知道要抢哪些锁,并且提前抢到这些锁。

抢锁

如果允许线程抢锁,则可以预防死锁的产生。但是这又会造成活锁的问题:两个线程可能一直重复抢锁的动作,又同时抢锁失败。此时,系统一直在运行这段代码,但是却没有进展。

避免互斥

完全避免互斥也可以预防死锁的形成。一些人设计了无等待的数据结构,这是通过使用硬件指令,从而构造出不需要锁的数据结构。但是这种方式也有可能会产生活锁。

通过调度避免

一些场景也更加适合使用死锁避免的方案。通过了解不同线程在运行过程中对锁的需求情况,以此进行调度,可以避免死锁的产生。

检查和恢复

有时我们允许死锁偶尔发生,检查到发生死锁之后再采取行动(如果死锁很少发生,并且产生死锁造成的代价很小,就无需花费大量精力避免死锁)。

基于事件的并发

## 概述

基于事件的并发指的是,我们等待事件的发生,当它发生时检查事件类型,然后做相应的工作。处理事件的代码叫做事件处理程序,它运行的时候,是系统中发生的唯一活动。

因此,调度就是决定接下来处理哪个事件。这种对调度的显式控制是基于事件方法的一个重要优点。由于一次只处理一个事件,所以不需要获取或者释放锁,线程化程序中常见的并发性错误也不会出现在基于事件的并发方法中。

存在的问题与解决

接收事件

基于事件的服务器需要决定哪个事件发生,也就是如何确定是否有它的消息已经到达。大多数系统提供了select()poll()系统调用,这些接口检查是否有任何应该关注的进入I/O。

例如select()函数在MacOS X上的定义如下:

1
2
3
4
5
int select(int nfds,
fd_set* restrict readfds,
fd_set* restrict writefds,
fd_set* restrict errorfds,
struct timeval* restrict timeout)

这一函数检查I/O描述符集合,它们的地址通过readfdswritefdserrorfds传入,分别查看它们中的某些描述符是否已准备好读取、是否准备好写入、或是否有异常情况待处理,在每个集合中检查前nfds个描述符。该函数返回所有集合中就绪描述符的总数。

补充—阻塞与非阻塞接口:

阻塞(又称同步)接口在返回给调用者之前完成所有的工作,而非阻塞(又称异步)接口开始一些工作,但是立即返回,剩余所有需要完成的工作都在后台完成。

阻塞系统调用

在使用基于事件的方法时,只有主事件在执行,此时如果一个事件处理程序发出一个阻塞调用,整个服务器就会阻塞直到调用完成。当事件处于阻塞状态时,系统处于闲置,造成潜在的巨大资源浪费。因此,在基于事件的系统中不允许阻塞调用。

为了克服这一限制,许多现代操作系统引入了异步I/O的接口。这些接口使得应用程序可以发出I/O请求,并在I/O完成之前立即将控制权返回给调用者,另外的接口让应用程序能够确定各种I/O是否已完成。同时,为了能够确定I/O是否完成,一些系统提供了基于中断的方法,使用UNIX的信号在异步I/O完成时通知应用程序,从而避免了轮询系统的需要。

状态管理

基于事件方法的代码通常比传统基于线程的代码更加复杂,因为当事件处理程序发出异步I/O时,它必须打包一些程序状态,以便下一个事件处理程序在I/O最终完成时使用。这个额外的工作被称为手工栈管理。

其他

基于事件的方法还有一些其他困难,例如:

  • 在多核CPU上不可能使用基于事件的方法。因为利用多个CPU时,事件服务器必须并行运行多个事件处理程序,此时便会出现同步问题,并且必须采用通用的解决方案如加锁。
  • 不能很好地与某些类型的系统活动集成,如分页等。
  • 基于事件的代码可能难以管理。
  • 异步网络I/O集成很难实现。

参考

  1. Operating Systems: Three Easy Pieces
  2. 深入理解计算机系统 第三版