2012-08-13 3 views
11

Voglio utilizzare Play2 per caricare file CSV molto grandi (milioni di righe) su elasticsearch. Ho scritto il seguente codice che funziona bene.Play 2 Scala - Il modo migliore per caricare un grande file CSV con Iteratee per elaborare ogni riga in modo reattivo

Non sono soddisfatto del modo in cui ignoro l'intestazione della risposta http nel primo blocco. Ci dovrebbe essere un modo per concatenare questa iteratee con un primo Iteratee che salta l'header http e passa direttamente allo stato Done, ma non ho trovato il modo.

Se qualcuno può aiutare

object ReactiveFileUpload extends Controller { 
    def upload = Action(BodyParser(rh => new CsvIteratee(isFirst = true))) { 
    request => 
     Ok("File Processed") 
    } 
} 

case class CsvIteratee(state: Symbol = 'Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] { 
    def fold[B](
       done: (Either[Result, String], Input[Array[Byte]]) => Promise[B], 
       cont: (Input[Array[Byte]] => Iteratee[Array[Byte], Either[Result, String]]) => Promise[B], 
       error: (String, Input[Array[Byte]]) => Promise[B] 
       ): Promise[B] = state match { 
    case 'Done => 
     done(Right(lastChunk), Input.Empty) 

    case 'Cont => cont(in => in match { 
     case in: El[Array[Byte]] => { 
     // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk 
     val content = lastChunk + new String(in.e) 
     val csvBody = 
      if (isFirst) 
      // Skip http header if it is the first chunk 
      content.drop(content.indexOf("\r\n\r\n") + 4) 
      else content 
     val csv = new CSVReader(new StringReader(csvBody), ';') 
     val lines = csv.readAll 
     // Process all lines excepted the last one since it is cut by the chunk 
     for (line <- lines.init) 
      processLine(line) 
     // Put forward the part that has not been processed 
     val last = lines.last.toList.mkString(";") 
     copy(input = in, lastChunk = last, isFirst = false) 
     } 
     case Empty => copy(input = in, isFirst = false) 
     case EOF => copy(state = 'Done, input = in, isFirst = false) 
     case _ => copy(state = 'Error, input = in, isFirst = false) 
    }) 

    case _ => 
     error("Unexpected state", input) 

    } 

    def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post(
    toJson(
     Map(
     "date" -> toJson(line(0)), 
     "trig" -> toJson(line(1)), 
     "code" -> toJson(line(2)), 
     "nbjours" -> toJson(line(3).toDouble) 
    ) 
    ) 
) 
} 
+1

Potreste dal interessa il non ufficiale play-iteratee-extras, che ha una [parser CSV] (https://github.com/jroper/play -iteratees-extras/blob/master/src/main/Scala/play/extras/iteratees/Csv.scala). –

risposta

1

Per catena due iteratees insieme, utilizzare flatMap.

val combinedIteratee = firstIteratee.flatMap(firstResult => secondIteratee) 

Oppure, utilizzando una di comprensione:

val combinedIteratee = for { 
    firstResult <- firstIteratee 
    secondResult <- secondIteratee 
} yield secondResult 

È possibile utilizzare flatMap di sequenziare il maggior numero di iteratees insieme come ti piace.

si potrebbe desiderare di fare qualcosa di simile:

val headerAndCsvIteratee = for { 
    headerResult <- headerIteratee 
    csvResult <- csvIteratee 
} yield csvResult