MODIFICA: Si è verificato un errore. TransformBlock
fa restituisce gli articoli nello stesso ordine in cui sono entrati, anche se è configurato per il parallelismo. Per questo motivo, il codice nella mia risposta originale è completamente inutile e al suo posto è possibile utilizzare il normale TransformBlock
.
risposta originale:
Per quanto ne so solo una costrutto parallelismo .NET supporta restituzione degli articoli trattati nell'ordine in cui è venuto in: PLINQ con AsOrdered()
. Ma mi sembra che PLINQ non si adatti a quello che vuoi.
TPL Dataflow, d'altra parte, si adatta bene, penso, ma non ha un blocco che sosterrebbe parallelismo e restituire gli articoli in ordine allo stesso tempo (TransformBlock
supporta entrambi, ma non al contemporaneamente). Fortunatamente, i blocchi Dataflow sono stati progettati tenendo presente la componibilità, quindi possiamo creare il nostro blocco che lo fa.
Ma prima, dobbiamo capire come ordinare i risultati. Usare un dizionario concorrente, come suggerito tu, insieme a qualche meccanismo di sincronizzazione, funzionerebbe sicuramente. Ma penso che ci sia una soluzione più semplice: utilizzare una coda di Task
s. Nell'attività di output, deseleziona un Task
, attendi il completamento (in modo asincrono) e quando lo fa, invii i suoi risultati lungo.Abbiamo ancora bisogno di sincronizzazione per il caso in cui la coda è vuota, ma possiamo ottenerla gratuitamente se scegliamo quale coda usare in modo intelligente.
Quindi, l'idea generale è questa: quello che stiamo scrivendo sarà un IPropagatorBlock
, con qualche input e qualche output. Il modo più semplice per creare un numero personalizzato IPropagatorBlock
consiste nel creare un blocco che elabora l'input, un altro blocco che produce i risultati e trattarli come uno utilizzando DataflowBlock.Encapsulate()
.
Il blocco di input dovrà elaborare gli elementi in arrivo nell'ordine corretto, quindi non vi è alcuna parallelizzazione. Creerà un nuovo Task
(in realtà, un TaskCompletionSource
, in modo che possiamo impostare il risultato dello Task
in seguito), aggiungerlo alla coda e quindi inviare l'elemento per l'elaborazione, insieme ad un modo per impostare il risultato corretto Task
. Perché non abbiamo bisogno di collegare questo blocco a nulla, possiamo usare uno ActionBlock
.
Il blocco di uscita dovrà prendere dalla coda, attendere in modo asincrono e quindi inviarli. Ma poiché tutti i blocchi hanno una coda incorporata in essi, e i blocchi che accettano i delegati hanno un'asincrona attesa integrata, questo sarà molto semplice: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Questo blocco funzionerà sia come coda che come blocco di output. Per questo motivo, non dobbiamo gestire alcuna sincronizzazione.
L'ultimo pezzo del puzzle è in realtà l'elaborazione degli articoli in parallelo. Per questo, possiamo usare un altro ActionBlock
, questa volta con il set MaxDegreeOfParallelism
. Prenderà l'input, lo elaborerà e imposterà il risultato del corretto Task
nella coda.
messi insieme, potrebbe assomigliare a questo:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
Dopo tanto parlare, che è piuttosto una piccola quantità di codice, credo.
Sembra che ti interessino molto le prestazioni, quindi potresti dover perfezionare questo codice. Ad esempio, potrebbe essere opportuno impostare MaxDegreeOfParallelism
del blocco processor
su qualcosa come Environment.ProcessorCount
, per evitare la sottoscrizione in eccesso. Inoltre, se la latenza è più importante della velocità effettiva, potrebbe essere opportuno impostare MaxMessagesPerTask
dello stesso blocco su 1 (o un altro piccolo numero) in modo che al termine dell'elaborazione di un elemento venga immediatamente inviato all'output.
Inoltre, se si desidera limitare gli elementi in entrata, è possibile impostare BoundedCapacity
di enqueuer
.
Wow un bel po 'di chicche che vorrei prima digerire e provare. Grazie mille per quelli, per lo meno meritato un upvote ;-) Lasciami giocare con quelle idee e torno. Queuing Tasks ha molto senso e mi chiedo perché non l'abbia fatto prima. –
ok Passo un po 'di tempo a leggere il tuo post ea leggere su TPL Dataflow, qui accoppiano le domande per comprendere appieno la tua soluzione proposta: (1) perché proponi un IPropagatorBlock personalizzato e IDataflowBlock.Encapsulate() dato un Transformblock già esistente? (2) Non riesco a vedere come pensi di collegare i blocchi. Parli prima di ActionBlocks e poi di TransformBlocks. Da quello che ho letto, ActionBlock non è il "punto finale" dell'intera architettura? –
1. Questo è spiegato nel secondo paragrafo: 'TransformBlock' non è in grado di elaborare gli articoli in parallelo e restituirli in ordine allo stesso tempo. Può fare uno di loro, ma non entrambi. – svick