Desidero limitare il numero di articoli pubblicati in una pipeline Dataflow. Il numero di elementi dipende dall'ambiente di produzione. Questi oggetti consumano una grande quantità di memoria (immagini) quindi vorrei pubblicarli quando l'ultimo blocco della pipeline ha fatto il suo lavoro.Flusso dati TPL: come limitare un'intera pipeline?
Ho provato a utilizzare un SemaphoreSlim
per limitare il produttore e rilasciarlo nell'ultimo blocco della pipeline. Funziona, ma se durante il processo viene sollevata un'eccezione, il programma rimane in attesa per sempre e l'eccezione non viene intercettata.
Ecco un esempio simile al nostro codice. Come posso fare questo?
static void Main(string[] args)
{
SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);
var downloadString = new TransformBlock<string, string>(uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return new WebClient().DownloadString(uri);
});
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
char[] tokens = text.ToArray();
for (int i = 0; i < tokens.Length; i++)
{
if (!char.IsLetter(tokens[i]))
tokens[i] = ' ';
}
text = new string(tokens);
return text.Split(new char[] { ' ' },
StringSplitOptions.RemoveEmptyEntries);
});
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
throw new InvalidOperationException("ouch !"); // explicit for test
return words.Where(word => word.Length > 3).OrderBy(word => word)
.Distinct().ToArray();
});
var findPalindromes = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Finding palindromes...");
var palindromes = new ConcurrentQueue<string>();
Parallel.ForEach(words, word =>
{
string reverse = new string(word.Reverse().ToArray());
if (Array.BinarySearch<string>(words, reverse) >= 0 &&
word != reverse)
{
palindromes.Enqueue(word);
}
});
return palindromes.ToArray();
});
var printPalindrome = new ActionBlock<string[]>(palindromes =>
{
try
{
foreach (string palindrome in palindromes)
{
Console.WriteLine("Found palindrome {0}/{1}",
palindrome, new string(palindrome.Reverse().ToArray()));
}
}
finally
{
semaphore.Release();
}
});
downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
downloadString.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)createWordList).Fault(t.Exception);
else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)filterWordList).Fault(t.Exception);
else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
else printPalindrome.Complete();
});
try
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
semaphore.Wait(); // waits here when an exception throws
}
downloadString.Complete();
printPalindrome.Completion.Wait();
}
catch (AggregateException agg)
{
Console.WriteLine("An error has occured : " + agg);
}
Console.WriteLine("Done");
Console.ReadKey();
}
Grazie, questa parte funziona alla grande. Tuttavia, il ciclo continua a produrre elementi nei primi due blocchi che non sono contrassegnati come guasti. Se modifiy questo pezzo di codice: 'findPalindromes.Completion.ContinueWith (t => { se (t.IsFaulted) { ((IDataflowBlock) printPalindrome) .Fault (t.Exception); ((IDataflowBlock) downloadString) .Fault (t.Exception); // contrassegna il primo blocco con errore } else printPalindrome.Complete(); }); funziona. Ma non sono sicuro che sia il modo migliore per farlo. – n3bula
Non procedendo su questa rotta, il codice verrà eseguito in modo sincrono poiché sta solo dicendo al thread principale di attendere? – moarboilerplate
@ n3bula È sufficiente verificare se l'attività di completamento è stata completata. Guarda il mio aggiornamento. – i3arnon