2014-11-20 19 views
17

Devo generare un elenco completo di numero riga per una tabella dati con molte colonne.Come si ottiene un numero di riga SQL equivalente per un RDD Spark?

In SQL, questo sarebbe simile a questa:

select 
    key_value, 
    col1, 
    col2, 
    col3, 
    row_number() over (partition by key_value order by col1, col2 desc, col3) 
from 
    temp 
; 

Ora, diciamo che in Spark ho un RDD della forma (K, V), dove V = (col1, col2, col3), quindi le mie voci sono come

(key1, (1,2,3)) 
(key1, (1,4,7)) 
(key1, (2,2,3)) 
(key2, (5,5,5)) 
(key2, (5,5,9)) 
(key2, (7,5,5)) 
etc. 

voglio ordinare questi utilizzando i comandi come sortby(), sortWith(), sortByKey(), zipWithIndex, ecc e avere un nuovo RDD con la corretta row_number

(key1, (1,2,3), 2) 
(key1, (1,4,7), 1) 
(key1, (2,2,3), 3) 
(key2, (5,5,5), 1) 
(key2, (5,5,9), 2) 
(key2, (7,5,5), 3) 
etc. 

(Non mi interessa la parentesi, quindi il modulo può anche essere (K, (col1, col2, col3, rownum)))

Come faccio?

Ecco il mio primo tentativo:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) 

val temp1 = sc.parallelize(sample_data) 

temp1.collect().foreach(println) 

// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 
// ((1,2),1,2,3) 
// ((1,2),1,4,7) 
// ((1,2),2,2,3) 

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println) 

// ((((1,2),1,2,3),1),0) 
// ((((1,2),1,4,7),1),1) 
// ((((1,2),2,2,3),1),2) 
// ((((3,4),5,5,5),1),3) 
// ((((3,4),5,5,9),1),4) 
// ((((3,4),7,5,5),1),5) 

// note that this isn't ordering with a partition on key value K! 

val temp2 = temp1.??? 

Si noti inoltre che la funzione sortby non può essere applicata direttamente a un RDD, ma bisogna correre raccogliere() prima, e poi l'uscita non è un RDD, sia , ma una matrice

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) 

// ((1,2),1,4,7) 
// ((1,2),1,2,3) 
// ((1,2),2,2,3) 
// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 

Ecco un po 'di più il progresso, ma ancora non partizionato:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1)) 

temp2.collect().foreach(println) 

// ((1,2),1,4,7,1) 
// ((1,2),1,2,3,2) 
// ((1,2),2,2,3,3) 
// ((3,4),5,5,5,4) 
// ((3,4),5,5,9,5) 
// ((3,4),7,5,5,6) 
+0

Questa domanda è un'estensione di diverse altre questioni in parte risposto, vale a dire http://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark, http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -by-group, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E, http://stackoverflow.com/ domande/270220 59/filter-rdd-based-on-row-number, http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd –

+0

I ' Sto anche cercando di rispondere a questa domanda. [Hive ha aggiunto funzioni analitiche (incluso 'row_number()') in 0.11] (https://issues.apache.org/jira/browse/HIVE-896), e Spark 1.1 supporta HiveQL/Hive 0.12. Quindi sembra che 'sqlContext.hql (" seleziona row_number() over (partition by ... 'dovrebbe funzionare, ma sto ricevendo un errore. – dnlbrky

risposta

13

La funzionalità row_number() over (partition by ... order by ...) è stata aggiunta a Spark 1.4. Questa risposta utilizza PySpark/DataFrames.

creare un test dataframe:

from pyspark.sql import Row, functions as F 

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)), 
    Row(k="key1", v=(1,4,7)), 
    Row(k="key1", v=(2,2,3)), 
    Row(k="key2", v=(5,5,5)), 
    Row(k="key2", v=(5,5,9)), 
    Row(k="key2", v=(7,5,5)) 
    ) 
).toDF() 

Aggiungere il numero di riga partizionato:

from pyspark.sql.window import Window 

(testDF 
.select("k", "v", 
     F.rowNumber() 
     .over(Window 
       .partitionBy("k") 
       .orderBy("k") 
      ) 
     .alias("rowNum") 
     ) 
.show() 
) 

+----+-------+------+ 
| k|  v|rowNum| 
+----+-------+------+ 
|key1|[1,2,3]|  1| 
|key1|[1,4,7]|  2| 
|key1|[2,2,3]|  3| 
|key2|[5,5,5]|  1| 
|key2|[5,5,9]|  2| 
|key2|[7,5,5]|  3| 
+----+-------+------+ 
4

Questo è un problema interessante che stai sollevando. Risponderò a Python ma sono sicuro che sarai in grado di tradurre senza problemi su Scala.

Ecco come vorrei affrontarlo:

1- semplificare il vostro dati:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3]))) 

temp2 è ora una coppia chiave-valore "reale". Sembra che:

[ 
((3, 4), (5, 5, 5)), 
((3, 4), (5, 5, 9)), 
((3, 4), (7, 5, 5)), 
((1, 2), (1, 2, 3)), 
((1, 2), (1, 4, 7)), 
((1, 2), (2, 2, 3)) 

]

2- Poi, utilizzare il gruppo-by di riprodurre l'effetto della la partizione:

temp3 = temp2.groupByKey() 

TEMP3 ora è un RDD con 2 righe:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), 
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)] 

3- Ora è necessario applicare una funzione di classificazione per ciascun valore dell'RDD.In Python, vorrei utilizzare la semplice funzione ordinato (l'enumerate creerà la colonna row_number):

temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10) 

Si noti che per implementare il vostro ordine particolare, si avrebbe bisogno di alimentare il giusto argomento "chiave" (in python, vorrei solo creare una funzione lambda come quelli:

lambda tuple : (tuple[0],-tuple[1],tuple[2]) 

alla fine (senza la funzione argomento chiave, sembra che):

[ 
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2)) 

]

Spero che questo aiuti!

Buona fortuna.