2016-06-13 50 views
5

Ho una lista di file. Voglio:Stream di Akka: lettura di più file

  1. Per leggere da tutti come un'unica fonte.
  2. I file devono essere letti in sequenza, in ordine. (no round-robin)
  3. Non è necessario che nessun file sia interamente memorizzato.
  4. Un errore di lettura da un file dovrebbe comprimere il flusso.

si sentiva come questo dovrebbe funzionare: (Scala, Akka-stream v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Ma che si traduce in un errore di compilazione in quanto FileIO ha un valore materializzato ad esso associati, e Source.combine doesn lo sostengo

Mappatura del valore materializzato via mi fa chiedo come gli errori del file da leggere vengono trattati, ma non compilo:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

Ma genera un IllegalArgumentException in fase di esecuzione:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

risposta

8

Il codice seguente non è così conciso come potrebbe essere, al fine di modularizzare chiaramente le diverse preoccupazioni.

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

Stavo cercando modulare, quindi lo apprezzo. Stavo usando il conteggio delle righe come esempio di qualcosa che potrei fare con i file, e 'lineCounter' come scritto combina quello con la lettura dei file. (È un lavandino) Ma se sposto la piega e tutto quello che c'è dietro, sono rimasto con un Flow [Path, String, NotUsed], che è esattamente il pezzo che stavo cercando. – randomstatistic

+0

Potete fornire le importazioni con i vostri esempi, sono una parte essenziale del codice. –

+1

@OsskarWerrewka Dovrebbe essere tutto in akka.stream.scaladsl e java IO/NIO. Hai avuto un problema con esso? –

-1

Io ho una risposta fuori dal cancello - non utilizzare akka.FileIO. Questo sembra funzionare bene, ad esempio:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

Vorrei ancora sapere se esiste una soluzione migliore.

+0

Utilizzando 'io.Source' si perde molta potenza. Per i file di piccole dimensioni questo potrebbe funzionare ma non è un'opzione per quelli di grandi dimensioni. – jarandaf

+0

@jarandaf Puoi chiarire? Avevo l'impressione che io.Source usasse appena un BufferedReader sotto il cofano, e l'iteratore getLines non caricasse l'intero file in una volta o qualcosa del genere. – randomstatistic

+0

meglio pensato, si potrebbe avere ragione (anche se 'FileIO' gestisce' ByteString' invece di 'String', che è destinato ad essere più performante). D'altra parte, con 'io.Source' bisogna sempre tenere a mente di chiudere la fonte (che non viene eseguita di default). – jarandaf

2

aggiornamento Oh, non ho visto la risposta accettata perché non ho aggiornare la pagina> _ <. Lascerò questo qui comunque poiché ho anche aggiunto alcune note sulla gestione degli errori.

Credo che il seguente programma fa quello che si vuole:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

La chiave qui è il metodo flatMapConcat(): trasforma ogni elemento di un flusso in una fonte e restituisce un flusso di elementi prodotti dai queste fonti, se sono eseguiti sequenzialmente.

Per quanto riguarda la gestione degli errori, è possibile aggiungere un gestore per il futuro nell'argomento mapMaterializedValue, oppure è possibile gestire l'errore finale del ruscello che scorre mettendo un gestore sul Sink.foreach materializzato valore futuro. Ho fatto entrambi nell'esempio sopra, e se lo testate, per esempio, su un file inesistente, vedrete che lo stesso messaggio di errore verrà stampato due volte. Sfortunatamente, flatMapConcat() non raccoglie valori materializzati, e francamente non riesco a vedere il modo in cui potrebbe farlo in modo sano, quindi devi gestirli separatamente, se necessario.