10

Eseguo un modello produttore/consumatore piuttosto tipico su diversi compiti.È un lavoro per TPL Dataflow?

Task1: legge batch di byte [] da file binari e avvia una nuova attività per ogni raccolta di matrici di byte. (l'operazione è in batch per scopi di gestione della memoria).

Attività 2-n: Quelle sono attività di lavoro e ciascuna opera nella raccolta passata (da Attività1) di matrici di byte e deserializza gli array di byte, li ordina in base a determinati criteri e quindi memorizza una raccolta di oggetti risultanti (ogni array di byte deserializza in tale oggetto) in un dizionario concorrente.

Attività (n + 1) Ho scelto un dizionario concorrente perché il lavoro di questa attività è quello di unire le raccolte memorizzate nel dizionario simultaneo nello stesso ordine rispetto a come sono state originate da Task1. Lo raggiungo passando un collectionID (è di tipo int e incrementato per ogni nuova raccolta all'interno di Task1) fino in fondo da Task1 a questa attività. Questa attività controlla essenzialmente se il successivo ID raccolta previsto è già memorizzato nel dizionario concorrente e se sì, lo estrae, lo aggiunge a una coda finale e controlla la successiva raccolta nel dizionario concorrente.

Ora, da quello che ho letto e dai video che ho visto mi sembra che TPL Dataflow possa essere il candidato perfetto per tale modello produttore/consumatore. Non riesco proprio a concepire un design e quindi a iniziare perché non ho mai lavorato con TPL Dataflow. In termini di throughput e latenza, questa libreria è all'altezza del compito? Attualmente elabora array da 2,5 milioni di byte e quindi oggetti al secondo nelle raccolte risultanti. TPL Dataflow può aiutare a semplificare? Sono particolarmente interessato alla risposta alla seguente domanda: È possibile che TPL Dataflow mantenga l'ordine dei batch di raccolta dall'attività 1 quando si generano le attività di lavoro e le si uniscono nuovamente una volta che le attività dell'operatore hanno svolto il proprio lavoro? Ottimizza le cose? Avendo profilato l'intera struttura, ritengo che ci sia un bel po 'di tempo perso a causa della rotazione e di troppe collezioni concorrenti.

Qualche idea, pensieri?

risposta

12

MODIFICA: Si è verificato un errore. TransformBlockfa restituisce gli articoli nello stesso ordine in cui sono entrati, anche se è configurato per il parallelismo. Per questo motivo, il codice nella mia risposta originale è completamente inutile e al suo posto è possibile utilizzare il normale TransformBlock.


risposta originale:

Per quanto ne so solo una costrutto parallelismo .NET supporta restituzione degli articoli trattati nell'ordine in cui è venuto in: PLINQ con AsOrdered(). Ma mi sembra che PLINQ non si adatti a quello che vuoi.

TPL Dataflow, d'altra parte, si adatta bene, penso, ma non ha un blocco che sosterrebbe parallelismo e restituire gli articoli in ordine allo stesso tempo (TransformBlock supporta entrambi, ma non al contemporaneamente). Fortunatamente, i blocchi Dataflow sono stati progettati tenendo presente la componibilità, quindi possiamo creare il nostro blocco che lo fa.

Ma prima, dobbiamo capire come ordinare i risultati. Usare un dizionario concorrente, come suggerito tu, insieme a qualche meccanismo di sincronizzazione, funzionerebbe sicuramente. Ma penso che ci sia una soluzione più semplice: utilizzare una coda di Task s. Nell'attività di output, deseleziona un Task, attendi il completamento (in modo asincrono) e quando lo fa, invii i suoi risultati lungo.Abbiamo ancora bisogno di sincronizzazione per il caso in cui la coda è vuota, ma possiamo ottenerla gratuitamente se scegliamo quale coda usare in modo intelligente.

Quindi, l'idea generale è questa: quello che stiamo scrivendo sarà un IPropagatorBlock, con qualche input e qualche output. Il modo più semplice per creare un numero personalizzato IPropagatorBlock consiste nel creare un blocco che elabora l'input, un altro blocco che produce i risultati e trattarli come uno utilizzando DataflowBlock.Encapsulate().

Il blocco di input dovrà elaborare gli elementi in arrivo nell'ordine corretto, quindi non vi è alcuna parallelizzazione. Creerà un nuovo Task (in realtà, un TaskCompletionSource, in modo che possiamo impostare il risultato dello Task in seguito), aggiungerlo alla coda e quindi inviare l'elemento per l'elaborazione, insieme ad un modo per impostare il risultato corretto Task . Perché non abbiamo bisogno di collegare questo blocco a nulla, possiamo usare uno ActionBlock.

Il blocco di uscita dovrà prendere dalla coda, attendere in modo asincrono e quindi inviarli. Ma poiché tutti i blocchi hanno una coda incorporata in essi, e i blocchi che accettano i delegati hanno un'asincrona attesa integrata, questo sarà molto semplice: new TransformBlock<Task<TOutput>, TOutput>(t => t). Questo blocco funzionerà sia come coda che come blocco di output. Per questo motivo, non dobbiamo gestire alcuna sincronizzazione.

L'ultimo pezzo del puzzle è in realtà l'elaborazione degli articoli in parallelo. Per questo, possiamo usare un altro ActionBlock, questa volta con il set MaxDegreeOfParallelism. Prenderà l'input, lo elaborerà e imposterà il risultato del corretto Task nella coda.

messi insieme, potrebbe assomigliare a questo:

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

Dopo tanto parlare, che è piuttosto una piccola quantità di codice, credo.

Sembra che ti interessino molto le prestazioni, quindi potresti dover perfezionare questo codice. Ad esempio, potrebbe essere opportuno impostare MaxDegreeOfParallelism del blocco processor su qualcosa come Environment.ProcessorCount, per evitare la sottoscrizione in eccesso. Inoltre, se la latenza è più importante della velocità effettiva, potrebbe essere opportuno impostare MaxMessagesPerTask dello stesso blocco su 1 (o un altro piccolo numero) in modo che al termine dell'elaborazione di un elemento venga immediatamente inviato all'output.

Inoltre, se si desidera limitare gli elementi in entrata, è possibile impostare BoundedCapacity di enqueuer.

+0

Wow un bel po 'di chicche che vorrei prima digerire e provare. Grazie mille per quelli, per lo meno meritato un upvote ;-) Lasciami giocare con quelle idee e torno. Queuing Tasks ha molto senso e mi chiedo perché non l'abbia fatto prima. –

+0

ok Passo un po 'di tempo a leggere il tuo post ea leggere su TPL Dataflow, qui accoppiano le domande per comprendere appieno la tua soluzione proposta: (1) perché proponi un IPropagatorBlock personalizzato e IDataflowBlock.Encapsulate() dato un Transformblock già esistente? (2) Non riesco a vedere come pensi di collegare i blocchi. Parli prima di ActionBlocks e poi di TransformBlocks. Da quello che ho letto, ActionBlock non è il "punto finale" dell'intera architettura? –

+1

1. Questo è spiegato nel secondo paragrafo: 'TransformBlock' non è in grado di elaborare gli articoli in parallelo e restituirli in ordine allo stesso tempo. Può fare uno di loro, ma non entrambi. – svick