2015-03-03 27 views
5

Desidero limitare il numero di articoli pubblicati in una pipeline Dataflow. Il numero di elementi dipende dall'ambiente di produzione. Questi oggetti consumano una grande quantità di memoria (immagini) quindi vorrei pubblicarli quando l'ultimo blocco della pipeline ha fatto il suo lavoro.Flusso dati TPL: come limitare un'intera pipeline?

Ho provato a utilizzare un SemaphoreSlim per limitare il produttore e rilasciarlo nell'ultimo blocco della pipeline. Funziona, ma se durante il processo viene sollevata un'eccezione, il programma rimane in attesa per sempre e l'eccezione non viene intercettata.

Ecco un esempio simile al nostro codice. Come posso fare questo?

static void Main(string[] args) 
{ 
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2); 

    var downloadString = new TransformBlock<string, string>(uri => 
    { 
     Console.WriteLine("Downloading '{0}'...", uri); 
     return new WebClient().DownloadString(uri); 
    }); 

    var createWordList = new TransformBlock<string, string[]>(text => 
    { 
     Console.WriteLine("Creating word list..."); 

     char[] tokens = text.ToArray(); 
     for (int i = 0; i < tokens.Length; i++) 
     { 
      if (!char.IsLetter(tokens[i])) 
       tokens[i] = ' '; 
     } 
     text = new string(tokens); 

     return text.Split(new char[] { ' ' }, 
      StringSplitOptions.RemoveEmptyEntries); 
    }); 

    var filterWordList = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Filtering word list..."); 
     throw new InvalidOperationException("ouch !"); // explicit for test 
     return words.Where(word => word.Length > 3).OrderBy(word => word) 
      .Distinct().ToArray(); 
    }); 

    var findPalindromes = new TransformBlock<string[], string[]>(words => 
    { 
     Console.WriteLine("Finding palindromes..."); 

     var palindromes = new ConcurrentQueue<string>(); 

     Parallel.ForEach(words, word => 
     { 
      string reverse = new string(word.Reverse().ToArray()); 

      if (Array.BinarySearch<string>(words, reverse) >= 0 && 
       word != reverse) 
      { 
       palindromes.Enqueue(word); 
      } 
     }); 

     return palindromes.ToArray(); 
    }); 

    var printPalindrome = new ActionBlock<string[]>(palindromes => 
    { 
     try 
     { 
      foreach (string palindrome in palindromes) 
      { 
       Console.WriteLine("Found palindrome {0}/{1}", 
        palindrome, new string(palindrome.Reverse().ToArray())); 
      } 
     } 
     finally 
     { 
      semaphore.Release(); 
     } 
    }); 

    downloadString.LinkTo(createWordList); 
    createWordList.LinkTo(filterWordList); 
    filterWordList.LinkTo(findPalindromes); 
    findPalindromes.LinkTo(printPalindrome); 


    downloadString.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)createWordList).Fault(t.Exception); 
     else createWordList.Complete(); 
    }); 
    createWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)filterWordList).Fault(t.Exception); 
     else filterWordList.Complete(); 
    }); 
    filterWordList.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws 
     else findPalindromes.Complete(); 
    }); 
    findPalindromes.Completion.ContinueWith(t => 
    { 
     if (t.IsFaulted) 
      ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched 
     else printPalindrome.Complete(); 
    }); 

    try 
    { 
     for (int i = 0; i < 10; i++) 
     { 
      Console.WriteLine(i); 

      downloadString.Post("http://www.google.com"); 
      semaphore.Wait(); // waits here when an exception throws 
     } 

     downloadString.Complete(); 

     printPalindrome.Completion.Wait(); 
    } 
    catch (AggregateException agg) 
    { 
     Console.WriteLine("An error has occured : " + agg); 
    } 
    Console.WriteLine("Done"); 
    Console.ReadKey(); 
} 

risposta

2

Si dovrebbe semplicemente attendere insieme sia il semaforo che il compito di completamento. In questo modo, se il blocco termina prematuramente (per eccezione o per cancellazione), l'eccezione verrà ripubblicata e, in caso contrario, attenderete il semaforo fino a quando non vi sarà spazio per postare altro.

È possibile farlo con Task.WhenAny e SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++) 
{ 
    Console.WriteLine(i); 
    downloadString.Post("http://www.google.com"); 

    if (printPalindrome.Completion.IsCompleted) 
    { 
     break; 
    } 

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait(); 
} 

Nota: usando Task.Wait è opportuno solo in questo caso si tratta di Main. Di solito questo dovrebbe essere un metodo async e si dovrebbe await l'attività restituita da Task.WhenAny.

+0

Grazie, questa parte funziona alla grande. Tuttavia, il ciclo continua a produrre elementi nei primi due blocchi che non sono contrassegnati come guasti. Se modifiy questo pezzo di codice: 'findPalindromes.Completion.ContinueWith (t => { se (t.IsFaulted) { ((IDataflowBlock) printPalindrome) .Fault (t.Exception); ((IDataflowBlock) downloadString) .Fault (t.Exception); // contrassegna il primo blocco con errore } else printPalindrome.Complete(); }); funziona. Ma non sono sicuro che sia il modo migliore per farlo. – n3bula

+0

Non procedendo su questa rotta, il codice verrà eseguito in modo sincrono poiché sta solo dicendo al thread principale di attendere? – moarboilerplate

+0

@ n3bula È sufficiente verificare se l'attività di completamento è stata completata. Guarda il mio aggiornamento. – i3arnon

0

In questo modo ho gestito la limitazione o solo 10 elementi nel blocco sorgente in qualsiasi momento. Potresti modificarlo per avere 1. Assicurati di limitare anche altri blocchi nella pipeline, altrimenti potresti ottenere il blocco sorgente con 1 e il blocco successivo con molto altro.

var sourceBlock = new BufferBlock<string>(
    new ExecutionDataflowBlockOptions() { 
     SingleProducerConstrained = true, 
     BoundedCapacity = 10 }); 

Poi il produttore fa questo:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken); 

Se si utilizza async/attendono, attendono solo la chiamata SendAsync.