Considerazioni generali
Ci sono diversi modi per fare questo, fortemente a seconda dei dettagli delle vostre esigenze. È possibile utilizzare i seguenti metodi di supporto che fanno parte del scalaz-stream:
foldWithIndex
Questo vi dà l'indice corrente del pezzo come un numero. È possibile discriminare in base a tale indice
zipWithState
È possibile aggiungere uno stato da una chiamata del metodo a quella successiva e utilizzare questo stato per tenere traccia se si stanno ancora analizzando le intestazioni o a se si è raggiunto il corpo. Nel passaggio successivo è quindi possibile utilizzare questo stato per gestire l'intestazione e il corpo diversi
repartition
Utilizzare questo per raggruppare tutti gli elementi di intestazione e tutti gli elementi del corpo. È quindi possibile elaborarli nel passaggio successivo.
zipWithNext
Questa funzione presenta sempre l'elemento precedente raggruppato con l'elemento corrente. È possibile utilizzare questo per rilevare, quando si passa dall'intestazione al corpo e reagire di conseguenza.
Forse dovresti ripensare, quello di cui hai veramente bisogno. Per esattamente la tua domanda, sarebbe zipwithIndex
e poi map
. Ma se ripensi a pensare al tuo problema, probabilmente finirai con repartition
o zipWithState
.
codice Esempio
Facciamo un semplice esempio: Un client HTTP, che separa gli elementi di intestazione HTTP dal corpo (HTTP, non HTML). Nell'intestazione una cosa come i cookie, nel corpo è il vero "contenuto", come un'immagine o le fonti HTTP.
Un client HTTP semplice potrebbe essere la seguente:
import scalaz.stream._
import scalaz.concurrent.Task
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup
implicit val AG = nio.DefaultAsynchronousChannelGroup
def httpGetRequest(hostname : String, path : String = "/"): Process[Nothing, String] =
Process(
s"GET $path HTTP/1.1",
s"Host: $hostname",
"Accept: */*",
"User-Agent: scalaz-stream"
).intersperse("\n").append(Process("\n\n"))
def simpleHttpClient(hostname : String, port : Int = 80, path : String = "/")(implicit AG: AsynchronousChannelGroup) : Process[Task, String] =
nio.connect(new InetSocketAddress(hostname, port)).flatMap(_.run(httpGetRequest(hostname, path).pipe(text.utf8Encode))).pipe(text.utf8Decode).pipe(text.lines())
Ora possiamo utilizzare questo codice per righe di intestazione separati dal resto. In HTTP, l'intestazione è strutturata in linee. È separato dal corpo da una linea vuota. Quindi, prima, cerchiamo di contare il numero di righe nell'intestazione:
val demoHostName="scala-lang.org" // Hope they won't mind...
simpleHttpClient(demoHostName).zipWithIndex.takeWhile(! _._1.isEmpty).runLast.run
// res3: Option[(String, Int)] = Some((Content-Type: text/html,8))
Quando ho eseguito questo, ci sono stati 8 linee nell'intestazione.Diamo innanzitutto definire un'enumerazione, in modo classificano le parti della risposta:
object HttpResponsePart {
sealed trait EnumVal
case object HeaderLine extends EnumVal
case object HeaderBodySeparator extends EnumVal
case object Body extends EnumVal
val httpResponseParts = Seq(HeaderLine, HeaderBodySeparator, Body)
}
E poi cerchiamo di utilizzare zipWithIndex
più map
per classificare le parti della risposta:
simpleHttpClient(demoHostName).zipWithIndex.map{
case (line, idx) if idx < 9 => (line, HeaderLine)
case (line, idx) if idx == 10 => (line, HeaderBodySeparator)
case (line, _) => (line, Body)
}.take(15).runLog.run
Per me, questo funziona bene. Ma ovviamente, la quantità di linee di intestazione può cambiare in qualsiasi momento senza preavviso. È molto più robusto usare un parser molto semplice che consideri la struttura della risposta. per questo io uso zipWithState
:
simpleHttpClient(demoHostName).zipWithState(HeaderLine : EnumVal){
case (line, HeaderLine) if line.isEmpty => HeaderBodySeparator
case (_, HeaderLine) => HeaderLine
case (_, HeaderBodySeparator) => Body
case (line, Body) => Body
}.take(15).runLog.run
Si può vedere, che entrambi gli approcci utilizzano una struttura simile e entrambi gli approcci dovrebbero portare allo stesso risultato. La cosa bella è che entrambi gli approcci sono facilmente riutilizzabili. Puoi semplicemente scambiare la fonte, ad es. con un file e non devi modificare nulla. Lo stesso con l'elaborazione dopo la classificazione. Il .take(15).runLog.run
è esattamente lo stesso in entrambi gli approcci.
È necessario accedere ai contenuti dell'intestazione durante l'elaborazione del resto delle righe? –
Scodec-streams (Scalaz stream per decodificare e codificare il contenuto binario) ha un buon esempio che potrebbe risolvere il problema: d1 ++ d2: Esegui d1 ed emette tutti i suoi valori decodificati, quindi esegui d2 sull'input rimanente ed emette i suoi valori. Esempio: decode.once (codecs.int32) ++ decode.advance (12) decodifica un singolo segno Int, quindi fa avanzare il cursore di 12 bit. (https://github.com/scodec/scodec-stream#decoding) –
@TravisBrown no, il suo contenuto non è necessario per l'elaborazione del resto. Nel mio caso il primo blocco dovrebbe contenere lo stesso tipo di dati del resto dei blocchi (parti di un file), è solo un pezzo più piccolo, da inviare al client in precedenza. –