È possibile raccogliere i valori di stato unici e semplicemente mappare gli array risultante:
val states = df.select("State").distinct.collect.flatMap(_.toSeq)
val byStateArray = states.map(state => df.where($"State" <=> state))
o per mappare:
val byStateMap = states
.map(state => (state -> df.where($"State" <=> state)))
.toMap
La stessa cosa in Python:
from itertools import chain
from pyspark.sql.functions import col
states = chain(*df.select("state").distinct().collect())
# PySpark 2.3 and later
# In 2.2 and before col("state") == state)
# should give the same outcome, ignoring NULLs
# if NULLs are important
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
df_by_state = {state:
df.where(col("state").eqNullSafe(state)) for state in states}
L'ovvio il problema qui è che richiede una scansione completa dei dati per ogni livello, quindi è un'operazione costosa. Se siete alla ricerca di un modo per dividere solo l'uscita puoi anche How do I split an RDD into two or more RDDs?
In particolare è possibile scrivere Dataset
partizionato dalla colonna di interesse:
val path: String = ???
df.write.partitionBy("State").parquet(path)
e leggere di nuovo, se necessario:
// Depend on partition prunning
for { state <- states } yield spark.read.parquet(path).where($"State" === state)
// or explicitly read the partition
for { state <- states } yield spark.read.parquet(s"$path/State=$state")
A seconda della dimensione dei dati, il numero di livelli di divisione, archiviazione e livello di persistenza dell'input potrebbe essere più veloce o più lento rispetto a più filtri.
Perché avete bisogno di dividere il dataframe in più dataframes ?. Come probabilmente sai, puoi filtrare e trasformare il tuo dataFrame in: [(AL, Seq (24 AL, 4 34 AL)), (MN, Seq (35 MN, 5 78 MN)), (FL, Seq (46 FL 6 99 FL))] Utilizzo di groupBy. –
groupBy indica il tipo di Data di gruppo, come posso convertirlo in Array? – user1735076
puoi spiegare cosa stai cercando di fare con quella matrice? – lev