2012-11-14 12 views
5

Ho difficoltà a trovare un programma di pianificazione delle attività su cui posso pianificare le attività prioritarie ma che può anche gestire attività "incapsulate". È qualcosa di simile a ciò che Task.Run tenta di risolvere, ma non è possibile specificare un pianificatore di attività su Task.Run. Ho utilizzato QueuedTaskScheduler da Parallel Extensions Extras Samples per risolvere il requisito della priorità dell'attività (suggerito anche da questo post).Programmatore di attività a livello di concorrenza limitato (con priorità delle attività) gestione delle attività avvolte

Ecco il mio esempio:

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    static void Main(string[] args) 
    { 
     ts_priority1 = queueScheduler.ActivateNewQueue(1); 
     ts_priority2 = queueScheduler.ActivateNewQueue(2); 

     QueueValue(1, ts_priority2); 
     QueueValue(2, ts_priority2); 
     QueueValue(3, ts_priority2); 
     QueueValue(4, ts_priority1); 
     QueueValue(5, ts_priority1); 
     QueueValue(6, ts_priority1); 

     Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
     return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
     return QueueTask(async() => 
     { 
      Console.WriteLine("Start {0}", i); 
      await Task.Delay(1000); 
      Console.WriteLine("End {0}", i); 
     }, ts); 
    } 
} 

L'uscita tipico dell'esempio di cui sopra è:

Start 4 
Start 5 
Start 6 
Start 1 
Start 2 
Start 3 
End 4 
End 3 
End 5 
End 2 
End 1 
End 6 

Quello che voglio è:

Start 4 
End 4 
Start 5 
End 5 
Start 6 
End 6 
Start 1 
End 1 
Start 2 
End 2 
Start 3 
End 3 

EDIT:

Penso che sto cercando un programmatore di compiti, simile a QueuedTaskScheduler, che risolverà questo problema. Ma ogni altro suggerimento è benvenuto.

+0

Bene, ciò che si desidera è gestire la priorità delle attività, ma non eseguirle in modalità parallela? potresti non limitare solo il numero di thread simultanei nel tuo scheduler? – Kek

+0

@Kek 'new QueuedTaskScheduler (targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1);' sopra fa esattamente questo (limita il numero di thread simultanei a 1) –

risposta

2

La soluzione migliore che ho trovato è quello di rendere la mia versione del QueuedTaskScheduler (originale trovata nel codice sorgente di Parallel Extensions Extras Samples).

Ho aggiunto un parametro bool awaitWrappedTasks ai costruttori dello QueuedTaskScheduler.

public QueuedTaskScheduler(
     TaskScheduler targetScheduler, 
     int maxConcurrencyLevel, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 
    ... 
} 

public QueuedTaskScheduler(
     int threadCount, 
     string threadName = "", 
     bool useForegroundThreads = false, 
     ThreadPriority threadPriority = ThreadPriority.Normal, 
     ApartmentState threadApartmentState = ApartmentState.MTA, 
     int threadMaxStackSize = 0, 
     Action threadInit = null, 
     Action threadFinally = null, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 

    // code starting threads (removed here in example) 
    ... 
} 

Ho quindi modificato il metodo ProcessPrioritizedAndBatchedTasks() essere async

private async void ProcessPrioritizedAndBatchedTasks() 

Ho poi modificato il codice subito dopo la parte in cui viene eseguita l'operazione pianificata:

private async void ProcessPrioritizedAndBatchedTasks() 
{ 
    bool continueProcessing = true; 
    while (!_disposeCancellation.IsCancellationRequested && continueProcessing) 
    { 
     try 
     { 
      // Note that we're processing tasks on this thread 
      _taskProcessingThread.Value = true; 

      // Until there are no more tasks to process 
      while (!_disposeCancellation.IsCancellationRequested) 
      { 
       // Try to get the next task. If there aren't any more, we're done. 
       Task targetTask; 
       lock (_nonthreadsafeTaskQueue) 
       { 
        if (_nonthreadsafeTaskQueue.Count == 0) break; 
        targetTask = _nonthreadsafeTaskQueue.Dequeue(); 
       } 

       // If the task is null, it's a placeholder for a task in the round-robin queues. 
       // Find the next one that should be processed. 
       QueuedTaskSchedulerQueue queueForTargetTask = null; 
       if (targetTask == null) 
       { 
        lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 
       } 

       // Now if we finally have a task, run it. If the task 
       // was associated with one of the round-robin schedulers, we need to use it 
       // as a thunk to execute its task. 
       if (targetTask != null) 
       { 
        if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
        else TryExecuteTask(targetTask); 

        // ***** MODIFIED CODE START **** 
        if (_awaitWrappedTasks) 
        { 
         var targetTaskType = targetTask.GetType(); 
         if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
         { 
          dynamic targetTaskDynamic = targetTask; 
          // Here we await the completion of the proxy task. 
          // We do not await the proxy task directly, because that would result in that await will throw the exception of the wrapped task (if one existed) 
          // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
          await TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously); 
         } 
        } 
        // ***** MODIFIED CODE END **** 
       } 
      } 
     } 
     finally 
     { 
      // Now that we think we're done, verify that there really is 
      // no more work to do. If there's not, highlight 
      // that we're now less parallel than we were a moment ago. 
      lock (_nonthreadsafeTaskQueue) 
      { 
       if (_nonthreadsafeTaskQueue.Count == 0) 
       { 
        _delegatesQueuedOrRunning--; 
        continueProcessing = false; 
        _taskProcessingThread.Value = false; 
       } 
      } 
     } 
    } 
} 

Il cambiamento di metodo ThreadBasedDispatchLoop era un po 'diverso, nel senso che non possiamo usare la parola chiave async altrimenti interromperà la funzionalità di ex ecuting delle attività pianificate nei thread dedicati. Così qui è la versione modificata di ThreadBasedDispatchLoop

private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) 
{ 
    _taskProcessingThread.Value = true; 
    if (threadInit != null) threadInit(); 
    try 
    { 
     // If the scheduler is disposed, the cancellation token will be set and 
     // we'll receive an OperationCanceledException. That OCE should not crash the process. 
     try 
     { 
      // If a thread abort occurs, we'll try to reset it and continue running. 
      while (true) 
      { 
       try 
       { 
        // For each task queued to the scheduler, try to execute it. 
        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) 
        { 
         Task targetTask = task; 
         // If the task is not null, that means it was queued to this scheduler directly. 
         // Run it. 
         if (targetTask != null) 
         { 
          TryExecuteTask(targetTask); 
         } 
         // If the task is null, that means it's just a placeholder for a task 
         // queued to one of the subschedulers. Find the next task based on 
         // priority and fairness and run it. 
         else 
         { 
          // Find the next task based on our ordering rules...          
          QueuedTaskSchedulerQueue queueForTargetTask; 
          lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 

          // ... and if we found one, run it 
          if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
         } 

         if (_awaitWrappedTasks) 
         { 
          var targetTaskType = targetTask.GetType(); 
          if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
          { 
           dynamic targetTaskDynamic = targetTask; 
           // Here we wait for the completion of the proxy task. 
           // We do not wait for the proxy task directly, because that would result in that Wait() will throw the exception of the wrapped task (if one existed) 
           // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
           TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously).Wait(); 
          } 
         } 
        } 
       } 
       catch (ThreadAbortException) 
       { 
        // If we received a thread abort, and that thread abort was due to shutting down 
        // or unloading, let it pass through. Otherwise, reset the abort so we can 
        // continue processing work items. 
        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) 
        { 
         Thread.ResetAbort(); 
        } 
       } 
      } 
     } 
     catch (OperationCanceledException) { } 
    } 
    finally 
    { 
     // Run a cleanup routine if there was one 
     if (threadFinally != null) threadFinally(); 
     _taskProcessingThread.Value = false; 
    } 
} 

Ho testato questo e dà l'output desiderato. Questa tecnica potrebbe anche essere utilizzata per qualsiasi altro programmatore. Per esempio. LimitedConcurrencyLevelTaskScheduler e OrderedTaskScheduler

+0

In attesa sull'attività nello scheduler, il valore dell'IO asincrono viene distrutto. Se non hai bisogno di IO asincrono, in ogni caso puoi passare ai task task sincroni. – usr

+0

+1 quindi. Ho imparato molto in questa domanda. Non del tutto convinto che questa soluzione sia preferibile ad un 'AsyncSemaphore', ma ci penserò su. – usr

+0

Si sta eseguendo un metodo 'async-void' da un'implementazione' TaskScheduler'? Spaventoso, mi chiedo che @StephenCleary non abbia niente da dire su questo. – springy76

0

Penso che sia impossibile raggiungere questo obiettivo. Un problema principale sembra essere che un TaskScheduler può essere utilizzato solo per eseguire il codice. Ma ci sono attività che non eseguono codice, come attività IO o attività timer. Non penso che l'infrastruttura TaskScheduler possa essere utilizzata per programmarli.

Dalla prospettiva di un TaskScheduler che appare così:

1. Select a registered task for execution 
2. Execute its code on the CPU 
3. Repeat 

Fase (2) è sincrono che significa che il Task da eseguire deve iniziare e finire come parte della fase (2). Ciò significa che questo Task non può eseguire l'I/O asincrono perché non sarebbe bloccante. In questo senso, TaskScheduler supporta solo il codice di blocco.

Penso che ti sarebbe servito meglio implementando te stesso una versione di AsyncSemaphore che rilascia i camerieri in ordine di priorità e la limitazione. I tuoi metodi asincroni possono attendere quel semaforo in un modo non bloccante. Tutto il lavoro della CPU può essere eseguito sul pool di thread predefinito, quindi non è necessario avviare thread personalizzati all'interno di uno standard TaskScheduler. Le attività IO possono continuare a utilizzare IO non bloccanti.

+0

quello che hai spiegato qui ho già provato e ha praticamente lo stesso output (come nel problema originale). Nel tuo suggerimento 'firstPartTask' è programmato sull'utilità di pianificazione in coda, ma è completo non appena raggiunge il primo' await' e lo scheduler esegue semplicemente la "prima parte" successiva nella coda anche se il precedente "task interno" (il reset dell'attività dopo il primo 'attendi') non è stato completato. Posso solo pensare che questo sarà risolto da un ** scheduler ** che gestisce questo scenario che sto cercando e che non può essere risolto da qualche magia al di fuori dello scheduler. –

+0

Vengo a credere che tu abbia ragione. Ho aggiunto alcuni pensieri e un suggerimento. Per favore fatemi sapere cosa ne pensate. – usr

+0

Grazie per il tuo aggiornamento. Il tuo suggerimento utilizzando un blocco semaforo è esattamente ciò che l'utente ha suggerito nella seguente [risposta] (http://stackoverflow.com/a/13379980/1514235) (vedi i miei commenti). Il tuo suggerimento che uno schedulatore esegua i suoi compiti solo in modo sincrono è in qualche modo vero, ma cosa succede se lo schedulatore attende l'attività "completata" di ogni attività prima di eseguire qualsiasi altra attività nella coda. Penso che questo mi abbia dato un'idea ... grazie (ti farò sapere se mi viene in mente qualcosa). –

3

Sfortunatamente, questo non può essere risolto con una TaskScheduler, perché lavorano sempre a livello Task, e un metodo di async contiene quasi sempre più Task s.

È necessario utilizzare un SemaphoreSlim in combinazione con uno schedulizzatore prioritario. In alternativa, è possibile utilizzare AsyncLock (che è anche incluso nel mio AsyncEx library).

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    private static SemaphoreSlim semaphore = new SemaphoreSlim(1); 
    static void Main(string[] args) 
    { 
    ts_priority1 = queueScheduler.ActivateNewQueue(1); 
    ts_priority2 = queueScheduler.ActivateNewQueue(2); 

    QueueValue(1, ts_priority2); 
    QueueValue(2, ts_priority2); 
    QueueValue(3, ts_priority2); 
    QueueValue(4, ts_priority1); 
    QueueValue(5, ts_priority1); 
    QueueValue(6, ts_priority1); 

    Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
    return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts).Unwrap(); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
    return QueueTask(async() => 
    { 
     await semaphore.WaitAsync(); 
     try 
     { 
     Console.WriteLine("Start {0}", i); 
     await Task.Delay(1000); 
     Console.WriteLine("End {0}", i); 
     } 
     finally 
     { 
     semaphore.Release(); 
     } 
    }, ts); 
    } 
} 
+1

Questa sembra una soluzione interessante. Tuttavia, vedo un problema con questo. Sebbene la soluzione restituisca (inizialmente) l'output corretto (come in questa domanda), ma interromperà la priorità delle attività eseguite. Lo scheduler eseguirà tutte le attività (con la priorità corretta) fino all'attivazione del semaforo.WaitAsync(), ma le attività con priorità più alta non verranno rilasciate dal blocco prima delle attività con priorità più bassa. Ciò è particolarmente vero se le attività con priorità più alta sono pianificate dopo attività con priorità più bassa (che sono ancora in attesa di essere rilasciate dal blocco). –

+0

In tal caso, è necessario un blocco basato su priorità effettiva, che non esiste perché AFAIK nessun altro ne ha avuto bisogno. Dovrai costruirti il ​​tuo. –

+0

Ho aggiunto il mio [risposta] (http://stackoverflow.com/a/13414364/1514235). Si prega di dare un'occhiata e vedere cosa ne pensi. –