博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程同步-生产者消费者问题
阅读量:2194 次
发布时间:2019-05-02

本文共 15667 字,大约阅读时间需要 52 分钟。

http://blog.csdn.net/big_bit/article/details/51356393

在进行多线程编程时,难免还要碰到两个问题,那就线程间的互斥与同步

线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。

线程互斥是指对于共享的进程系统资源,在各单个线程访问时的排它性。当有若干个线程都要使用某一共享资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。线程互斥可以看成是一种特殊的线程同步(下文统称为同步)。

生产者消费者问题就是一个著名的线程同步问题,该问题描述如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

关于线程同步和互斥的详细说明可以看:    http://blog.csdn.net/big_bit/article/details/51356381这篇文章

线程间的同步方法大体可分为两类:用户模式和内核模式。顾名思义,内核模式就是指利用系统内核对象的单一性来进行同步,使用时需要切换内核态与用户态,而用户模式就是不需要切换到内核态,只在用户态完成操作。

用户模式下的方法有:原子操作(例如一个单一的全局变量),临界区。内核模式下的方法有:事件,信号量,互斥量。下面我们来分别看一下这些方法:

一、互斥锁或互斥量(mutex)

    下面是用互斥量来解决生产者和消费者问题。为了现集中体现互斥量这个概念(就是一次只能有一个线程访问,其他线程阻塞),我们先简化一下问题:缓冲区或者仓库无限大(生产者和消费者都可以生产和消费产品,而且产品初始化时候数量就是无限多,这里我们主要体现),只有一个生产者和一个消费者,我们这个时候就可以把缓冲区设置为一个互斥量,一次要么生产者要么消费者霸占它。

·  初始化锁。在Linux下,线程的互斥量数据类型是pthread_mutex_t。在使用前,要对它进行初始化。

静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态分配:int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t *mutexattr);

·  加锁。对共享资源的访问,要对互斥量进行加锁,如果互斥量已经上了锁,调用线程会阻塞,直到互斥量被解锁。

int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);

·  解锁。在完成了对共享资源的访问后,要对互斥量进行解锁。

int pthread_mutex_unlock(pthread_mutex_t *mutex);

·  销毁锁。锁在是使用完成后,需要进行销毁以释放资源。

int pthread_mutex_destroy(pthread_mutex *mutex);

接下来我们来看看实现流程:

下面开始代码实现:

[cpp]  
 
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 5            //生产者和消费者各自循环次数  
  5. pthread_mutex_t mutex;          //定义一个全局互斥量,在不同函数中  
  6.                                 //初始化和使用  
  7.   
  8. void *producer( void *arg );    //生产者线程  
  9. void *consumer( void *arg );    //消费者线程  
  10.   
  11. int main(int argc , char *argv[]){  
  12.     pthread_t thrd_prod , thrd_cons;  
  13.   
  14.     pthread_mutex_init( &mutex , NULL );    //初始化互斥量  
  15.   
  16.     //创建生产者和消费者线程  
  17.     if( pthread_create( &thrd_prod , NULL, producer ,  
  18.                 NULL ) != 0 )  
  19.         oops( "thread create failed." );  
  20.     sleep(1);                               //保证生产者线程先运行  
  21.   
  22.     if( pthread_create( &thrd_cons , NULL, consumer ,  
  23.                 NULL ) != 0 )  
  24.         oops( "thread create failed." );  
  25.   
  26.     //等待线程结束  
  27.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  28.         oops( " wait thread failed.");  
  29.     if( pthread_join( thrd_cons , NULL ) != 0 )  
  30.         oops( " wait thread failed.");  
  31.   
  32.     pthread_mutex_destroy( &mutex );        //关闭互斥量  
  33.     return 0;  
  34. }  
  35.   
  36. void *producer( void *arg){  
  37.     int count = 0 ;             //循环计数  
  38.   
  39.     while( count++ < LOOP_COUNT ){  
  40.         pthread_mutex_lock( &mutex );   //加锁  
  41.   
  42.         //成功占有互斥量,接下来可以对缓冲区(仓库)进行生产  
  43.         //操作  
  44.         printf( " producer put a product to buffer.\n");  
  45.         sleep(3);               //休眠3秒, 便于程序观察  
  46.   
  47.         pthread_mutex_unlock( &mutex ); //解锁  
  48.         sleep(1);               //休眠一秒,防止它又马上占据锁  
  49.     }  
  50. }  
  51. void *consumer( void *arg ){  
  52.     int count = 0 ;             //循环计数  
  53.   
  54.     while( count++ < LOOP_COUNT ){  
  55. //      sleep(2);               //休眠一秒, 便于程序观察  
  56.         pthread_mutex_lock( &mutex );   //加锁  
  57.   
  58.         //成功占有互斥量,接下来可以对缓冲区(仓库)进行取出  
  59.         //操作  
  60.         printf( " consumer get a product from buffer.\n");  
  61.   
  62.         pthread_mutex_unlock( &mutex ); //解锁  
  63.         sleep(1);               //休眠一秒,防止它又马上占据锁  
  64.     }  
  65. }  
结果如下:
从结果可以看到,当生产者和消费者成功lock互斥量时,另一个就阻塞等待。

二、读写锁

   读写锁也叫做共享-独占锁,当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。

接下来我们改变一下生产者消费者问题:现在缓冲区或者仓库无限大(生产者和消费者都可以生产和消费产品,而且产品初始化时候数量就是无限多,这里我们主要体现),只有一个生产者(读写锁也可以应用到多个生产者问题),但有多个消费者, 我们这个时候就可以把为生产者设置一个写锁,为每个消费者设置一个读锁。

  1. 1.初始化读写锁。

    #include <pthread.h>

    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,constpthread_rwlockattr_t *restrict attr);

    2.加锁。要在读模式下锁定读写锁,需要调用pthread_rwlock_rdlock要在写模式下锁定读写锁,需要调用pthread_rwlock_wrlock当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是如果线程希望以写模式对此锁进行加锁,它必须阻塞直到所有的线程释放读锁

    intpthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

    intpthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

    3.解锁。在完成了对共享资源的访问后,要对读写锁进行解锁。

    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

    4.销毁锁。在释放读写锁占用的内存之前,需要调用pthread_rwlock_destroy做清理工作如果pthread_rwlock_init为读写锁分配了资源,pthread_rwlock_destroy将释放这些资源。如果在调用pthread_rwlock_destroy之前就释放了读写锁占用的内存空间,那么分配给这个锁的资源就丢失了。

    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

[cpp]  
 
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 2            //生产者和消费者各自循环次数  
  5. #define LOOP_THRD 5             //消费者线程个数  
  6. pthread_rwlock_t rwlock;        //定义一个全局读写锁,在不同函数中  
  7.                                 //初始化和使用  
  8.   
  9. void *producer( void *arg );    //生产者线程  
  10. void *consumer( void *arg );    //消费者线程  
  11.   
  12. int main(int argc , char *argv[]){  
  13.     int thrd_num ,thrd_id[LOOP_THRD]  ;  
  14.     pthread_t thrd_prod , thrd_cons[LOOP_THRD];  
  15.   
  16.     pthread_rwlock_init( &rwlock , NULL );  //初始化互斥量  
  17.   
  18.     //创建一个生产者和多个消费者线程  
  19.     if( pthread_create( &thrd_prod , NULL, producer ,  
  20.                 NULL ) != 0 )  
  21.         oops( "thread create failed." );  
  22.   
  23.     for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){  
  24.         thrd_id[thrd_num] = thrd_num;       //线程id,注意线程共享变量  
  25.         if( pthread_create( &thrd_cons[thrd_num], NULL, consumer   
  26.                     , <span style="background-color: rgb(255, 0, 0);">(void *)( thrd_id+thrd_num)</span> ) != 0 )  
  27.             oops( "thread %d create failed." , thrd_num );  
  28.     }  
  29.   
  30.     //等待线程结束  
  31.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  32.         oops( " wait thread failed.");  
  33.     for( thrd_num = 0 ; thrd_num < LOOP_THRD; thrd_num++ ){  
  34.         if( pthread_join( thrd_cons[thrd_num] , NULL ) != 0 )  
  35.             oops( " wait thread %d failed." , thrd_num);  
  36. //      printf("wait %d thread.\n" , thrd_num);  
  37.     }  
  38.     pthread_rwlock_destroy( &rwlock );      //关闭互斥量  
  39.     return 0;  
  40. }  
  41.   
  42. void *producer( void *arg){  
  43.     int count = 0 ;             //循环计数  
  44.   
  45.     while( count++ < LOOP_COUNT ){  
  46.         printf( "producer try to lock wrlock.\n");  
  47.         pthread_rwlock_wrlock( &rwlock );   //加锁  
  48.   
  49.         //成功占有互斥量,接下来可以对缓冲区(仓库)进行生产  
  50.         //操作  
  51.         printf( "producer lock successful, producer put a product to buffer.\n");  
  52.           
  53.         /* 
  54.             休眠3秒, 便于程序观察,可以看到 
  55.             其他读取线程不能占据锁而阻塞 
  56.         */        
  57.         sleep(3);                 
  58.         printf("prducer finished ,unlock wrlock.\n");  
  59.         pthread_rwlock_unlock( &rwlock ); //解锁  
  60.         sleep(1);                           //休眠一秒, 防止马上又占据写锁  
  61.     }  
  62. }  
  63. void *consumer( void *arg ){  
  64.     int count = 0 ;                         //循环计数  
  65.     int thrd_id = *( ( int*)arg );  
  66.   
  67. //  printf( "consumer %d ,%#x . \n" , thrd_id ,arg);  
  68.     while( count++ < LOOP_COUNT ){  
  69. //      sleep( thrd_id+1 );                 //休眠一秒, 便于程序观察  
  70.         printf( "consumer try to lock rdlock.\n" );  
  71.         pthread_rwlock_rdlock( &rwlock );   //加锁  
  72.   
  73.         //成功占有互斥量,接下来可以对缓冲区(仓库)进行取出  
  74.         //操作  
  75.         printf( " consumer locked successful ,consumer %d get a product from buffer."  
  76.                 "\n" , thrd_id);  
  77.         /* 
  78.             休眠3秒, 便于程序观察,可以看到 
  79.             其他读取线程能占据读锁 
  80.         */  
  81.         sleep(3);  
  82.         printf("consumer finished ,unlock rdlock.\n");  
  83.         pthread_rwlock_unlock( &rwlock );   //解锁  
  84.         sleep(thrd_id+1);                           //休眠一秒, 防止马上又占据读锁  
  85.     }  
  86. }  
结果如下:
可以看到当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是如果线程希望以写模式对此锁进行加锁,它必须阻塞直到所有的线程释放读锁。虽然读写锁的实现各不相同,但当读写锁处于读模式锁住状态时,如果有另外的线程试图以写模式加锁,读写锁通常会阻塞随后的读模式锁请求(貌似在程序里面没有体现出来)。这样可以避免读模式锁长期占用,而等待的写模式锁请求一直得不到满足
 另外我要说明的一点就是,传递参数 arg 为(void *)( thrd_id+thrd_num),我一开始并没有定义一个数组thrd_cons[LOOP_THRD]来存储线程编号的, 而是直接传thrd_num的地址,但通过在线程
int thrd_id = *( ( int*)arg );
// printf( "consumer %d ,%#x . \n" , thrd_id ,arg);
 这两句话就可以知道,当传递的是thrd_num地址时候,由于进程的所有信息对该进程的所有线程都是共享的,包括可执行的程序文本、程序的全局内存和堆内存、栈以及文件描述符。地址, 由于进程的所有信息对该进程的所有线程都是共享的,包括可执行的程序文本、程序的全局内存和堆内存、栈以及文件描述符。 thrd_num的值会随着线程的执行而发生改变,系统调度频率之快是我们无法想像的,所以thrd_num的值也是动态改变的。

三、条件变量(cond)

与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。条件变量分为两部分:条件和变量。条件本身是由互斥量保护的。线程在改变条件状态前先要锁住互斥量。条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

1.初始化条件变量。

静态态初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
动态初始化,int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);

2.等待条件成立。释放锁,同时阻塞等待条件变量为真才行。timewait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)

int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex 
*mutex,consttimespec *abstime);

3.激活条件变量。pthread_cond_signal,pthread_cond_broadcast(激活所有等待线程)

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有线程的阻塞

4.清除条件变量。无线程等待,否则返回EBUSY

int pthread_cond_destroy(pthread_cond_t *cond); 

    接下来我们又改变一下生产者消费者问题:现在缓冲区或者仓库大小为BUFSIZE,只有一个生产者和一个消费者(其实也适用于多个生产者和消费者), 我们这个时候就可以把缓冲区设置为一个互斥量,一次要么生产者要么消费者霸占它。但接下来处理方式与互斥量有所不同:假如生产者成功占据锁(缓冲区),这时它不能马上开始往里面生产东西,要先判断缓冲区是不是满的,如果缓冲区满了,那么生产者就会把自己放到等待条件的线程列表上,然后对互斥量进行解锁,这是一个原子操作。如果缓冲区不满则可以生产产品,然后给消费者发送notempty信号,表示缓冲区有产品了, 你可以yy了。然后解锁互斥量。假如是消费者成功占据锁(缓冲区),同样它要检查缓冲区是不是空的,如果空,那么消费者就会把自己放到等待条件的线程列表上,然后对互斥量进行解锁。如果不空,消费者开始yy,然后给生产者发送nofull信号, 表示缓冲区有位置可以生产了, 你快生产吧。然后解锁互斥量。就这样,生产者消费者和谐同步工作着。

 

流程图我就不画了,看代码也能明白过程:

 ---producer过程:lock(mutex)->checknotfull->(if notfull wait until notfull)->produce product->sendnotempty to consumer->unlock(mutex) 

---consumer过程:lock(mutex)->checknotempty->(if notempty wait until notempty)->get productfrom buffer->send notfull to poducer->unlock(mutex)

 

[cpp]  
 
  1. #include <stdio.h>  
  2. #include <pthread.h>  
  3.   
  4. #define LOOP_COUNT 20               //生产者和消费者各自循环次数,也可以说生产商品的总量  
  5. //#define LOOP_THRD 5               //消费者线程个数  
  6. #define BUFSIZE 5                   //缓冲区大小,也就是最多能放多少个产品  
  7.   
  8. pthread_mutex_t mutex;              //定义一个全局互斥量,在不同函数中  
  9.                                     //初始化和使用  
  10. pthread_cond_t notempty , notfull;  //定义两个条件变量,当作信号投放  
  11. unsigned int prod_pos = 3;          //定义生产者在缓冲区开始生产的位置  
  12. unsigned int cons_pos = 0;          //定义消费者在缓冲区开始消费的位置  
  13.   
  14. void *producer( void *arg );        //生产者线程  
  15. void *consumer( void *arg );        //消费者线程  
  16.   
  17. int main(int argc , char *argv[]){  
  18.     pthread_t thrd_prod , thrd_cons;  
  19.   
  20.     pthread_mutex_init( &mutex , NULL );    //初始化互斥量  
  21.   
  22.     //创建生产者和消费者线程  
  23.     if( pthread_create( &thrd_prod , NULL, producer ,  
  24.                 NULL ) != 0 )  
  25.         oops( "thread create failed." );  
  26.     sleep(1);                               //保证生产者线程先运行  
  27.   
  28.     if( pthread_create( &thrd_cons , NULL, consumer ,  
  29.                 NULL ) != 0 )  
  30.         oops( "thread create failed." );  
  31.   
  32.     //等待线程结束  
  33.     if( pthread_join( thrd_prod , NULL ) != 0 )  
  34.         oops( " wait thread failed.");  
  35.     if( pthread_join( thrd_cons , NULL ) != 0 )  
  36.         oops( " wait thread failed.");  
  37.   
  38.     pthread_mutex_destroy( &mutex );        //关闭互斥量  
  39.     return 0;  
  40. }  
  41.   
  42. void *producer( void *arg){  
  43.     int count = 0 ;             //循环计数  
  44.   
  45.     while( count++ < LOOP_COUNT ){  
  46.         printf( "producer try to lock .\n");  
  47.         pthread_mutex_lock( &mutex );   //加锁  
  48.   
  49.         /* 
  50.            成功占有互斥量,接着检查缓冲区是不是满了, 
  51.         */  
  52.         if( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){  
  53.             //缓冲区满了  
  54.             printf( "producer wait not full.\n");  
  55.             pthread_cond_wait( &notfull , &mutex ); //等待条件满足  
  56.         }  
  57.         //如果没满,接下来可以对缓冲区(仓库)进行生产  
  58.         //操作  
  59.         printf( "producer lock successful, producer put %d's "  
  60.                 "product to buffer.\n" ,count);  
  61.         prod_pos = ( prod_pos +1 ) % BUFSIZE;       //下标前进一个  
  62.         pthread_cond_signal( &notempty );           //向消费者发送信号  
  63.         /* 
  64.             休眠3秒, 便于程序观察,可以看到 
  65.             其他读取线程不能占据锁而阻塞 
  66.         */        
  67.         sleep( 1 );               
  68.         printf("prducer finished ,unlock lock.\n");  
  69.         pthread_mutex_unlock( &mutex ); //解锁  
  70.         sleep( 1 );                         //休眠一秒, 防止马上又占据写锁  
  71.     }  
  72. }  
  73. void *consumer( void *arg ){  
  74.     int count = 0 ;                         //循环计数  
  75.   
  76.     while( count++ < LOOP_COUNT ){  
  77. //      sleep( thrd_id+1 );                 //休眠一秒, 便于程序观察  
  78.         printf( "consumer try to lock .\n" );  
  79.         pthread_mutex_lock( &mutex ); //解锁  
  80.   
  81.         /* 
  82.            成功占有互斥量,接下来检查缓冲区是否为空 
  83.         */  
  84.         if( cons_pos == prod_pos ){  
  85.             printf( "consumer wait not empty.\n");  
  86.             pthread_cond_wait( &notempty , &mutex );  
  87.         }  
  88.   
  89.         //缓冲区不空,可以对缓冲区(仓库)进行取出操作  
  90.         printf( " consumer locked successful ,consumer  "  
  91.                 "get %d product from buffer.\n" , count);  
  92.         cons_pos = ( cons_pos + 1) % BUFSIZE ;  //下标前进一个  
  93.         pthread_cond_signal( &notfull );        //向生产着发送信号  
  94.   
  95.         /* 
  96.             休眠3秒, 便于程序观察,可以看到 
  97.             其他读取线程能占据读锁 
  98.         */  
  99.         sleep( 1 );  
  100.         printf("consumer finished ,unlock lock.\n");  
  101.         pthread_mutex_unlock( &mutex );         //解锁  
  102.         sleep(1);                       //休眠一秒, 防止马上又占据读锁  
  103.     }  
  104. }  

先不忙看结果, 想想结果跟你预想的是不是一样,然后看结果:

死锁了!!!!  万万没想到!!!
然后排查,锁定到
pthread_cond_wait函数,查看其他资料,总结如下:

函数将解锁mutex参数指向的互斥锁,并使当前线程阻塞在cond参数指向的条件变量上。

被阻塞的线程可以被pthread_cond_signal函数,pthread_cond_broadcast函数唤醒,也可能在被信号中断后被唤醒。

pthread_cond_wait函数的返回并不意味着条件的值一定发生了变化,必须重新检查条件的值。

pthread_cond_wait函数返回时,相应的互斥锁将被当前线程锁定,即使是函数出错返回。

一般一个条件表达式都是在一个互斥锁的保护下被检查。当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。当另一个线程改变了条件的值并向条件变量发出信号时,等待在这个条件变量上的一个线程或所有线程被唤醒,接着都试图再次占有相应的互斥锁。

阻塞在条件变量上的线程被唤醒以后,直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。

所以上述代码应该用循环而不是if。具体修改如下:
[cpp]  
 
  1. consumer函数中:    /* 
  2.            成功占有互斥量,接下来循环检查缓冲区是否为空. 这个while要特别 
  3.            说明一下,单个pthread_cond_wait功能很完善,为何这里要有一个 
  4.            while (cons_pos >=prod_pos)呢?因为pthread_cond_wait里的线程可 
  5.            能会被意外唤醒返回了,mutex又被重新lock(不一定是本线程,有可能 
  6.            是其他线程),此时情况是cons_pos >= prod_pos ,表示缓冲区空了, 
  7.            不能再取product,也没有product可取。这不是我们想要的结果。应该 
  8.            让线程继续进入pthread_cond_wait   
  9.         */  
  10.   
  11.         while( cons_pos == prod_pos ){  
  12.             printf( "consumer wait not empty.\n");  
  13.             /* 
  14.                pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的 
  15.                mutex,然后阻塞在等待对列里休眠,直到再次被唤醒(大多数 
  16.                情况下是等待的条件成立而被唤醒,唤醒后,该进程会先锁定先 
  17.                pthread_mutex_lock(&mutex);,再读取资源,用这个流程是比较 
  18.                清楚的 block-->unlock-->cond_wait() return-->lock   
  19.             */  
  20.             pthread_cond_wait( &notempty , &mutex );  
  21.         }  
[cpp]  
 
  1. produer函数中: /* 
  2.            成功占有互斥量,接着循环检查缓冲区是不是满了, 
  3.         */  
  4.         while( ( prod_pos + 1 ) % BUFSIZE == cons_pos ){  
  5.             //缓冲区满了  
  6.             printf( "producer wait not full.\n");  
  7.             pthread_cond_wait( &notfull , &mutex ); //等待条件满足  
  8.         }  
这样来看结果就对了
注:关于生产者和消费者操作缓冲区的操作,大家下来仔细揣摩一下,搞懂
[cpp]  
 
  1. while( ( prod_pos + 1 ) % BUFSIZE == cons_pos )  
[cpp]  
 
  1. while( cons_pos == prod_pos )  
这两个循环条件,大家就明白缓冲区操作了。

四、信号量

关于信号量的说明见http://blog.csdn.net/big_bit/article/details/51471811,我主要写了一个程序来学习信号量,这个例子是两个相同的线程同时向屏幕输出数据,我们可以看到如何使用信号量来使两个进程协调工作,使同一时间只有一个线程可以向屏幕输出数据。

[cpp]  
 
  1. #include <stdio.h>  
  2. #include <sys/types.h>  
  3. #include <sys/sem.h>  
  4. #include <sys/ipc.h>  
  5. #include <sys/stat.h>  
  6. #include <unistd.h>  
  7. #include <fcntl.h>  
  8. #include <pthread.h>  
  9.   
  10. /* 利用信号量实现PV操作,来达到同步 */  
  11.   
  12. //#define IPC_PATH "."  
  13. //#define IPC_ID 0x1  
  14.   
  15. union semun{  
  16.     int val;  
  17.     struct semid_ds *buf;  
  18.     unsigned short *array;  
  19. };  
  20. struct my_arg{      //在线程的例程函数里面要调用pv函数,所以给例程传递结构体  
  21.     char mes;  
  22.     int semid;  
  23.     unsigned short nsems;  
  24. };  
  25. static int creat_sem( const int nsems);  
  26. static void del_sem( const int semid);  
  27. static int semaphore_p( const int semid , const unsigned short nsems );  
  28. static int semaphore_v( const int semid , const unsigned short nsems );  
  29. static void *pr_char( void *);      //print char  
  30.   
  31. int main(int argc , char *argv[]){  
  32.     int sem_id;  
  33.     int nsems = 1;  
  34.     //static int count = 0;  
  35.     char mes1 = 'X';  
  36.     char mes2 = 'O';  
  37.     pthread_t thrd1,thrd2;  
  38.     struct my_arg my_arg1,my_arg2;  
  39.       
  40.     /* init semaphore */  
  41.     sem_id = creat_sem( nsems );  
  42.       
  43.     my_arg1.semid=my_arg2.semid = sem_id;  
  44.     my_arg1.nsems = my_arg2.nsems = (unsigned short )(nsems-1);  
  45.     my_arg1.mes = mes1;  
  46.     my_arg2.mes = mes2;  
  47.       
  48.     //create 2 thread  
  49.     pthread_create( &thrd1 , NULL , pr_char , (void *)&my_arg1);  
  50.     pthread_create( &thrd2 , NULL , pr_char , (void *)&my_arg2);      
  51.       
  52.     /* print data by sem*/  
  53.   
  54.     pthread_join( thrd1 , NULL );  
  55.     pthread_join( thrd2 , NULL);  
  56.     /* delete semaphore */  
  57.     if( argc > 1){  
  58.         sleep(3);  
  59.         del_sem( sem_id);  
  60.     }  
  61.     return 0;  
  62. }  
  63.   
  64. static int creat_sem( const  int nsems ){  
  65.     key_t key ;  
  66.     union semun arg;  
  67.     int sem_id;  
  68.   
  69.     /* creat key */  
  70. //  key = ftok( IPC_PATH , IPC_ID );  
  71. //  key = ftok( "." , 1 );  
  72. //  if( key == -1 )  
  73. //      oops( "ftok" );  
  74.     if( -1 ==( sem_id = semget( (key_t)1324 , nsems , 0666 | IPC_CREAT) ) )  
  75.         oops( "semget " );  
  76.       
  77.     /* set sem */  
  78.     arg.val = nsems;  
  79.     if( -1 == semctl( sem_id , 0 , SETVAL , arg) )  
  80.         oops( " semctl " );  
  81.   
  82.     return sem_id;  
  83. }  
  84.   
  85. /* sem delete */  
  86. static void del_sem( const int semid ){  
  87.     union semun arg;  
  88.   
  89.     if( -1 == semctl( semid , 0 , IPC_RMID , arg) )  
  90.         oops( "delete semaphore");  
  91.   
  92. }  
  93. /* 信号量P操作 */  
  94. static int semaphore_p( const int semid , const unsigned short nsems ){  
  95.     struct sembuf sem_buf;  
  96.   
  97.     //set sembuf  
  98.     //sem_buf.sem_num = ( unsigned short )(nsems-1);  
  99.     //sem_buf.sem_num  = 0;  
  100.     sem_buf.sem_num = nsems;  
  101.     sem_buf.sem_op = -1 ;           //P:get resource which sem control  
  102.     sem_buf.sem_flg = SEM_UNDO;  
  103.   
  104.     if( -1 == semop( semid , &sem_buf , 1 ) )  
  105.         oops( " semop ");  
  106.     return 1;  
  107. }  
  108.   
  109. /* 信号量V操作 */  
  110. static int semaphore_v( const int semid , const unsigned short nsems ){   
  111.     struct sembuf sem_buf;  
  112.   
  113.     //set sembuf  
  114.     //sem_buf.sem_num = ( unsigned short )(nsems-1);  
  115.     //sem_buf.sem_num = 0;  
  116.     sem_buf.sem_num = nsems;  
  117.     sem_buf.sem_op = 1 ;            //P:release resource which sem control  
  118.     sem_buf.sem_flg = SEM_UNDO;  
  119.   
  120.     if( -1 == semop( semid , &sem_buf , 1 ) )  
  121.         oops( " semop ");  
  122.     return 1;  
  123. }  
  124.   
  125. static void *pr_char(void *arg){  
  126.     static int count = 0;  
  127.     struct my_arg *parg = (struct my_arg*)arg;  
  128.     printf( " semid = %d\n" , parg->semid);  
  129.     printf( " nsems = %d\n" , parg->nsems);  
  130.     while(count++ < 10){  
  131.     //进入临界区    
  132.     if(!semaphore_p( parg->semid , parg->nsems))    
  133.         oops( " semaphore_p" );  
  134.     printf("%c" ,  parg->mes);  //向屏幕中输出数据   
  135.           
  136.     fflush(stdout);     //清理缓冲区,然后休眠随机时间  
  137.     sleep(rand() % 3);    
  138.           
  139.     printf("%c" , parg->mes);  //离开临界区前再一次向屏幕输出数据   
  140.     fflush(stdout);    
  141.           
  142.     if(!semaphore_v(parg->semid , parg->nsems))   
  143.         oops( "semaphore" );  //离开临界区,休眠随机时间后继续循环   
  144.     sleep(rand() % 2);    
  145.     }  
  146.     printf("print finished.");  
  147. }  

你可能感兴趣的文章
搞懂分布式技术6:Zookeeper典型应用场景及实践
查看>>
搞懂分布式技术10:LVS实现负载均衡的原理与实践
查看>>
搞懂分布式技术11:分布式session解决方案与一致性hash
查看>>
搞懂分布式技术12:分布式ID生成方案
查看>>
搞懂分布式技术13:缓存的那些事
查看>>
搞懂分布式技术14:Spring Boot使用注解集成Redis缓存
查看>>
搞懂分布式技术15:缓存更新的套路
查看>>
搞懂分布式技术16:浅谈分布式锁的几种方案
查看>>
搞懂分布式技术17:浅析分布式事务
查看>>
搞懂分布式技术18:分布式事务常用解决方案
查看>>
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务
查看>>
搞懂分布式技术20:消息队列因何而生
查看>>
搞懂分布式技术21:浅谈分布式消息技术 Kafka
查看>>
后端技术杂谈1:搜索引擎基础倒排索引
查看>>
后端技术杂谈2:搜索引擎工作原理
查看>>
后端技术杂谈3:Lucene基础原理与实践
查看>>
后端技术杂谈4:Elasticsearch与solr入门实践
查看>>
后端技术杂谈5:云计算的前世今生
查看>>
后端技术杂谈6:白话虚拟化技术
查看>>
后端技术杂谈7:OpenStack的基石KVM
查看>>