Ho una lista di file. Voglio:Stream di Akka: lettura di più file
- Per leggere da tutti come un'unica fonte.
- I file devono essere letti in sequenza, in ordine. (no round-robin)
- Non è necessario che nessun file sia interamente memorizzato.
- Un errore di lettura da un file dovrebbe comprimere il flusso.
si sentiva come questo dovrebbe funzionare: (Scala, Akka-stream v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Ma che si traduce in un errore di compilazione in quanto FileIO
ha un valore materializzato ad esso associati, e Source.combine
doesn lo sostengo
Mappatura del valore materializzato via mi fa chiedo come gli errori del file da leggere vengono trattati, ma non compilo:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
Ma genera un IllegalArgumentException in fase di esecuzione:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
Stavo cercando modulare, quindi lo apprezzo. Stavo usando il conteggio delle righe come esempio di qualcosa che potrei fare con i file, e 'lineCounter' come scritto combina quello con la lettura dei file. (È un lavandino) Ma se sposto la piega e tutto quello che c'è dietro, sono rimasto con un Flow [Path, String, NotUsed], che è esattamente il pezzo che stavo cercando. – randomstatistic
Potete fornire le importazioni con i vostri esempi, sono una parte essenziale del codice. –
@OsskarWerrewka Dovrebbe essere tutto in akka.stream.scaladsl e java IO/NIO. Hai avuto un problema con esso? –