2011-12-15 2 views
8

Ho bisogno di elaborare documenti XML che consistono in un numero molto grande di record indipendenti, ad es.Come ottenere un Iterator in streaming [Nodo] da un grande documento XML?

<employees> 
    <employee> 
     <firstName>Kermit</firstName> 
     <lastName>Frog</lastName> 
     <role>Singer</role> 
    </employee> 
    <employee> 
     <firstName>Oscar</firstName> 
     <lastName>Grouch</lastName> 
     <role>Garbageman</role> 
    </employee> 
    ... 
</employees> 

In alcuni casi questi sono solo file di grandi dimensioni, ma in altri possono provenire da una fonte di streaming.

Non posso solo scala.xml.XmlLoader.load() perché non voglio tenere tutto il documento in memoria (o aspettare che il flusso di input si chiuda), quando devo solo lavorare con un record alla volta. So che posso usare XmlEventReader per lo streaming dell'input come una sequenza di XmlEvents. Questi sono comunque molto meno comodi da utilizzare rispetto a scala.xml.Node.

Quindi mi piacerebbe ottenere un Iterator [Nodo] pigro da questo in qualche modo, al fine di operare su ogni singolo record utilizzando la sintassi Scala, mantenendo l'utilizzo della memoria sotto controllo.

Per fare da solo, potrei iniziare con un XmlEventReader, creare un buffer di eventi tra ogni tag iniziale e finale corrispondente e quindi costruire un albero del nodo da quello. Ma c'è un modo più semplice che ho trascurato? Grazie per eventuali approfondimenti!

risposta

8

È possibile utilizzare il parser sottostante utilizzato da XMLEventReader tramite ConstructingParser ed elaborare i nodi dipendenti al di sotto del livello superiore con un callback. Devi solo stare attento scartando i dati appena elaborati:

import scala.xml._ 

def processSource[T](input: Source)(f: NodeSeq => T) { 
    new scala.xml.parsing.ConstructingParser(input, false) { 
    nextch // initialize per documentation 
    document // trigger parsing by requesting document 

    var depth = 0 // track depth 

    override def elemStart(pos: Int, pre: String, label: String, 
     attrs: MetaData, scope: NamespaceBinding) { 
     super.elemStart(pos, pre, label, attrs, scope) 
     depth += 1 
    } 
    override def elemEnd(pos: Int, pre: String, label: String) { 
     depth -= 1 
     super.elemEnd(pos, pre, label) 
    } 
    override def elem(pos: Int, pre: String, label: String, attrs: MetaData, 
     pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = { 
     val node = super.elem(pos, pre, label, attrs, pscope, nodes) 
     depth match { 
     case 1 => <dummy/> // dummy final roll up 
     case 2 => f(node); NodeSeq.Empty // process and discard employee nodes 
     case _ => node // roll up other nodes 
     } 
    } 
    } 
} 

quindi si utilizza in questo modo per elaborare ogni nodo al secondo livello in memoria costante (supponendo che i nodi al secondo livello non sono sempre un numero arbitrario di figli):

processSource(src){ node => 
    // process here 
    println(node) 
} 

Il vantaggio rispetto a XMLEventReader è che non si utilizzano due thread. Inoltre, non è necessario analizzare il nodo due volte rispetto alla soluzione proposta. Lo svantaggio è che questo dipende dal funzionamento interno di ConstructingParser.

+0

Brillante! Funziona alla grande Per ottenere da questo generatore di stile a un Iterator non è troppo difficile; vedi la mia altra risposta. Grazie mille! –

5

Per ottenere dalla soluzione generatore di huynhjl ad un TraversableOnce[Node], utilizzare this trick:

def generatorToTraversable[T](func: (T => Unit) => Unit) = 
    new Traversable[T] { 
    def foreach[X](f: T => X) { 
     func(f(_)) 
    } 
    } 

def firstLevelNodes(input: Source): TraversableOnce[Node] = 
    generatorToTraversable(processSource(input)) 

Il risultato di generatorToTraversable non è attraversabile più di una volta (anche se un nuovo ConstructingParser viene creata un'istanza per ogni chiamata foreach), perché l'ingresso stream è una Source, che è un Iterator. Tuttavia, non possiamo sovrascrivere Traversable.isTraversableAgain perché è definitivo.

Davvero vorremmo far rispettare questo semplicemente restituendo un Iterator. Tuttavia, sia Traversable.toIterator che Traversable.view.toIterator creano un flusso intermedio, che memorizzerà tutte le voci nella cache (annullando l'intero scopo di questo esercizio). Oh bene; Farò in modo che il flusso generi un'eccezione se vi si accede due volte.

Nota anche il tutto non è thread-safe.

Questo codice funziona alla grande, e credo che la soluzione complessiva sia sia pigra che non memorizzata nella cache (quindi memoria costante), sebbene non l'abbia ancora provata su un grande input.

+0

Non sapevo di questo fantastico trucco! – huynhjl