2014-12-27 7 views
5

Sto cercando aiuto su come utilizzare la parola chiave yield per restituire IEnumberable in blocchi paralleli o Task block. Di seguito è riportato il codice pseudocome utilizzare yield per restituire la raccolta di Item in parallel block o Task

public IEnumerable<List<T>> ReadFile() 
{ 
    foreach (string filepath in lstOfFiles) 
    { 
     var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); 
     foreach (var item in ReadStream(stream)) 
      yield return item; //where item is of type List<string> 
    } 
} 

voglio convertire sopra il codice di blocco in parallelo come qui di seguito

lstOfFiles.AsParallel() 
      .ForAll(filepath => 
{ 
    var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); 
    foreach (var item in ReadStream(Stream)) 
     yield return item; 
}); 

ma compilatore tiri errore rendimento non può essere utilizzato in blocchi paralleli o delegato anonimo. Ho provato con Task Block anche, resa non è consentita nel task delegato anonimo

Qualcuno mi suggerisce il modo semplice e migliore per avere la resa per restituire la raccolta di dati in blocchi o attività parallele.

Ho letto che RX 2.0 o TPL sono buoni da usare nello scenario sopra. Ho un dubbio se utilizzare la libreria RX o TPL per il ritorno asincrono del rendimento dei valori. Qualcuno mi può suggerire che è meglio sia Rx o TPL.

Se utilizzo Rx, è necessario creare subscribe e convertire il blocco parallelo AsObservable.

risposta

0

Sembra che tu voglia utilizzare SelectMany. Non è possibile utilizzare yield in un metodo anonimo, ma è possibile rompere questo fuori in un nuovo metodo, in questo modo:

IEnumerable<Item> items = lstOfFiles.AsParallel() 
    .SelectMany((filepath) => ReadItems(filepath)); 

IEnumerable<Item> ReadItems(string filePath) 
{ 
    using(var Stream = new FileStream(filePath, FileMode.Open, FileAccess.Read)) 
    { 
     foreach (var item in ReadStream(Stream)) 
      yield return item; 
    } 
} 
+0

Ciao Nelson, thnx per fornire l'aiuto, sembra che l'errore di lancio del compilatore abbia tipo non può essere dedotto dall'argomento di specificazione dell'argomento use..try. – user145610

+0

Cura di spiegare il downvote? –

1

Per utilizzare Rx, dovrete usare IObservable<T> invece di IEnumerable<T>.

public IObservable<T> ReadFiles() 
{ 
    return from filepath in lstOfFiles.ToObservable() 
     from item in Observable.Using(() => File.OpenRead(filepath), ReadStream) 
     select item; 
} 

Ogni volta che si chiama Subscribe sul osservabile restituito da ReadFiles, sarà iterare su tutte le stringhe in lstOfFiles e, in parallelo *, leggere ogni flusso di file.

In sequenza, la query apre ogni flusso di file e lo passa a ReadStream, che è responsabile della generazione della sequenza asincrona di elementi per un determinato flusso.

Il ReadFiles query, che utilizza l'operatore SelectMany scritta nella sintassi di query comprensione, fonde ogni "punto" che viene generato da tutti ReadStream osservabili in una singola sequenza osservabile, rispettando l'asincronia della sorgente.

È consigliabile prendere in considerazione la scrittura di un async iterator per il metodo ReadStream come illustrato qui; in caso contrario, se è necessario restituire IEnumerable<T>, sarà necessario convertirlo applicando l'operatore ToObservable(scheduler) con uno scheduler che introduce la concorrenza, che potrebbe essere meno efficiente.

public IObservable<Item> ReadStream(Stream stream) 
{ 
    return Observable.Create<Item>(async (observer, cancel) => 
    { 
    // Here's one example of reading a stream with fixed item lengths. 

    var buffer = new byte[itemLength]; // TODO: Define itemLength 
    var remainder = itemLength; 
    int read; 

    do 
    { 
     read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel) 
         .ConfigureAwait(false); 

     remainder -= read; 

     if (read == 0) 
     { 
     if (remainder < itemLength) 
     { 
      throw new InvalidOperationException("End of stream unexpected."); 
     } 
     else 
     { 
      break; 
     } 
     } 
     else if (remainder == 0) 
     { 
     observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem 

     remainder = itemLength; 
     } 
    } 
    while (true); 
    }); 
} 

* Rx non introduce alcuna concorrenza qui. La parallelizzazione è semplicemente il risultato della natura asincrona dell'API sottostante, quindi è molto efficiente. La lettura da un flusso di file in modo asincrono può far sì che Windows utilizzi una porta di completamento I/O come ottimizzazione, notificando su un pool di thread quando ogni buffer diventa disponibile. Ciò garantisce che Windows sia interamente responsabile della pianificazione dei callback per l'applicazione, piuttosto che per il TPL o per te stesso.

Rx è a thread libero, quindi ogni notifica per l'osservatore può essere su un thread in pool diverso; tuttavia, a causa del contratto di serializzazione di Rx (§4.2 Rx Design Guidelines), non si riceveranno notifiche sovrapposte nell'osservatore quando si chiama Subscribe, quindi non è necessario fornire la sincronizzazione esplicita, come il blocco.

Tuttavia, a causa della natura parallela di questa query, è possibile osservare notifiche alternate rispetto a ciascun file, ma mai notifiche sovrapposte.

Se si preferisce ricevere tutti gli elementi per un dato file in una sola volta, come hai accennato nel tua domanda, allora si può semplicemente applicare l'operatore ToList alla query e modificare il tipo di ritorno:

public IObservable<IList<T>> ReadFiles() 
{ 
    return from filepath in lstOfFiles.ToObservable() 
     from items in Observable.Using(() => File.OpenRead(filepath), ReadStream) 
           .ToList() 
     select items; 
} 

Se è necessario osservare le notifiche con affinità di thread (su un thread GUI, ad esempio), è necessario effettuare il marshalling delle notifiche poiché arriveranno su un thread in pool. Poiché questa query non introduce la concorrenza stessa, il modo migliore per ottenere ciò è applicare l'operatore ObserveOnDispatcher (WPF, App Store, Telefono, Silverlight) o il sovraccarico ObserveOn(SynchronizationContext) (WinForms, ASP.NET e così via). Non dimenticare di aggiungere un riferimento al pacchetto NuGet specifico della piattaforma; ad esempio, Rx-Wpf, Rx-WinForms, Rx-WindowsStore, ecc.

Potresti essere tentato di convertire il retro osservabile in un IEnumerable<T> invece di chiamare Subscribe. Non farlo. Nella maggior parte dei casi non è necessario, può essere inefficiente e nel peggiore dei casi potrebbe causare blocchi morti. Una volta entrato nel mondo dell'asincronia, dovresti provare a rimanere dentro. Questo non è solo vero per Rx ma anche per async/await.

+0

Hey Dave, sono nuovo Rx, ma come possiamo iscriversi al posto di tolist() che potrebbe causare deadlock come da istruzione ur vorrei sapere come iscriversi quando la clausola mutliple è disponibile – user145610

+0

L'operatore 'ToList' che io ho usato nel mio esempio è definito nella classe 'System.Reactive.Linq.Observable' e funziona su' IObservable ', non' IEnumerable '. Il valore restituito è 'IObservable >', quindi è sicuro. Inoltre, non chiamare "Subscribe" nel mezzo di una query. Chiamalo solo alla fine. –

+0

Thnx Dave, ho trovato il tolist di LINQ.Osservato dopo aver postato il mio commento, l'ho testato senza bloccarlo. Ho aggiunto la sottoscrizione come di seguito che funziona perfettamente ReadFiles (childFiles) .ObserveOn (Scheduler.Default) .Subscribe ((result) => {Console.WriteLine (result.Count); Console.WriteLine ("ID discussione sottoscrizione: {0} ", Thread.CurrentThread.ManagedThreadId);}); Ho ancora una domanda, quando eseguo il codice sopra, vedo sempre tutti i metodi "ReadStream" eseguiti in un thread e subscriber in un altro thread, È possibile eseguire tutti i file (dice 4 file) in thread diversi e thread di sottoscrizione singoli – user145610