C++ - Single producer, multiple consumer buffer: pthread implementation using condition variables
I'm writing a naive implementation of a single-producer, multiple-consumer buffer with pthreads and condition variables, using a C++ list as the buffer. I'm not worried about how fast the code runs; I just want to get rid of the errors.
The producer thread reads a string from a file and inserts it at the end of the buffer, while each one of the consumers reads from the beginning and puts it in a different matrix. So, essentially, I have a FIFO queue that has a max size, and the first element can only be erased once all the consumers have read it.
Here's the important part of the three functions of the code:
Producer:
    void *feedbuffer(void *threadproducer) {
        // some declarations...
        while (!file->eof()) {
            pthread_mutex_lock(&mutex);
            while (*buffer_current_size == buffer_max_size) { // full
                // wait until elements are consumed
                pthread_cond_wait(&can_produce, &mutex);
            }
            pthread_mutex_lock(&lock_buffer);
            *file >> temp.word;
            buffer->push_back(temp);
            (*buffer_current_size)++;
            pthread_mutex_unlock(&lock_buffer);
            pthread_cond_broadcast(&can_consume);
            pthread_mutex_unlock(&mutex);
        }
        file->close();
        pthread_cond_broadcast(&can_consume);
        pthread_mutex_lock(&lock_buffer);
        buffer_current_size->store(-1); // end-of-read signal
        pthread_mutex_unlock(&lock_buffer);
        pthread_exit(NULL);
    }
Buffer controller and worker-thread caller:
    void *main_consumer(void *threadconsumer) // consumer caller and buffer controller
    {
        // some declarations...
        for (int j = 0; j < numthreads; j++) {
            pthread_create(&threads[j], &attr, worker, (void *) &workerargs[j]);
        }
        // buffer controller
        pthread_mutex_lock(&lock_buffer);
        while (*buffer_current_size != -1) { // while the read hasn't ended
            // unlock and lock again to let other threads hold the lock for a while
            pthread_mutex_unlock(&lock_buffer);
            pthread_mutex_lock(&lock_buffer);
            it = buffer->begin(); // get the first element of the buffer
            if (it->cnt == numthreads) {
                buffer->pop_front();               // delete the first element
                (*buffer_current_size)--;          // decrease size
                pthread_cond_signal(&can_produce); // producer can produce
            }
        }
        pthread_mutex_unlock(&lock_buffer);
        for (int i = 0; i < numthreads; i++) {
            pthread_join(threads[i], NULL);
        }
    }
Worker:
    void *worker(void *threadwoker) {
        // some declarations...
        pthread_mutex_lock(&lock_buffer); // lock at the beginning
        it = buffer->begin();
        while (!(*buffer_current_size == -1 && it == args->buffer->end())) {
            pthread_mutex_unlock(&lock_buffer);
            // insert into matrix...
            pthread_mutex_lock(&lock_buffer); // unified lock and cnt, solving the issue
            (it->cnt)++;
            it++;
            pthread_mutex_unlock(&lock_buffer);
            pthread_mutex_lock(&mutex);
            while (*buffer_current_size == 0) { // wait if the buffer is empty
                pthread_cond_wait(&can_consume, &mutex);
            }
            pthread_mutex_unlock(&mutex);
            pthread_mutex_lock(&lock_buffer); // lock for the while condition
        }
        pthread_mutex_unlock(&lock_buffer);
        pthread_exit(NULL);
    }
As you can see, I used an int counter on each element of the buffer to check whether all worker threads have read it. When that condition becomes true, the buffer controller erases the first element of the queue. Everything is bounded by locks, to guarantee data integrity.
The problem is, the code doesn't work: I get either a seg fault or a mutex error. Can anyone enlighten me with some ideas as to why?
Firstly, it's not clear which data structures are being protected by each mutex. I suggest that, for the initial implementation at least, you simplify it down to one mutex protecting all of the shared state: the buffer size counter, the buffer itself, and the counter in the work items.
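To illustrate the single-mutex idea, here is a minimal sketch of a bounded queue in which every shared field sits behind one mutex. It is not the code from the question: the names BoundedQueue, push, pop and finish are made up for the example, a boolean flag stands in for the "size == -1" sentinel, and for simplicity each item is handed to only one consumer rather than to all of them.

```cpp
#include <pthread.h>
#include <cstddef>
#include <list>
#include <string>

// Hypothetical type: every shared field lives here, behind ONE mutex.
struct BoundedQueue {
    pthread_mutex_t mutex;
    pthread_cond_t can_produce;
    pthread_cond_t can_consume;
    std::list<std::string> items;
    std::size_t max_size;
    bool finished;  // assumption: replaces the "size == -1" sentinel

    explicit BoundedQueue(std::size_t n) : max_size(n), finished(false) {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&can_produce, NULL);
        pthread_cond_init(&can_consume, NULL);
    }
    ~BoundedQueue() {
        pthread_cond_destroy(&can_consume);
        pthread_cond_destroy(&can_produce);
        pthread_mutex_destroy(&mutex);
    }
};

// Producer side: blocks while the queue is full.
void push(BoundedQueue &q, const std::string &word) {
    pthread_mutex_lock(&q.mutex);
    while (q.items.size() == q.max_size)  // re-test after every wakeup
        pthread_cond_wait(&q.can_produce, &q.mutex);
    q.items.push_back(word);
    pthread_cond_signal(&q.can_consume);
    pthread_mutex_unlock(&q.mutex);
}

// Producer calls this once at end of input; the flag is set under the lock
// and every waiting consumer is woken so it can notice it.
void finish(BoundedQueue &q) {
    pthread_mutex_lock(&q.mutex);
    q.finished = true;
    pthread_cond_broadcast(&q.can_consume);
    pthread_mutex_unlock(&q.mutex);
}

// Consumer side: returns false once the queue is empty and input has ended.
bool pop(BoundedQueue &q, std::string &out) {
    pthread_mutex_lock(&q.mutex);
    while (q.items.empty() && !q.finished)  // re-test after every wakeup
        pthread_cond_wait(&q.can_consume, &q.mutex);
    bool got = !q.items.empty();
    if (got) {
        out = q.items.front();
        q.items.pop_front();
        pthread_cond_signal(&q.can_produce);
    }
    pthread_mutex_unlock(&q.mutex);
    return got;
}
```

With this shape the size counter is no longer a separate variable: items.size() is read under the same lock that guards the list, so the two cannot disagree.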
As for the specific issues:
- The producer should re-test the condition after pthread_cond_wait() (it should be a while () loop rather than an if () statement);
- When the producer finishes, it accesses *buffer_current_size without the lock held and doesn't signal the waiting consumers;
- The buffer controller accesses *buffer_current_size without the lock held;
- The worker accesses buffer->begin() and buffer->end() without the lock held;
- The worker accesses *buffer_current_size without the lock held;
- The worker calls pthread_mutex_trylock(&mutex) without checking the result, which means it could go on to access the shared state and unlock the mutex without having locked it;
- The worker needs to re-check the condition it's waiting for after calling pthread_cond_wait();
- The worker accesses the iterator it without the lock held, which is problematic because of other threads modifying the underlying std::list. Since this thread has already incremented the counter, the buffer controller could have deleted the item the iterator points to, which means you can't increment the iterator.
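One possible way around the iterator problem in the last point is to not keep an iterator across an unlock at all. The sketch below is only an illustration under my own assumptions: each worker keeps a private sequence number of the next item it wants, the buffer records the sequence number of its front element, and a std::deque is swapped in for the std::list so items can be indexed. BroadcastBuffer, produce, consume and close_input are hypothetical names. The last worker to read the front item removes it, so no separate controller loop is needed.

```cpp
#include <pthread.h>
#include <cstddef>
#include <deque>
#include <string>

struct Item {
    std::string word;
    int readers_left;  // how many consumers still have to read this item
};

// Hypothetical type; all fields are guarded by the single mutex.
struct BroadcastBuffer {
    pthread_mutex_t mutex;
    pthread_cond_t can_produce;
    pthread_cond_t can_consume;
    std::deque<Item> items;
    std::size_t max_size;
    int num_consumers;
    unsigned long head_seq;  // sequence number of items.front()
    bool finished;

    BroadcastBuffer(std::size_t n, int consumers)
        : max_size(n), num_consumers(consumers), head_seq(0), finished(false) {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&can_produce, NULL);
        pthread_cond_init(&can_consume, NULL);
    }
    ~BroadcastBuffer() {
        pthread_cond_destroy(&can_consume);
        pthread_cond_destroy(&can_produce);
        pthread_mutex_destroy(&mutex);
    }
};

void produce(BroadcastBuffer &b, const std::string &word) {
    pthread_mutex_lock(&b.mutex);
    while (b.items.size() == b.max_size)
        pthread_cond_wait(&b.can_produce, &b.mutex);
    Item item;
    item.word = word;
    item.readers_left = b.num_consumers;
    b.items.push_back(item);
    pthread_cond_broadcast(&b.can_consume);  // every consumer wants this item
    pthread_mutex_unlock(&b.mutex);
}

void close_input(BroadcastBuffer &b) {
    pthread_mutex_lock(&b.mutex);
    b.finished = true;
    pthread_cond_broadcast(&b.can_consume);
    pthread_mutex_unlock(&b.mutex);
}

// next_seq is the calling worker's private count of items already read.
// Returns false when this worker has seen everything and input has ended.
bool consume(BroadcastBuffer &b, unsigned long &next_seq, std::string &out) {
    pthread_mutex_lock(&b.mutex);
    while (next_seq == b.head_seq + b.items.size() && !b.finished)
        pthread_cond_wait(&b.can_consume, &b.mutex);
    if (next_seq == b.head_seq + b.items.size()) {
        pthread_mutex_unlock(&b.mutex);
        return false;
    }
    Item &item = b.items[next_seq - b.head_seq];
    out = item.word;  // copy out while the lock is held
    ++next_seq;
    // Workers read in order, so an item that reaches zero is always the front.
    if (--item.readers_left == 0) {
        b.items.pop_front();
        ++b.head_seq;
        pthread_cond_signal(&b.can_produce);
    }
    pthread_mutex_unlock(&b.mutex);
    return true;
}
```

Each worker would start with its own next_seq set to 0 and loop on consume(), doing the matrix insertion on the copied string after the call returns, outside the lock.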