2012-04-27 14 views
7

Così il Play2.0 Enumeratee page mostra un esempio di utilizzo di un metodo &> o through modificare una Enumerator[String] in un Enumerator[Int]:Come scrivere un enumeratee al pezzo un enumeratore lungo diversi confini

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt } 
val ints: Enumerator[Int] = strings &> toInt 

C'è anche un enumeratee Enumeratee.grouped per creare un enumeratore di blocchi da singoli elementi. Sembrava funzionare bene.

Ma quello che vedo è che il solito input sarebbe sotto forma di Array[Byte] (che viene restituito da Enumerator.fromFile e Enumerator.fromStream). Con questo in mente vorrei prendere quegli input Array[Byte] e trasformarli in un Enumerator[String], ad esempio dove ogni stringa è una linea (terminata da un '\n'). I contorni delle linee e gli elementi Array[Byte] di solito non corrispondono. Come posso scrivere un enumeratore in grado di convertire gli array chunked in stringhe chunked?

Lo scopo è quello di ridimensionare le linee al browser ogni volta che Array[Byte] diventa disponibile e mantenere i byte rimanenti che non facevano parte di una linea completa fino a quando non arriva il prossimo blocco di input.

Idealmente mi piacerebbe avere un metodo che dato un iter: Iteratee[Array[Byte], T] e un Enumerator[Array[Byte]] mi darà indietro un Enumerator[T], dove i miei elementi T sono state analizzate da iter.

Ulteriori informazioni: ho avuto un po 'di tempo per pulire il mio codice ed ecco un esempio specifico di ciò che sto cercando di fare. Ho le seguenti iteratees che rilevano la riga successiva:

import play.api.libs.iteratee._ 
type AB = Array[Byte] 

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = { 
    def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match { 
    case Input.EOF => Done(acc, Input.EOF) 
    case Input.Empty => Cont(step(_, acc)) 
    case Input.El(arr) => 
     val (taking, rest) = arr.span(pred) 
     if (rest.length > 0) Done(acC++ taking, Input.El(rest)) 
     else Cont(step(_, acC++ taking)) 
    } 
    Cont(step(_, Array())) 
} 

val line = for { 
    bytes <- takeWhile(b => !(b == '\n' || b == '\r')) 
    _  <- takeWhile(b => b == '\n' || b == '\r') 
} yield bytes 

E quello che mi piacerebbe fare è qualcosa di simile:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain") 

risposta

5

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 Fa qualcosa di simile a quello che stai facendo. Ho riparato raggruppato per occuparmi dell'input restante. Il codice è fondamentalmente come:

val upToNewLine = 
    Traversable.splitOnceAt[String,Char](_ != '\n') &>> 
    Iteratee.consume() 

Enumeratee.grouped(upToNewLine) 

Inoltre devo risolvere ripetere nello stesso modo

+0

Cool. Sembrava che "grouped" avrebbe dovuto fare quello che volevo. – huynhjl

2

Ecco quello che ho dopo alcune ore di sperimentazione. Spero che qualcuno possa realizzare un'implementazione più elegante, dato che riesco a malapena a seguire il mio.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] { 
    def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = { 
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]): 
      Iteratee[AB, Iteratee[AB, A]] = { 
     e match { 
     case Input.EOF => 
      // if we have a leftover and it's a chunk, then output it 
      leftover match { 
      case Input.EOF | Input.Empty => Done(in, leftover) 
      case Input.El(_) => 
       val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover) 
       >>> Enumerator.eof |>> chunker) 
       lastChunk.pureFlatFold(
       done = { (chunk, rest) => 
        val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in) 
        nextIn.pureFlatFold(
        done = (a, e2) => Done(nextIn, e2), 
        // nothing more will come 
        cont = k => Done(nextIn, Input.EOF), 
        error = (msg, e2) => Error(msg, e2)) 
       }, 
       // not enough content to get a chunk, so drop content 
       cont = k => Done(in, Input.EOF), 
       error = (msg, e2) => Error(msg, e2)) 
      } 
     case Input.Empty => Cont(step(_, in, leftover)) 
     case Input.El(arr) => 
      // feed through chunker 
      val iChunks = Iteratee.flatten(
      Enumerator.enumInput(leftover) 
       >>> Enumerator(arr) 
       >>> Enumerator.eof // to extract the leftover 
       |>> repeat(chunker)) 
      iChunks.pureFlatFold(
      done = { (chunks, rest) => 
       // we have our chunks, feed them to the inner iteratee 
       val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in) 
       nextIn.pureFlatFold(
       done = (a, e2) => Done(nextIn, e2), 
       // inner iteratee needs more data 
       cont = k => Cont(step(_: Input[AB], nextIn, 
        // we have to ignore the EOF we fed to repeat 
        if (rest == Input.EOF) Input.Empty else rest)), 
       error = (msg, e2) => Error(msg, e2)) 
      }, 
      // not enough content to get a chunk, continue 
      cont = k => Cont(step(_: Input[AB], in, leftover)), 
      error = (msg, e2) => Error(msg, e2)) 
     } 
    } 
    Cont(step(_, inner, Input.Empty)) 
    } 
} 

Ecco la definizione di mia abitudine repeat:

// withhold the last chunk so that it may be concatenated with the next one 
def repeat(chunker: Iteratee[AB, AB]) = { 
    def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
     leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match { 
    case Input.EOF => ch.pureFlatFold(
     done = (a, e) => Done(acc, leftover), 
     cont = k => k(Input.EOF).pureFlatFold(
     done = (a, e) => Done(acc, Input.El(a)), 
     cont = k => sys.error("divergent iter"), 
     error = (msg, e) => Error(msg, e)), 
     error = (msg, e) => Error(msg, e)) 
    case Input.Empty => Cont(loop(_, ch, acc, leftover)) 
    case Input.El(_) => 
     val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
      >>> Enumerator.enumInput(e) |>> ch) 
     i.pureFlatFold(
     done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty), 
     cont = k => Cont(loop(_, i, acc, Input.Empty)), 
     error = (msg, e) => Error(msg, e)) 
    } 
    Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty)) 
} 

Questo funziona su alcuni campioni compreso questo:

val source = Enumerator(
    "bippy".getBytes, 
    "foo\n\rbar\n\r\n\rbaz\nb".getBytes, 
    "azam\ntoto\n\n".getBytes) 
Ok.stream(source 
    &> chunkBy(line) 
    &> Enumeratee.map(l => l ++ ".\n".getBytes) 
).as("text/plain") 

che stampa:

bippyfoo. 
bar. 
baz. 
bazam. 
toto.