2013-04-19 19 views
5

Ho un programma in cui sto cercando di implementare un'impostazione a più produttori e a più utenti. Ho un codice che sembra funzionare bene quando ho un consumatore e più produttori, ma l'introduzione di più thread di consumo sembra sollevare alcuni problemi dispari.Problema di thread di consumo multiplo di più produttori

Ecco quello che ho in questo momento:

#include <stdio.h> 
#include <pthread.h> 
#include <unistd.h> 
#include <stdlib.h> 

#define MAX 10 

typedef struct travs { 
    int id; 
    int numBags; 
    int arrTime; 
    struct travs *next; 
} travs; 


travs *queue; 
//travs *servicing[MAX]; 

int produced; // The total # of produced in the queue 

pthread_mutex_t queue_lock; 
//pthread_mutex_t staff_lock; 
pthread_cond_t ct, cs; 

int CheckIn(){ 
    sleep(1); 
    if(produced != 0) return 1; 
    else return 0; 
} 



void *producerThread(void *args){ 
    travs *traveler = (travs *)args; 
    // Acquire the mutex 
    pthread_mutex_lock(&queue_lock); 
    produced++; 
// pthread_cond_signal(&cs); 
    pthread_cond_wait(&ct, &queue_lock); 
    printf("Producer %d is now checked in at time %d.\n", queue->id, (1+queue- >arrTime)); 
    queue = queue->next; 
    pthread_mutex_unlock(&queue_lock); 

    return; 
}  

int Producer(int id, int numBags, int arrTime){ 

    int ret; 
    pthread_t ttid; 
    travs *traveler = malloc(sizeof(travs)); 
    traveler->id = id; 
    traveler->numBags = numBags; 
    traveler->arrTime = arrTime; 
    sleep(arrTime); 
    pthread_mutex_lock(&queue_lock); 
    if(queue != NULL) { 
     travs *check_in = malloc(sizeof(travs)); 
     check_in = queue; 
     while(check_in->next != NULL){ 
      check_in = check_in->next; 
     } 
     check_in->next = traveler; 
    } 
    else { queue = traveler; } 
    pthread_mutex_unlock(&queue_lock); 
    // Create a new traveler thread 
    ret = pthread_create(&ttid, NULL, producerThread, (void *)traveler); 

    // Check if thread creation was successful 
    if(ret == 0) { 
     printf("Producer %d has entered the check-in line at time %d; s/he is at  position %d and has %d bags.\n", id, arrTime, produced, numBags); 
     pthread_cond_signal(&cs); 
     return 0; 
    } 
    else return -1; 

} 


void *consumerThread(void *arg){ 

    int i = 0; // travelers serviced 
    char *name = (char *)arg; 
    while(1) { // run iteratively 

     // If 20 producers have been served, the consumer's work is done. 
     if(i == 20) { 
      printf("Consumer %s's service has completed!\n", name); 
       pthread_exit(NULL); 
      } 
     // Sleep for 10s if 5 travelers have been checked in 
     if (((i+1) % 5) == 0) { 
       // Wake up sleeping travelers 
       printf("Consumer %s is taking a break.\n", name); 
       sleep(2); 
       printf("Consumer %s's break is over.\n", name); 
     } 

     if(CheckIn()) { 
      pthread_mutex_lock(&queue_lock); 
      int j = 1; 
        pthread_cond_wait(&cs, &queue_lock); 
        printf("Producer %d presents ticket to consumer  %s.\n", queue->id, name); 
        printf("Consumer %s gives boarding pass to producer  %d.\n", name, queue->id); 
        while(j <= queue->numBags){ 
         printf("Consumer %s checks in bag %d for  producer %d; baggage tag is _X_.\n", name, j, queue->id); 
         j++; 
       } 
      // Signal producer being serviced that their check in is complete. 
      i++; 
      pthread_mutex_unlock(&queue_lock); 
      produced--; 
      pthread_cond_signal(&ct); 
     } 
    sleep(3); 
    } 
} 

int Consumer(char *Name) { 

    sleep(5); 
    int ret; 
    pthread_t stid; 
    // Create a staff thread 

    ret = pthread_create(&stid, NULL, consumerThread, (void *)Name); 
    // Acquire the lock 
    if(ret == 0) { 
     printf("Producer %s's service has begun!\n", Name); 
     return 0; 
    } 
    else return -1; 
} 

int main() { 
    int ret = 0; 
    char *staff_name = malloc(sizeof(char)); 
    int staff_check = 0; 
    int trav_check = 0; 
    int id; 
    int bagnum; 
    int travtime; 
    FILE *consumer_fp; 
    FILE *producer_fp; 
    queue = malloc(sizeof(travs)); 
    queue = NULL; 
    /*while(ret < 10){ 
     servicing[ret] = malloc(sizeof(travs)); 
     servicing[ret] = NULL; 
    }*/ 

    // Initilize mutexes 
    pthread_mutex_init(&queue_lock, NULL); 
    //pthread_mutex_init(&staff_lock, NULL); 

    // Initialize condition variables 
    pthread_cond_init(&ct, NULL); 
    pthread_cond_init(&cs, NULL); 

    // Open the file so we can start reading from it 

    consumer_fp = fopen("staff.txt", "r"); 
    producer_fp = fopen("travelers.txt", "r"); 

    staff_check = fscanf(consumer_fp, "%s", staff_name); 
    trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum, &travtime); 
    while(1){ 

     K: 
     while(staff_check == 1) { 
      Consumer(staff_name); 
      staff_check = fscanf(consumer_fp, "%s", staff_name); 
      goto L; 
     } 
     L: 
     while(trav_check == 3) { 
      Producer(id, bagnum, travtime); 
      trav_check = fscanf(producer_fp, "%d %d %d", &id, &bagnum,  &travtime); 
      goto K; 
     } 

    pthread_exit(NULL); 
    } 

} 

In questo contesto, ogni thread produttore vive solo per un breve periodo di tempo prima di tornare, e non fa alcuna reale calcolo in sé oltre ad aggiungere un nuovo elemento alla coda globale e poche righe di output opportunamente programmate.

Tuttavia, quando introduco più produttori, solo l'ultimo thread del produttore fa qualcosa.

Da quello che posso supporre, mi serve il seguente:

i) code separate per i produttori attesa di essere controllato-in e produttori che sono attualmente oggetto di analisi-in (commentate come travs * manutenzione [MAX] sopra)

ii) Un mutex separato per i consumatori.

Tuttavia, non sono sicuro di come implementarlo. Questa è l'idea che avevo in mente:

  1. CheckIn() un filo produttore e copia * * coda per la manutenzione [i] (in filo dei consumatori).

  2. Imposta coda = coda-> successiva (nella sequenza del produttore).

Ma, come posso garantire che quando copio * coda oltre che non ha già fatto un passo avanzato? Posso segnalare un thread in attesa con un blocco diverso da quello attualmente in attesa? E, cosa ancora più importante, in che modo avrei diverse fasce di consumatori che elaborano thread di viaggiatori diversi?

Qualsiasi aiuto sarebbe molto apprezzato!

+0

'travs * traveler = malloc (sizeof (travs)); traveler = (travs *) args; 'è una perdita di memoria ... – Sebivor

+0

Ah, grazie per la cattura. Aggiungerò una linea gratuita lì. – user991710

risposta

3

Utilizzare una coda.

Scrivere due funzioni, una per aggiungere un elemento esistente alla coda e l'altro per rimuovere un elemento dalla coda. Non utilizzare alcun blocco in queste funzioni. Provali in una singola applicazione a thread.

Quindi scrivere due wrapper per tali due add-and remove-funzioni. Quei wrapper dovrebbero assumere un mutex addizionale come argomento. Blocca questo mutex nei wrapper prima di chiamare la funzione add- o remove-e quindi sbloccare il mutex.

Scrivere la funzione thread produttore creando un nuovo elemento e chiamando il componente aggiuntivo-wrapper. Scrivi la funzione thread consumatore chiamando il wrapper remove-item e destorying l'elemento rimosso.

Impostare la funzione main() che dichiara e inizializza il mutex, quindi andare a creare varie istanze di produttori e consumatori, utilizzando pthread_create(). Passa il mutex come argomento alle funzioni del thread.

+0

Grazie per la risposta graduale. Ho riscritto l'intero programma partendo da piccoli passaggi e farlo funzionare correttamente per più produttori e un singolo consumatore. Ho ancora problemi con più utenti che non riesco a diagnosticare, ma potrei postarlo come una domanda diversa. In ogni caso, poiché questo mi ha portato a una soluzione semi-lavorativa, la accetterò come risposta. Grazie. – user991710

4

Come ho già detto, questo è un problema di memoria:

travs *traveler = malloc(sizeof(travs)); 
traveler = (travs *)args; 

Non ho intenzione di andare nei dettagli su "che cosa c'è di male perdite di memoria?". Se vuoi una risposta, chiedi a Google questa domanda. Probabilmente intendevi: travs *traveler = args;.


if(queue != NULL) { 
    travs *check_in = malloc(sizeof(travs)); 
    check_in = queue; 
    while(check_in->next != NULL){ 
     check_in = check_in->next; 
    } 
    check_in->next = traveler; 
} 
else { queue = traveler; } 

Perdita di memoria messo da parte, perché si coda mutex custodita in precedenza in altre funzioni, mentre non c'è mutex guardia affatto in questo codice? Sembra che tu abbia perso il punto dei mutex. Il tuo codice corre, qui.

Forse pthread_rwlock_t s sarebbe più adatto per questo tipo di codice.

+0

Grazie per aver notato la perdita e la mancanza di un mutex in quella sezione del codice, ma sfortunatamente questo non risponde ancora alla domanda sul design di più utenti! – user991710