2015-07-28 1 views
11

Utilizzando Scala, come posso dividere dataFrame in più dataFrame (sia esso array o collezione) con lo stesso valore di colonna. Ad esempio voglio dividere il seguente dataframe:Come dividere un dataframe in dataframes con gli stessi valori di colonna?

ID Rate State 
1 24 AL 
2 35 MN 
3 46 FL 
4 34 AL 
5 78 MN 
6 99 FL 

a:

dati set 1

ID Rate State 
1 24 AL 
4 34 AL 

dati set 2

ID Rate State 
2 35 MN 
5 78 MN 

dati set 3

ID Rate State 
3 46 FL 
6 99 FL 
+1

Perché avete bisogno di dividere il dataframe in più dataframes ?. Come probabilmente sai, puoi filtrare e trasformare il tuo dataFrame in: [(AL, Seq (24 AL, 4 34 AL)), (MN, Seq (35 MN, 5 78 MN)), (FL, Seq (46 FL 6 99 FL))] Utilizzo di groupBy. –

+0

groupBy indica il tipo di Data di gruppo, come posso convertirlo in Array? – user1735076

+0

puoi spiegare cosa stai cercando di fare con quella matrice? – lev

risposta

11

È possibile raccogliere i valori di stato unici e semplicemente mappare gli array risultante:

val states = df.select("State").distinct.collect.flatMap(_.toSeq) 
val byStateArray = states.map(state => df.where($"State" <=> state)) 

o per mappare:

val byStateMap = states 
    .map(state => (state -> df.where($"State" <=> state))) 
    .toMap 

La stessa cosa in Python:

from itertools import chain 
from pyspark.sql.functions import col 

states = chain(*df.select("state").distinct().collect()) 

# PySpark 2.3 and later 
# In 2.2 and before col("state") == state) 
# should give the same outcome, ignoring NULLs 
# if NULLs are important 
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state) 
df_by_state = {state: 
    df.where(col("state").eqNullSafe(state)) for state in states} 

L'ovvio il problema qui è che richiede una scansione completa dei dati per ogni livello, quindi è un'operazione costosa. Se siete alla ricerca di un modo per dividere solo l'uscita puoi anche How do I split an RDD into two or more RDDs?

In particolare è possibile scrivere Dataset partizionato dalla colonna di interesse:

val path: String = ??? 
df.write.partitionBy("State").parquet(path) 

e leggere di nuovo, se necessario:

// Depend on partition prunning 
for { state <- states } yield spark.read.parquet(path).where($"State" === state) 

// or explicitly read the partition 
for { state <- states } yield spark.read.parquet(s"$path/State=$state") 

A seconda della dimensione dei dati, il numero di livelli di divisione, archiviazione e livello di persistenza dell'input potrebbe essere più veloce o più lento rispetto a più filtri.

+0

Forse Tipo di domanda tardiva.Ma quando provo il codice python in Spark 2.2.0 ottengo sempre un errore "Column is not callable". Ho provato diversi approcci, ma ottengo lo stesso errore. Qualsiasi soluzione alternativa per questo? – inneb

-1

È molto semplice (se la versione spark è 2) se si crea il dataframe come tabella temporanea.

df1.createOrReplaceTempView("df1") 

E ora si possono fare le domande,

var df2 = spark.sql("select * from df1 where state = 'FL'") 
var df3 = spark.sql("select * from df1 where state = 'MN'") 
var df4 = spark.sql("select * from df1 where state = 'AL'") 

Ora è ottenuto il DF2, DF3, DF4. Se si desidera averli come lista, è possibile utilizzare,

df2.collect() 
df3.collect() 

o persino la funzione mappa/filtro. Si prega di fare riferimento https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

Ash

+0

esiste la possibilità di eseguire il ciclo di query SQL in spark? Raccogli tutti i valori distinti prima e poi sostituisci "where state = 'FL'" con "where state = 'i'" o qualcosa del genere? – inneb

+0

Sarà sovraccarico, ma è comunque possibile gestirlo utilizzando Spark Dataframes e la codifica SCALA – ashK