8

Desidero implementare una priorità ActionBlock<T>. In modo che io possa condizionalmente dare la priorità a alcuni articoli TInput utilizzando un Predicate<T>.
Ho letto Parallel Extensions Extras Samples e Guide to Implementing Custom TPL Dataflow Blocks.
Ma ancora non capisco come posso implementare questo scenario.
---------------------------- MODIFICA ------------------- --------
Ci sono alcune attività, di cui 5 possono essere eseguite contemporaneamente. Quando l'utente preme il pulsante, alcuni (dipende dalla funzione del predicato) devono essere eseguiti con la massima priorità.
Infatti scrivo questo codicePersonalizzazione di ActionBlock <T>

TaskScheduler taskSchedulerHighPriority; 
ActionBlock<CustomObject> actionBlockLow; 
ActionBlock<CustomObject> actionBlockHigh; 
... 
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5); 
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0); 
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1); 
... 
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow }); 
...  
if (predicate(customObject)) 
    actionBlockHigh.Post(customObject); 
else 
    actionBlockLow.Post(customObject); 

ma sembra priorità non tiene effettuata affatto.
---------------------------- MODIFICA ------------------
trovo il fatto che quando uso questa riga di codice:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow }); 

applicazione Causa osservare le priorità dei compiti correttamente, ma solo un compito può essere eseguire alla volta, nel frattempo utilizzando il primo blocco di codice che viene mostrato in scorrendo, l'applicazione esegue 5 attività contemporaneamente ma in ordine di priorità inappropriato.

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow }); 

Aggiornamento:
Serbatoi a svick, dovrei specificare MaxMessagesPerTask per taskSchedulerLow.

+2

Qual è la dettatura della priorità? È qualcosa di non correlato a 'T' affatto? O è la priorità una proprietà inerente/derivata di 'T'? – casperOne

+0

È possibile creare un blocco buffer personalizzato che utilizza ConcurrentPriorityQueue o creare un blocco di transfromazione asincrono personalizzato. Entrambe le opzioni sono non banali. Concorda anche con @casperOne, cosa significa priorità nel tuo caso? –

risposta

7

La tua domanda non include molti dettagli, quindi quanto segue è solo una supposizione di ciò che potrebbe essere necessario.

Penso che il modo più semplice per farlo è quello di avere due ActionBlock s, in esecuzione su priorità diverse su QueuedTaskScheduler from ParallelExtensionsExtras. Dovresti collegare a quello ad alta priorità usando un predicato e poi quello a bassa priorità. Inoltre, per assicurarti che la priorità alta Task non sia in attesa, imposta MaxMessagesPerTask del blocco con priorità bassa.

Nel codice, sarebbe qualcosa di simile:

static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate) 
{ 
    var buffer = new BufferBlock<T>(); 

    var scheduler = new QueuedTaskScheduler(1); 

    var highPriorityScheduler = scheduler.ActivateNewQueue(0); 
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1); 

    var highPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = highPriorityScheduler 
     }); 
    var lowPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = lowPriorityScheduler, 
      MaxMessagesPerTask = 1 
     }); 

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate); 
    buffer.LinkTo(lowPriorityBlock); 

    return buffer; 
} 

Questo è solo un abbozzo di quello che si potrebbe fare, ad esempio, Completion del blocco restituita non si comporta correttamente.

+0

si prega di leggere il tag modificato – Rzassar

+1

Nel codice, non si specifica 'MaxMessagesPerTask' per il blocco a bassa priorità. Come ho detto, farlo è abbastanza importante. – svick