c++ - Single producer multiple consumer buffer pthread implementation using condition variables -


i'm writing naive implementation of single producer multiple consumer buffer pthreads , condition variables, using c++ list buffer. i'm not worried how fast code runs, want rid of errors.

the producer thread reads string file , insert end of buffer while each 1 of consumers reads beginning , put on different matrix. so, essencialy, have fifo queue has max size , first element can erased when consumers have read it.

here's important part of 3 functions of code:

producer:

void *feedbuffer(void *threadproducer){ //some declarations... while(!file->eof()) {        pthread_mutex_lock(&mutex);      while(*buffer_current_size == buffer_max_size) { // full         // wait until elements consumed         pthread_cond_wait(&can_produce, &mutex);     }      pthread_mutex_lock(&lock_buffer);          *file >> temp.word;         buffer->push_back(temp);         (*buffer_current_size)++;     pthread_mutex_unlock(&lock_buffer);      pthread_cond_broadcast(&can_consume);     pthread_mutex_unlock(&mutex); } file->close(); pthread_cond_broadcast(&can_consume);  pthread_mutex_lock(&lock_buffer);     buffer_current_size->store(-1); //end of read signal pthread_mutex_unlock(&lock_buffer); pthread_exit(null); } 

buffer controller , worker threads caller:

void *main_consumer(void *threadconsumer) //consumer caller , buffer controll {   //some declarations... for(int j=0; j<numthreads; j++) {       pthread_create(&threads[j],&attr,worker,(void *) &workerargs[j]); }  //buffer controller pthread_mutex_lock(&lock_buffer);  while(*buffer_current_size!=-1){ //while read hasn't ended      pthread_mutex_unlock(&lock_buffer); //unlock , lock again let other threads hold lock while     pthread_mutex_lock(&lock_buffer);      it=buffer->begin(); //get first element of buffer     if(it->cnt == numthreads){         buffer->pop_front(); //delete first element         (*buffer_current_size)--; //decrease size          pthread_cond_signal(&can_produce); //producer can produce     } } pthread_mutex_unlock(&lock_buffer); for(int i=0; i<numthreads; i++) {     pthread_join(threads[i],null); } } 

worker:

void *worker(void *threadwoker) { //some declarations... pthread_mutex_lock(&lock_buffer); //lock begin it=buffer->begin(); while(!(*buffer_current_size==-1 && it==args->buffer->end())) {     pthread_mutex_unlock(&lock_buffer);     //insert matrix...      pthread_mutex_lock(&lock_buffer); //unified lock , cnt, solving issue         (it->cnt)++;         it++;     pthread_mutex_unlock(&lock_buffer);      pthread_mutex_lock(&mutex);     while (*buffer_current_size==0) { //wait if buffer empty          pthread_cond_wait(&can_consume, &mutex);     }     pthread_mutex_unlock(&mutex);       pthread_mutex_lock(&lock_buffer); //locking while arguments } pthread_mutex_unlock(&lock_buffer); pthread_exit(null); } 

as can see, used int counter on each element of buffer check if worker threads have read it. when condition becomes true, buffer controller erases first element queue. bounded locks, guarantee data integrity.

the problem is, code doesn't work, either seg fault or mutex error. can enlighten ideas why?

firstly, not clear data structures being protected each mutex. suggest initial implementation @ least, simplify down 1 mutex protecting of shared state - buffer size counter, buffer itself, , counter in work items.

as specific issues:

  • the producer should re-test condition after pthread_cond_wait() (it should while () loop rather if () statement);
  • when producer finishes, accesses *buffer_current_size without lock held , doesn't signal waiting consumers;
  • the buffer controller accesses *buffer_current_size without lock held;
  • the worker accesses buffer->begin() , buffer->end() without lock held;
  • the worker accesses *buffer_current_size without lock held;
  • the worker calls pthread_mutex_trylock(&mutex) without checking result, means access shared state , unlock mutex without having locked;
  • the worker needs re-check condition it's waiting after calling pthread_cond_wait();
  • the worker accesses iterator it without lock held, problematic because of other threads modifying underlying ::list- since thread has incremented counter, buffer controller have deleted item iterator points to, means can't increment iterator.

Comments

Popular posts from this blog

c - Bitwise operation with (signed) enum value -

xslt - Unnest parent nodes by child node -

python - Healpy: From Data to Healpix map -