网站首页 > 技术文章 正文
基于生产者消费者模式的MQ(msg queue)实现了线程间通信,在生产者消费者模型中通常都会用到互斥锁pthread_mutex_t来保护共享内存资源,多个线程访问共享内核空间之前都会尝试获取mutex,如果有其他线程正在使用则当前线程进入锁等待状态。这样的机制难免会带来两个问题:
1:如果两个线程同时获取mutex,则两个线程会进入死锁状态
2:如果多个线程依次获取mutex,那么这些线程都会进入锁等待,浪费了cpu资源
因此,我们可以使用互斥锁pthread_mutex_t和pthread_cond_t配合来避免上面的问题
简单来说这种方法的核心思路:就是调用pthread_mutex_lock(mutex)获取锁时再判断一下是否满足获取资源的条件,比如队列为空时执行get操作就是不满足条件,如果不满足条件则调用pthread_cond_wait(cond, mutex)这个函数使当前线程进入睡眠并且把mutex互斥锁释放掉,等待获取资源的条件满足时,比如队列新加了一条msg,此时其他线程会调用pthread_cond_signal()来唤醒这个线程并锁定mutex,这样就不会出现多个线程等待一个mutex或者死锁的情况了。
生产者和消费者模型中互斥锁和条件变量的使用流程图如下,其中蓝色代表消费者的执行流,红色是生产者的执行流
下面是一个简单的程序示例:
#include <unistd.h>
#include <pthread.h>
#define CONSUMERS_COUNT 2
#define PRODUCERS_COUNT 1
pthread_mutex_t g_mutex ;
pthread_cond_t g_cond ;
pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT] ;
int share_variable = 0 ;// this is the share variable, shared by consumer and producer
void* consumer( void* arg )
{
int num = (int)arg ;
while ( 1 )
{
/******* critical section begin *******/
pthread_mutex_lock( &g_mutex ) ;
// if share_variable == 0, means consumer shell stop here
while ( share_variable == 0 )
{
printf( "consumer %d begin wait a condition...\n", num ) ;
// put a thread blocked ont a condition variable( here is g_cond),
// and unlock the mutex( here is g_mutex )
pthread_cond_wait( &g_cond, &g_mutex ) ;
}
// here means n != 0 and consumer can goes on
// consumer consumed shared variable, so the number of shared variable shell minus
printf( "consumer %d end wait a condition...\n", num ) ;
printf( "consumer %d begin consume product\n", num ) ;
-- share_variable ;
pthread_mutex_unlock( &g_mutex ) ;
/******** critial section end *********/
sleep( 1 ) ;
}
return NULL ;
}
void* producer( void* arg )
{
int num = (int)arg ;
while ( 1 )
{
/******* critical section begin *******/
pthread_mutex_lock( &g_mutex ) ;
// produce a shared variable
printf( "producer %d begin produce product...\n", num ) ;
++ share_variable ;
printf( "producer %d end produce product...\n", num ) ;
// unblock threads blocked on a condition variable( here is g_cond )
pthread_cond_signal( &g_cond ) ;
printf( "producer %d notified consumer by condition variable...\n", num ) ;
pthread_mutex_unlock( &g_mutex ) ;
/******** critial section end *********/
sleep( 5 ) ;
}
return 1 ;
}
int main( void )
{
// initiate mutex
pthread_mutex_init( &g_mutex, NULL ) ;
// initiate condition
pthread_cond_init( &g_cond, NULL ) ;
// initiate consumer threads
for ( int i = 0; i < CONSUMERS_COUNT; ++ i )
{
pthread_create( &g_thread[i], NULL, consumer, (void*)i ) ;
}
sleep( 1 ) ;
// initiate producer threads
for ( int i = 0; i < PRODUCERS_COUNT; ++ i )
{
pthread_create( &g_thread[i], NULL, producer, (void*)i ) ;
}
for ( int i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; ++ i )
{
pthread_join( g_thread[i], NULL ) ;
}
pthread_mutex_destroy( &g_mutex ) ;
pthread_cond_destroy( &g_cond ) ;
}
猜你喜欢
- 2024-10-12 漫画 | Linux 并发和竞态问题究竟是什么?
- 2024-10-12 【驱动】串口驱动分析(三)-serial driver
- 2024-10-12 synchronized锁 synchronized锁的是类还是对象
- 2024-10-12 Golang 程序遇到性能问题该怎么办?
- 2024-10-12 线程间通信——互斥锁 线程间互斥方式
- 2024-10-12 【Linux系统编程】互斥锁 linux 互斥锁优先级反转
- 2024-10-12 linux c/c++开发:多线程并发锁:互斥锁、自旋锁、原子操作、CAS
- 2024-10-12 每行代码都带注释,带你看懂Go互斥锁的源码
- 2024-10-12 一文搞懂pprof 一文搞懂伤寒论六经辨证
- 2024-10-12 并发原理系列八:信号量、互斥锁、自旋锁
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)