Devo generare un elenco completo di numero riga per una tabella dati con molte colonne.Come si ottiene un numero di riga SQL equivalente per un RDD Spark?
In SQL, questo sarebbe simile a questa:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Ora, diciamo che in Spark ho un RDD della forma (K, V), dove V = (col1, col2, col3), quindi le mie voci sono come
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
voglio ordinare questi utilizzando i comandi come sortby(), sortWith(), sortByKey(), zipWithIndex, ecc e avere un nuovo RDD con la corretta row_number
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(Non mi interessa la parentesi, quindi il modulo può anche essere (K, (col1, col2, col3, rownum)))
Come faccio?
Ecco il mio primo tentativo:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
Si noti inoltre che la funzione sortby non può essere applicata direttamente a un RDD, ma bisogna correre raccogliere() prima, e poi l'uscita non è un RDD, sia , ma una matrice
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Ecco un po 'di più il progresso, ma ancora non partizionato:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Questa domanda è un'estensione di diverse altre questioni in parte risposto, vale a dire http://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark, http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -by-group, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E, http://stackoverflow.com/ domande/270220 59/filter-rdd-based-on-row-number, http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd –
I ' Sto anche cercando di rispondere a questa domanda. [Hive ha aggiunto funzioni analitiche (incluso 'row_number()') in 0.11] (https://issues.apache.org/jira/browse/HIVE-896), e Spark 1.1 supporta HiveQL/Hive 0.12. Quindi sembra che 'sqlContext.hql (" seleziona row_number() over (partition by ... 'dovrebbe funzionare, ma sto ricevendo un errore. – dnlbrky