2015-03-17 5 views
5

Sto mappando su una tabella HBase, generando un elemento RDD per riga HBase. Tuttavia, a volte la riga presenta dati non validi (generando NullPointerException nel codice di analisi), nel qual caso voglio solo saltarlo.Apache Spark: gestione di Option/Some/None in RDD

ho il mio mapper iniziale restituire un Option per indicare che restituisce 0 o 1 elementi, quindi filtrare per Some, quindi ottenere il valore contenuto:

// myRDD is RDD[(ImmutableBytesWritable, Result)] 
val output = myRDD. 
    map(tuple => getData(tuple._2)). 
    filter({case Some(y) => true; case None => false}). 
    map(_.get). 
    // ... more RDD operations with the good data 

def getData(r: Result) = { 
    val key = r.getRow 
    var id = "(unk)" 
    var x = -1L 

    try { 
    id = Bytes.toString(key, 0, 11) 
    x = Long.MaxValue - Bytes.toLong(key, 11) 
    // ... more code that might throw exceptions 

    Some((id, (List(x), 
      // more stuff ... 
     ))) 
    } catch { 
    case e: NullPointerException => { 
     logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e) 
     None 
    } 
    } 
} 

C'è un modo più idiomatico per fare questo che è più breve? Mi sembra che questo sia piuttosto disordinato, sia in getData() sia nella danza map.filter.map che sto facendo.

Forse un flatMap potrebbe funzionare (generare 0 o 1 elementi in un Seq), ma non voglio che appiattisca le tuple che sto creando nella funzione mappa, basta eliminare i vuoti.

risposta

7

Se si modifica getData per restituire un scala.util.Try, è possibile semplificare notevolmente le trasformazioni. Qualcosa di simile potrebbe funzionare:

def getData(r: Result) = { 
    val key = r.getRow 
    var id = "(unk)" 
    var x = -1L 

    val tr = util.Try{ 
    id = Bytes.toString(key, 0, 11) 
    x = Long.MaxValue - Bytes.toLong(key, 11) 
    // ... more code that might throw exceptions 

    (id, (List(x) 
      // more stuff ... 
    )) 
    } 

    tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)) 
    tr 
} 

Poi la trasformazione potrebbe iniziare in questo modo:

myRDD. 
    flatMap(tuple => getData(tuple._2).toOption) 

Se il Try è un Failure sarà trasformato in un None via toOption e poi rimosso come parte del flatMap logica. A quel punto, il tuo prossimo passaggio nella trasformazione funzionerà solo con i casi riusciti, indipendentemente dal tipo sottostante che viene restituito da getData senza il wrapping (ovvero Option)

+0

Nizza! Darò uno sparo e vedrò come si integra. –

1

Se sei a posto con il dropping dei dati allora puoi semplicemente usare mapPartitions. Ecco un esempio:

import scala.util._ 
val mixedData = sc.parallelize(List(1,2,3,4,0)) 
mixedData.mapPartitions(x=>{ 
    val foo = for(y <- x) 
    yield { 
    Try(1/y) 
    } 
    for{goodVals <- foo.partition(_.isSuccess)._1} 
    yield goodVals.get 
}) 

Se si desidera visualizzare i valori negativi, quindi è possibile utilizzare un accumulator o fare il login come siete stati.

Il codice dovrebbe essere simile a questa:

val output = myRDD. 
    mapPartitions(tupleIter => getCleanData(tupleIter)) 
    // ... more RDD operations with the good data 

def getCleanData(iter: Iter[???]) = { 
    val triedData = getDataInTry(iter) 
    for{goodVals <- triedData.partition(_.isSuccess)._1} 
    yield goodVals.get 
} 

def getDataInTry(iter: Iter[???]) = { 
    for(r <- iter) yield { 
    Try{ 
     val key = r._2.getRow 
     var id = "(unk)" 
     var x = -1L 
     id = Bytes.toString(key, 0, 11) 
     x = Long.MaxValue - Bytes.toLong(key, 11) 
     // ... more code that might throw exceptions 
    } 
    } 
} 
+0

Non penso di capire cosa abbia a che fare con le partizioni - pensavo che le partizioni corrispondessero alle partizioni dei dati sottostanti, nel mio caso le regioni HBase. È sbagliato? –

+0

mapPartitions è il metodo .... significa che si ottiene l'intera partizione (sotto forma di un iteratore) con cui lavorare anziché una alla volta. È più spesso usato per operazioni costose (come l'apertura di una connessione) che si desidera eseguire una sola volta per partizione anziché per ciascun elemento. –

+0

Aha, vedo - si sta approfittando del fatto che ti dà un iteratore, e quindi non si aspetta una corrispondenza uno-a-uno tra input e output. –

7

Un'alternativa, e spesso trascurato modo, sarebbe utilizzando collect(PartialFunction pf), che si propone di 'selezionare' o 'raccogliere' elementi specifici nel RDD che sono definito nella funzione parziale.

Il codice sarebbe simile a questa:

val output = myRDD.collect{case Success(tuple) => tuple } 

def getData(r: Result):Try[(String, List[X])] = Try { 
     val id = Bytes.toString(key, 0, 11) 
     val x = Long.MaxValue - Bytes.toLong(key, 11) 
     (id, List(x)) 
} 
+0

Sembra una buona opzione, grazie. –

+0

Questa è una buona opzione, tuttavia sarei curioso di vedere come questo è diviso. Questo si traduce in una mescolanza? O si ridurrà allo stesso piano di esecuzione della mia risposta? –

+1

@JustinPihony non sono coinvolti shuffle. 'collect' può essere espresso come:' collection.filter (x => pf.isDefinedAt (x)). map (pf) ' – maasg