Per utilizzare Rx, dovrete usare IObservable<T>
invece di IEnumerable<T>
.
public IObservable<T> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
select item;
}
Ogni volta che si chiama Subscribe
sul osservabile restituito da ReadFiles
, sarà iterare su tutte le stringhe in lstOfFiles
e, in parallelo *, leggere ogni flusso di file.
In sequenza, la query apre ogni flusso di file e lo passa a ReadStream
, che è responsabile della generazione della sequenza asincrona di elementi per un determinato flusso.
Il ReadFiles
query, che utilizza l'operatore SelectMany
scritta nella sintassi di query comprensione, fonde ogni "punto" che viene generato da tutti ReadStream
osservabili in una singola sequenza osservabile, rispettando l'asincronia della sorgente.
È consigliabile prendere in considerazione la scrittura di un async iterator per il metodo ReadStream
come illustrato qui; in caso contrario, se è necessario restituire IEnumerable<T>
, sarà necessario convertirlo applicando l'operatore ToObservable(scheduler)
con uno scheduler che introduce la concorrenza, che potrebbe essere meno efficiente.
public IObservable<Item> ReadStream(Stream stream)
{
return Observable.Create<Item>(async (observer, cancel) =>
{
// Here's one example of reading a stream with fixed item lengths.
var buffer = new byte[itemLength]; // TODO: Define itemLength
var remainder = itemLength;
int read;
do
{
read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
.ConfigureAwait(false);
remainder -= read;
if (read == 0)
{
if (remainder < itemLength)
{
throw new InvalidOperationException("End of stream unexpected.");
}
else
{
break;
}
}
else if (remainder == 0)
{
observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem
remainder = itemLength;
}
}
while (true);
});
}
* Rx non introduce alcuna concorrenza qui. La parallelizzazione è semplicemente il risultato della natura asincrona dell'API sottostante, quindi è molto efficiente. La lettura da un flusso di file in modo asincrono può far sì che Windows utilizzi una porta di completamento I/O come ottimizzazione, notificando su un pool di thread quando ogni buffer diventa disponibile. Ciò garantisce che Windows sia interamente responsabile della pianificazione dei callback per l'applicazione, piuttosto che per il TPL o per te stesso.
Rx è a thread libero, quindi ogni notifica per l'osservatore può essere su un thread in pool diverso; tuttavia, a causa del contratto di serializzazione di Rx (§4.2 Rx Design Guidelines), non si riceveranno notifiche sovrapposte nell'osservatore quando si chiama Subscribe
, quindi non è necessario fornire la sincronizzazione esplicita, come il blocco.
Tuttavia, a causa della natura parallela di questa query, è possibile osservare notifiche alternate rispetto a ciascun file, ma mai notifiche sovrapposte.
Se si preferisce ricevere tutti gli elementi per un dato file in una sola volta, come hai accennato nel tua domanda, allora si può semplicemente applicare l'operatore ToList
alla query e modificare il tipo di ritorno:
public IObservable<IList<T>> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
.ToList()
select items;
}
Se è necessario osservare le notifiche con affinità di thread (su un thread GUI, ad esempio), è necessario effettuare il marshalling delle notifiche poiché arriveranno su un thread in pool. Poiché questa query non introduce la concorrenza stessa, il modo migliore per ottenere ciò è applicare l'operatore ObserveOnDispatcher
(WPF, App Store, Telefono, Silverlight) o il sovraccarico ObserveOn(SynchronizationContext)
(WinForms, ASP.NET e così via). Non dimenticare di aggiungere un riferimento al pacchetto NuGet specifico della piattaforma; ad esempio, Rx-Wpf, Rx-WinForms, Rx-WindowsStore, ecc.
Potresti essere tentato di convertire il retro osservabile in un IEnumerable<T>
invece di chiamare Subscribe
. Non farlo. Nella maggior parte dei casi non è necessario, può essere inefficiente e nel peggiore dei casi potrebbe causare blocchi morti. Una volta entrato nel mondo dell'asincronia, dovresti provare a rimanere dentro. Questo non è solo vero per Rx ma anche per async/await
.
Ciao Nelson, thnx per fornire l'aiuto, sembra che l'errore di lancio del compilatore abbia tipo non può essere dedotto dall'argomento di specificazione dell'argomento use..try. – user145610
Cura di spiegare il downvote? –