浅析线程间通信三:Barriers、信号量(semaphores)以及各种同步方

之前的文章讨论了互斥量、条件变量、读写锁和自旋锁用于线程的同步,本文将首先讨论Barriers和信号量的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载,然后对线程各种同步方法进行了比较。

Barriers

Barriers是一种不同于前面线程同步机制,它主要用于协调多个线程并行(parallel)共同完成某项任务。一个barrier对象可以使得每个线程阻塞,直到所有协同(合作完成某项任务)的线程执行到某个指定的点,才让这些线程继续执行。前面使用的pthread_join调用也可以看成简单的barrier,那里主线程要求其他线程都退出后,才能继续执行。可以使用接口pthread_barrier_init来初始化一个对象,用phread_barrier_destory来销毁一个barrier对象。他们声明如下:

#include <pthread.h>int pthread_barrier_destroy(pthread_barrier_t *barrier);int pthread_barrier_init(pthread_barrier_t *restrict barrier,const pthread_barrierattr_t *restrict attr, unsigned count); 当使用pthread_barrier_init接口初始化hygienebarrier对象时,参数count指定需要多少个线程执行到指定的点,才能使得所有线程继续往下执行。每个线程执行的指定的点是指线程自身通过调用pthread_barrier_wait来表明当前线程执行到指定点了,等待其他线程调用pthread_barrier_wait。pthread_barrier_wait声明如下:

#include <pthread.h>int pthread_barrier_wait(pthread_barrier_t *barrier); 该函数返回0,表明还需要等待其他线程调用pthread_barrier_wait,返回PTHREAD_BARRIER_SERIAL_THREAD表明所有线程可以继续往下执行了,具体那个线程返回PTHREAD_BARRIER_SERIAL_THREAD则是随机的。下面是用barrier来实现多个线程的排序,代码如下:#include <stdlib.h>#include <stdio.h>#include <pthread.h>#include <limits.h>#include <sys/time.h>#define NTHR 8/* number of threads */#define NUMNUM 800L/* number of numbers to sort */#define TNUM (NUMNUM/NTHR) /* number to sort per thread */long nums[NUMNUM];long snums[NUMNUM];pthread_barrier_t b;#define heapsort qsort/* * Compare two long integers (helper function for heapsort) */int complong(const void *arg1, const void *arg2){long l1 = *(long *)arg1;long l2 = *(long *)arg2;if (l1 == l2)return 0;else if (l1 < l2)return -1;elsereturn 1;}/* * Worker thread to sort a portion of the set of numbers. */void * thr_fn(void *arg){long idx = (long)arg;heapsort(&nums[idx], TNUM, sizeof(long), complong);pthread_barrier_wait(&b);/** Go off and perform more work …*/return((void *)0);}/* * Merge the results of the individual sorted ranges. */void merge(){long idx[NTHR];long i, minidx, sidx, num;for (i = 0; i < NTHR; i++)idx[i] = i * TNUM;for (sidx = 0; sidx < NUMNUM; sidx++) {num = LONG_MAX;for (i = 0; i < NTHR; i++) {if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {num = nums[idx[i]];minidx = i;}}snums[sidx] = nums[idx[minidx]];idx[minidx]++;}}int main(){unsigned long i;struct timeval start, end;long longstartusec, endusec;doubleelapsed;interr;pthread_ttid;/** Create the initial set of numbers to sort.*/srandom(1);for (i = 0; i < NUMNUM; i++)nums[i] = random();/** Create 8 threads to sort the numbers.*/gettimeofday(&start, NULL);pthread_barrier_init(&b, NULL, NTHR+1);for (i = 0; i < NTHR; i++) {err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));if (err != 0){printf("can't create thread");return -1;}}pthread_barrier_wait(&b);merge();gettimeofday(&end, NULL);/** Print the sorted list.*/startusec = start.tv_sec * 1000000 + start.tv_usec;endusec = end.tv_sec * 1000000 + end.tv_usec;elapsed = (double)(endusec – startusec) / 1000000.0;printf("sort took %.4f seconds\n", elapsed);for (i = 0; i < NUMNUM; i++){if( (i < (NUMNUM – 1)) && (snums[i] > snums[i + 1]) )printf("sort failed!\n");printf("%ld,", snums[i]);}printf("\n");exit(0);}上面程序,有以下几点值得注意: I)在初始化barrier对象时,指定的线程的个数为工作线程个数+1,加1是加上主线程。 II)在程序中线程并不检测调用的pthread_barrier_wait()的返回值是0还是PTHREAD_BARRIER_SERIAL_THREAD,这是因为我们指定使用主线程来合并其他线程的执行结果。

信号量(semaphores)

信号量即可以用于进程间同步也可以用于同一进程内不同线程间的同步,下面暂时只讨论用于线程同步。一个信号量对象有一个相关联的整数值,该整数值用于大于等于0,对于一个已初始化的信号量对象,可以执行两类操作:一种是通过调用sem_wait对相应的整数值减1,即通常说的P操作,若当前的信号量的值等于0,则该操作会阻塞线程;另外一种通过sem_post对相应值加1,即通常说的V操作,若有线程阻塞在该信号量上,,则其中有一个线程被唤醒,即被唤醒的线程从sem_wait调用返回。这个两个操作声明如下:

#include <semaphore.h>int sem_post(sem_t *sem);int sem_wait(sem_t *sem);在Linux2.6后,NPTL实现了POSIX所要求的信号量特性。下面是使用信号量的一个简单例子,代码如下:

#include <unistd.h>#include <pthread.h>#include <stdio.h>#include <stdlib.h>#include <semaphore.h>#define BUFF_SIZE 5/* total number of slots */#define NP3/* total number of producers */#define NC3/* total number of consumers */#define NITERS4/* number of items produced/consumed */#define NONITEM-1/* stand for no item*/typedef struct {int buf[BUFF_SIZE]; /* shared var */int in;/* buf[in%BUFF_SIZE] is the first empty slot */int out;/* buf[out%BUFF_SIZE] is the first full slot */sem_t full;/* keep track of the number of full spots */sem_t empty;/* keep track of the number of empty spots */sem_t mutex;/* enforce mutual exclusion to shared data */} sbuf_t;sbuf_t shared;void *producer(void *arg){int i, item, index;index = (int)arg;for (i=0; i < NITERS; i++){/* Produce item */item = i;/* Prepare to write item to buf *//* If there are no empty slots, wait */sem_wait(&shared.empty);/* If another thread uses the buffer, wait */sem_wait(&shared.mutex);shared.buf[shared.in] = item;shared.in = (shared.in+1)%BUFF_SIZE;printf("[P%d] Producing item%d …\n", index, item);fflush(stdout);/* Release the buffer */sem_post(&shared.mutex);/* Increment the number of full slots */sem_post(&shared.full);/* Interleave producer and consumer execution */if (i % 2 == 1)sleep(1);}return NULL;}void *consumer(void *arg){int i, item, index;index = (int)arg;for (i=0; i < NITERS; i++){/* Prepare to read item to buf *//* If there are no full slots, wait */sem_wait(&shared.full);/* If another thread uses the buffer, wait */sem_wait(&shared.mutex);/* consume item */item = shared.buf[shared.out];shared.buf[shared.in] = NONITEM;shared.out = (shared.out+1)%BUFF_SIZE;printf(" ——> [C%d] Consuming item%d …\n", index, item);fflush(stdout);sem_post(&shared.mutex);/* Release the buffer *//* Increment the number of empty slots */sem_post(&shared.empty);/* Interleave producer and consumer execution */if (i % 2 == 1)sleep(1);}return NULL;}int main(){pthread_t idP[NP], idC[NC];int index;/*initialize an unnamed semaphore*/sem_init(&shared.full, 0, 0);sem_init(&shared.empty, 0, BUFF_SIZE);/*initialize mutex*/sem_init(&shared.mutex, 0, 1);/* Create NP producer */for (index = 0; index < NP; index++){pthread_create(&idP[index], NULL, producer, (void*)index);}/*Create NC consumers */for (index = 0; index < NC; index++){pthread_create(&idC[index], NULL, consumer, (void*)index);}/* wait for all producers and the consumer */for (index = 0; index < NP; index++){pthread_join(idP[index], NULL);}for (index = 0; index < NC; index++){pthread_join(idC[index], NULL);}exit(0);}编译运行程序结果如下:$gcc -o sem_example -Wall -lpthread sem_example.c $./sem_example [P1] Producing item0 …[P0] Producing item0 …[P0] Producing item1 …[P2] Producing item0 …[P2] Producing item1 …——> [C1] Consuming item0 …——> [C1] Consuming item0 …——> [C2] Consuming item1 …——> [C2] Consuming item0 …——> [C0] Consuming item1 …[P1] Producing item1 …——> [C0] Consuming item1 …[P0] Producing item2 …[P0] Producing item3 …——> [C1] Consuming item2 …——> [C1] Consuming item3 …[P1] Producing item2 …[P1] Producing item3 …[P2] Producing item2 …[P2] Producing item3 …——> [C0] Consuming item2 …——> [C0] Consuming item3 …——> [C2] Consuming item2 …——> [C2] Consuming item3 …上面程序用信号量实现了一个简单的多个生产者和多个消费者的问题。关于上面程序有几点值得说明的:

I)可以使用sem_init来初始化为一个信号量,其声明如下:

放弃那些不愿放弃的,容忍那些不可容忍的。

浅析线程间通信三:Barriers、信号量(semaphores)以及各种同步方

相关文章:

你感兴趣的文章:

标签云: