2015-03-03 10 views
6

Contesto: Sto cercando di scrivere un Process1[ByteVector, spray.http.HttpResponsePart] con l'output ChunkedResponseStart(bytes), MessageChunk(bytes), MessageChunk(bytes), ..., ChunkedResponseEnd. Non ho ancora completamente avvolto la mia mente sul flusso di scalaz e sul suo vocabolario.scalaz-stream: come gestire "intestazione" (primi blocchi) in un modo diverso rispetto agli altri?

Come scrivere un processo in grado di gestire i blocchi n in modo diverso?

mi è venuta in mente questa (stringhe come esempio):

val headerChunk = process1.chunk[String](5).map(_.reduce(_ + _)) 

val headerChunkAndRest: Process1[String, String] = 
    headerChunk.take(1) ++ process1.id 

io.linesR(Files.newInputStream(Paths.get("testdata/fahrenheit.txt"))) 
    .pipe(headerChunkAndRest) 
    .to(io.stdOutLines) 
    .run.run 

Che è un idiomatica e, forse, un modo generalmente componibile a scrivere headerChunkAndRest?

+0

È necessario accedere ai contenuti dell'intestazione durante l'elaborazione del resto delle righe? –

+1

Scodec-streams (Scalaz stream per decodificare e codificare il contenuto binario) ha un buon esempio che potrebbe risolvere il problema: d1 ++ d2: Esegui d1 ed emette tutti i suoi valori decodificati, quindi esegui d2 sull'input rimanente ed emette i suoi valori. Esempio: decode.once (codecs.int32) ++ decode.advance (12) decodifica un singolo segno Int, quindi fa avanzare il cursore di 12 bit. (https://github.com/scodec/scodec-stream#decoding) –

+0

@TravisBrown no, il suo contenuto non è necessario per l'elaborazione del resto. Nel mio caso il primo blocco dovrebbe contenere lo stesso tipo di dati del resto dei blocchi (parti di un file), è solo un pezzo più piccolo, da inviare al client in precedenza. –

risposta

4

Considerazioni generali

Ci sono diversi modi per fare questo, fortemente a seconda dei dettagli delle vostre esigenze. È possibile utilizzare i seguenti metodi di supporto che fanno parte del scalaz-stream:

  1. foldWithIndex Questo vi dà l'indice corrente del pezzo come un numero. È possibile discriminare in base a tale indice
  2. zipWithState È possibile aggiungere uno stato da una chiamata del metodo a quella successiva e utilizzare questo stato per tenere traccia se si stanno ancora analizzando le intestazioni o a se si è raggiunto il corpo. Nel passaggio successivo è quindi possibile utilizzare questo stato per gestire l'intestazione e il corpo diversi
  3. repartition Utilizzare questo per raggruppare tutti gli elementi di intestazione e tutti gli elementi del corpo. È quindi possibile elaborarli nel passaggio successivo.
  4. zipWithNext Questa funzione presenta sempre l'elemento precedente raggruppato con l'elemento corrente. È possibile utilizzare questo per rilevare, quando si passa dall'intestazione al corpo e reagire di conseguenza.

Forse dovresti ripensare, quello di cui hai veramente bisogno. Per esattamente la tua domanda, sarebbe zipwithIndex e poi map. Ma se ripensi a pensare al tuo problema, probabilmente finirai con repartition o zipWithState.

codice Esempio

Facciamo un semplice esempio: Un client HTTP, che separa gli elementi di intestazione HTTP dal corpo (HTTP, non HTML). Nell'intestazione una cosa come i cookie, nel corpo è il vero "contenuto", come un'immagine o le fonti HTTP.

Un client HTTP semplice potrebbe essere la seguente:

import scalaz.stream._ 
import scalaz.concurrent.Task 
import java.net.InetSocketAddress 
import java.nio.channels.AsynchronousChannelGroup 

implicit val AG = nio.DefaultAsynchronousChannelGroup 

def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] = 
    Process(
    s"GET $path HTTP/1.1", 
    s"Host: $hostname", 
    "Accept: */*", 
    "User-Agent: scalaz-stream" 
).intersperse("\n").append(Process("\n\n")) 

def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] = 
    nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines()) 

Ora possiamo utilizzare questo codice per righe di intestazione separati dal resto. In HTTP, l'intestazione è strutturata in linee. È separato dal corpo da una linea vuota. Quindi, prima, cerchiamo di contare il numero di righe nell'intestazione:

val demoHostName="scala-lang.org" // Hope they won't mind... 
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run 
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8)) 

Quando ho eseguito questo, ci sono stati 8 linee nell'intestazione.Diamo innanzitutto definire un'enumerazione, in modo classificano le parti della risposta:

object HttpResponsePart { 
    sealed trait EnumVal 
    case object HeaderLine extends EnumVal 
    case object HeaderBodySeparator extends EnumVal 
    case object Body extends EnumVal 
    val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body) 
} 

E poi cerchiamo di utilizzare zipWithIndex più map per classificare le parti della risposta:

simpleHttpClient(demoHostName).zipWithIndex.map{ 
    case (line, idx) if idx < 9 => (line, HeaderLine) 
    case (line, idx) if idx == 10 => (line, HeaderBodySeparator) 
    case (line, _) => (line, Body) 
}.take(15).runLog.run 

Per me, questo funziona bene. Ma ovviamente, la quantità di linee di intestazione può cambiare in qualsiasi momento senza preavviso. È molto più robusto usare un parser molto semplice che consideri la struttura della risposta. per questo io uso zipWithState:

simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){ 
    case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator 
    case (_, HeaderLine) => HeaderLine 
    case (_, HeaderBodySeparator) => Body 
    case (line, Body) => Body 
}.take(15).runLog.run 

Si può vedere, che entrambi gli approcci utilizzano una struttura simile e entrambi gli approcci dovrebbero portare allo stesso risultato. La cosa bella è che entrambi gli approcci sono facilmente riutilizzabili. Puoi semplicemente scambiare la fonte, ad es. con un file e non devi modificare nulla. Lo stesso con l'elaborazione dopo la classificazione. Il .take(15).runLog.run è esattamente lo stesso in entrambi gli approcci.