Posix信号量与互斥量解决生产者消费者问题

Posix信号量

Posix信号量

有名信号量

无名信号量

sem_open

sem_init

sem_close

sem_destroy

sem_unlink

sem_wait

sem_post

有名信号量

#include <fcntl.h>/* For O_* constants */#include <sys/stat.h>/* For mode constants */#include <semaphore.h>sem_t *sem_open(const char *name, int oflag);sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);int sem_close(sem_t *sem);int sem_unlink(const char *name);

与Posix类IPC用法类似:名字以/somename形式标识,且只能有一个/,并且总长不能超过NAME_MAX-4(i.e.,251)。

Posix有名信号量需要用sem_open函数创建或打开,PV操作分别是sem_wait和sem_post,可以使用sem_close关闭,删除用sem_unlink。

有名信号量用于不需要共享内存的进程间同步(可以通过名字访问),类似SystemV信号量。

匿名信号量

#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);int sem_destroy(sem_t *sem);

匿名信号量只存在于内存中,并要求使用信号量的进程必须可以访问内存;这意味着他们只能应用在同一进程中的线程,或者不同进程中已经映射相同内存内容到它们的地址空间中的线程.

匿名信号量必须用sem_init初始化,sem_init函数的第二个参数pshared决定了线程共享(pshared=0)还是进程共享(pshared!=0),也可以用sem_post和sem_wait进行操作,在共享内存释放前,匿名信号量要先用sem_destroy销毁。

Posix信号量PV操作

int sem_wait(sem_t *sem);//P操作int sem_post(sem_t *sem);//V操作

wait操作实现对信号量的减1,如果信号量计数原先为0则会发生阻塞;

post操作将信号量加1,在调用sem_post时,如果在调用sem_wait中发生了进程阻塞,那么进程会被唤醒并且sem_post增1的信号量计数会再次被sem_wait减1;

Posix互斥锁#include <pthread.h>int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *mutexattr);//互斥锁初始化, 注意:函数成功执行后,互斥锁被初始化为未锁住状态。int pthread_mutex_lock(pthread_mutex_t *mutex);//互斥锁上锁int pthread_mutex_trylock(pthread_mutex_t *mutex);//互斥锁判断上锁int pthread_mutex_unlock(pthread_mutex_t *mutex);//互斥锁解锁int pthread_mutex_destroy(pthread_mutex_t *mutex);//消除互斥锁

互斥锁是用一种简单的加锁方法来控制对共享资源的原子操作。这个互斥锁只有两种状态,也就是上锁/解锁,可以把互斥锁看作某种意义上的全局变量。在同一时刻只能有一个线程掌握某个互斥锁,,拥有上锁状态的线程能够对共享资源进行操作。若其他线程希望上锁一个已经被上锁的互斥锁,则该线程就会阻塞,直到上锁的线程释放掉互斥锁为止。可以说,这把互斥锁保证让每个线程对共享资源按顺序进行原子操作。

其中,互斥锁可以分为快速互斥锁(默认互斥锁)、递归互斥锁和检错互斥锁。这三种锁的区别主要在于其他未占有互斥锁的线程在希望得到互斥锁时是否需要阻塞等待。快速锁是指调用线程会阻塞直至拥有互斥锁的线程解锁为止。递归互斥锁能够成功地返回,并且增加调用线程在互斥上加锁的次数,而检错互斥锁则为快速互斥锁的非阻塞版本,它会立即返回并返回一个错误信息。

生产者消费者问题

运用C++,将缓冲区封装成classStorage

//Storage类设计class Storage{public:Storage(unsigned int _bufferSize);~Storage();void consume(int id); //消费void produce(int id); //生产private:// 打印缓冲区状态void display(bool isConsumer = false);private:unsigned int buffSize;int *m_storage; //缓冲区unsigned short int in; //生产位置unsigned short int out; //消费位置unsigned int product_number; //产品编号sem_t sem_full; //满信号量sem_t sem_empty;//空信号量pthread_mutex_t mutex; //互斥量: 保护缓冲区互斥访问};//Storage类实现Storage::Storage(unsigned int _bufferSize):buffSize(_bufferSize), in(0), out(0), product_number(0){m_storage = new int[buffSize];for (unsigned int i = 0; i < buffSize; ++ i)m_storage[i] = -1;sem_init(&sem_full, 0, 0);//将empty信号量初始化为缓冲区大小sem_init(&sem_empty, 0, buffSize);pthread_mutex_init(&mutex, NULL);}Storage::~Storage(){delete []m_storage;pthread_mutex_destroy(&mutex);sem_destroy(&sem_empty);sem_destroy(&sem_full);}void Storage::produce(int id){printf("producer %d is waiting storage not full\n", id);//获取empty信号量sem_wait(&sem_empty);//获取互斥量pthread_mutex_lock(&mutex);//生产cout << "++ producer " << id << " begin produce "<< ++product_number << " …" << endl;m_storage[in] = product_number;//打印此时缓冲区状态display(false);in = (in+1)%buffSize;cout << " producer " << id << " end produce …\n" << endl;//释放互斥量pthread_mutex_unlock(&mutex);//释放full信号量sem_post(&sem_full);sleep(1);}void Storage::consume(int id){printf("consumer %d is waiting storage not empty\n", id);//获取full信号量sem_wait(&sem_full);//获取互斥量pthread_mutex_lock(&mutex);//消费int consume_id = m_storage[out];cout << "– consumer " << id << " begin consume "<< consume_id << " …" << endl;m_storage[out] = -1;//打印此时缓冲区状态display(true);out = (out+1)%buffSize;cout << " consumer " << id << " end consume …\n" << endl;//解锁互斥量pthread_mutex_unlock(&mutex);//释放empty信号量sem_post(&sem_empty);sleep(1);}void Storage::display(bool isConsme){cout << "states: { ";for (unsigned int i = 0; i < buffSize; ++i){if (isConsme && out == i)cout << ‘#’;else if (!isConsme && in == i)cout << ‘*’;if (m_storage[i] == -1)cout << "null ";elseprintf("%-4d ", m_storage[i]);}cout << "}" << endl;}//生产者, 消费者代码实现//缓冲区Storage *storage;//生产者-线程void *producer(void *args){int id = *(int *)args;delete (int *)args;while (1)storage->produce(id); //生产return NULL;}//消费者-线程void *consumer(void *args){int id = *(int *)args;delete (int *)args;while (1)storage->consume(id); //消费return NULL;}//主控线程int main(){int nProducer = 1;int nConsumer = 2;cout << "please input the number of producer: ";cin >> nProducer;cout << "please input the number of consumer: ";cin >> nConsumer;cout << "please input the size of buffer: ";int size;cin >> size;storage = new Storage(size);pthread_t *thread = new pthread_t[nProducer+nConsumer];//创建消费者进程for (int i = 0; i < nConsumer; ++i)pthread_create(&thread[i], NULL, consumer, new int(i));//创建生产者进程for (int i = 0; i < nProducer; ++i)pthread_create(&thread[nConsumer+i], NULL, producer, new int(i));//等待线程结束for (int i = 0; i < nProducer+nConsumer; ++i)pthread_join(thread[i], NULL);delete storage;delete []thread;}

完整源代码:

要知道,当你一直在担心错过了什么的时候,其实你已经错过了旅行的意义。

Posix信号量与互斥量解决生产者消费者问题

相关文章:

你感兴趣的文章:

标签云: