8

Sto cercando un'implementazione buffer circolare (o pseudo) in C con le seguenti caratteristiche:Guardando per l'attuazione buffer circolare a destra in C

  • produttore modello multiplo singolo consumatore (MPSC)
  • blocchi di consumo sulla vuoti
  • produttori bloccare sul pieno
  • (mi aspetto di alta contesa)

Finora ho' gratis lock- Ho lavorato solo con i buffer SPSC - uno per produttore - ma vorrei evitare la rotazione continua del consumatore per verificare la presenza di nuovi dati su tutti i suoi buffer di input (e forse per eliminare alcuni thread di marshalling nel mio sistema).

Sviluppo per Linux su macchine Intel.

+0

Non so in quale ambiente ci si trova, ma in Win32 è possibile utilizzare WaitForMultipleObjects per fare in modo che il consumatore attenda tutte le code contemporaneamente senza girare. –

+1

Mi dispiace, non ho detto che lavoro principalmente su Linux – ziu

+1

Nel caso in cui non si ottenga una risposta reale, sarà semplicissimo sincronizzare più buffer con questo: http://neosmart.net/ blog/2011/waitformultipleobjects-and-win32-events-per-linux-e-read-write-locks-for-windows/ –

risposta

2

Penso di avere quello che stai cercando. Si tratta di un'implementazione del buffer circolare senza blocco che blocca il produttore/consumatore. Hai solo bisogno di accedere alle primitive atomiche - in questo esempio userò le funzioni di gcc sync.

Ha un bug noto: se si supera il buffer di oltre il 100% non è garantito che la coda rimanga FIFO (continuerà a elaborarli tutti alla fine).

Questa implementazione si basa sulla lettura/scrittura degli elementi del buffer come un'operazione atomica (che è praticamente garantita per i puntatori)

struct ringBuffer 
{ 
    void** buffer; 
    uint64_t writePosition; 
    size_t size; 
    sem_t* semaphore; 
} 

//create the ring buffer 
struct ringBuffer* buf = calloc(1, sizeof(struct ringBuffer)); 
buf->buffer = calloc(bufferSize, sizeof(void*)); 
buf->size = bufferSize; 
buf->semaphore = malloc(sizeof(sem_t)); 
sem_init(buf->semaphore, 0, 0); 

//producer 
void addToBuffer(void* newValue, struct ringBuffer* buf) 
{ 
    uint64_t writepos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size; 

    //spin lock until buffer space available 
    while(!__sync_bool_compare_and_swap(&(buf->buffer[writePosition]), NULL, newValue)); 
    sem_post(buf->semaphore); 
}  

//consumer 
void processBuffer(struct ringBuffer* buf) 
{ 
    uint64_t readPos = 0; 
    while(1) 
    { 
     sem_wait(buf->semaphore); 

     //process buf->buffer[readPos % buf->size] 
     buf->buffer[readPos % buf->size] = NULL; 
     readPos++; 
    } 
} 
+1

Questo esempio è interessante. Fammi vedere se ho capito bene: l'incremento dell'indice e la scrittura sono senza blocco, mentre si utilizza un blocco nella forma di un semaforo per bloccare il consumatore quando non c'è nulla da consumare. Non capisco come sia possibile sovraccaricare questo buffer, però. Inoltre, hai usato questa struttura in un sistema in cui ti aspettavi un tempo di rotazione molto breve? Come è stato l'impatto del ciclo di rotazione? – ziu

+0

È possibile eseguire l'overflow del buffer scrivendo i dati più rapidamente di quanto possa essere elaborato. Alla fine l'indice di scrittura verrà in giro e "passerà" il lettore. A questo punto, lo scrittore deve attendere in un blocco di selezione mentre attende che il lettore raggiunga (altrimenti si sovrascriverebbe i dati nel buffer). Il bug si verifica se si eccede la coda di oltre il 100%. In questo scenario, hai più di 1 thread in attesa in uno spinlock affinché il buffer diventi disponibile. Non è garantito quale dei thread scriverà prima in coda. – charliehorse55

+2

Non sarebbe più semplice riscrivere il ciclo precedente come nel seguito? 'while (1) { while (__ sync_bool_compare_and_swap (& (buf-> buffer [writePosition]), NULL, newValue) == false); sem_post (buf-> semaforo); interruzione; } – ziu

4

Vedi liblfds, un free-blocco MPMC ringbuffer. Non bloccherà affatto le strutture di dati senza lock — non tendono a farlo, perché il punto di non blocco è di evitare il blocco; è necessario gestirlo quando la struttura dati viene restituita con un NULL — restituisce NULL se si tenta di leggere su vuoto, ma non corrisponde al requisito quando si scrive su pieno; qui, getterà via l'elemento più antico e te lo darà per la tua scrittura.

Tuttavia, sarebbe necessario solo una piccola modifica per ottenere tale comportamento.

Ma potrebbe esserci una soluzione migliore. La parte difficile di un ringbuffer è quando si ottiene pieno il più vecchio elemento precedente e riutilizzandolo. Non hai bisogno di questo. Penso che si possa prendere il buffer circolare con memoria SPSC solo circolare e riscriverlo usando operazioni atomiche. Quello sarà un lotto più performante che il buffer di chiamata MPMC in liblfds (che è una combinazione di una coda e uno stack).

+0

Finora, la mia implementazione SPSC è abbastanza banale: si basa solo sui contatori di posizione locali e globali per sincronizzare scrittore e lettore (i contatori locali sono lì per il batching degli elementi push/pull e per ridurre la condivisione falsa). Sono presenti variabili di condizione per ridurre la rotazione (se non sono disponibili dati non c'è nient'altro da fare/se la destinazione è piena contropressione è inevitabile). Senza adeguate barriere di memoria, la mia implementazione non funzionerà su un'altra architettura. Potresti per favore dettagli sul tuo ultimo punto? Alla fine, il buffer dell'anello sarà sempre SPSC, giusto? – ziu

+1

C'è un noto buffer circolare SPSC, usato per esempio nel kernel linux, che usa solo le barriere della memoria, restituisce NULL quando il buffer è pieno o vuoto.Sto indovinando che potrebbe essere reso MPMC usando le operazioni atomiche. –