2015-09-24 17 views
12

Ho un grandissimo pyspark.sql.dataframe.DataFrame denominato df. Ho bisogno di un modo per enumerare i record, quindi, essere in grado di accedere al record con un certo indice. (O gruppo selezionato di record con indici varia)PySpark DataFrames - modo per enumerare senza convertire in Panda?

In panda, ho potuto fare solo

indexes=[2,3,6,7] 
df[indexes] 

qui voglio qualcosa di simile, (e senza convertire dataframe di panda)

Il più vicino posso arrivare a è:

  • enumerazione tutti gli oggetti nella dataframe originale:

    indexes=np.arange(df.count()) 
    df_indexed=df.withColumn('index', indexes) 
    
    • Ricerca di valori che ho bisogno di utilizzare la funzione in cui().

DOMANDE:

  1. Perché non funziona e come farlo funzionare? Come aggiungere una riga a un dataframe?
  2. Funzionerebbe tardi per fare qualcosa di simile:

    indexes=[2,3,6,7] 
    df1.where("index in indexes").collect() 
    
  3. più veloce e più semplice modo per affrontarla?

risposta

11

Non funziona perché:

  1. il secondo argomento per withColumn dovrebbe essere un non Column una collezione. np.array non funziona qui
  2. quando si passa "index in indexes" come espressione SQL per whereindexes è fuori portata e non si risolve come un identificatore valido

PySpark> = 1.4.0

È possibile aggiungere numeri di riga utilizzando la rispettiva funzione della finestra e la query utilizzando il metodo Column.isin o una stringa di query correttamente formata:

from pyspark.sql.functions import col, rowNumber 
from pyspark.sql.window import Window 

w = Window.orderBy() 
indexed = df.withColumn("index", rowNumber().over(w)) 

# Using DSL 
indexed.where(col("index").isin(set(indexes))) 

# Using SQL expression 
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes))) 

sembra che funzioni finestra chiamata senza PARTITION BY clausola di spostare tutti i dati per la singola partizione così sopra potrebbe non essere la soluzione migliore, dopo tutto.

Any faster and simpler way to deal with it?

Non proprio. Spark DataFrames non supporta l'accesso casuale alle righe.

PairedRDD è possibile accedere utilizzando il metodo lookup che è relativamente veloce se i dati vengono partizionati utilizzando HashPartitioner. C'è anche il progetto indexed-rdd che supporta ricerche efficienti.

Edit:

indipendente di versione PySpark si può provare qualcosa di simile:

from pyspark.sql import Row 
from pyspark.sql.types import StructType, StructField, LongType 

row = Row("char") 
row_with_index = Row("char", "index") 

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF() 
df.show(5) 

## +----+ 
## |char| 
## +----+ 
## | a| 
## | b| 
## | c| 
## | d| 
## | e| 
## +----+ 
## only showing top 5 rows 

# This part is not tested but should work and save some work later 
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)]) 

indexed = (df.rdd # Extract rdd 
    .zipWithIndex() # Add index 
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows 
    .toDF(schema)) # It will work without schema but will be more expensive 

# inSet in Spark < 1.3 
indexed.where(col("index").isin(indexes)) 
+0

Ciao @ zero323, ho provato lo snippet. Tutto funziona eccetto 'indexed.where (col (" index "). InSet (indexes))' che non funziona. Restituisce 'TypeError: 'Column' object is not callable' for me. Hai un aggiornamento sullo snippet se voglio interrogare più indici? – titipata

7

Se si desidera una serie di numeri che è garantito non entrare in collisione, ma non richiede una .over(partitionBy()) allora si può utilizzare monotonicallyIncreasingId().

from pyspark.sql.functions import monotonicallyIncreasingId 
df.select(monotonicallyIncreasingId().alias("rowId"),"*") 

Nota che i valori non sono particolarmente "accurati". Ad ogni partizione viene assegnato un intervallo di valori e l'output non sarà contiguo. Per esempio. 0, 1, 2, 8589934592, 8589934593, 8589934594.

Questo è stato aggiunto al Spark l'Apr 28, 2015 qui: https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

1

Si può certamente aggiungere una matrice per l'indicizzazione, una vasta gamma di scelta in effetti: In Scala, prima abbiamo bisogno di creare un array di indicizzazione:

val index_array=(1 to df.count.toInt).toArray 

index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 

Ora puoi aggiungere questa colonna al tuo DF. Innanzitutto, per questo, devi aprire il nostro DF e ottenerlo come array, quindi comprimerlo con il tuo array di indice e quindi convertire nuovamente il nuovo array in e RDD. Il passaggio finale è ottenere come DF:

final_df = sc.parallelize((df.collect.map(
    x=>(x(0),x(1))) zip index_array).map(
    x=>(x._1._1.toString,x._1._2.toString,x._2))). 
    toDF("column_name") 

L'indicizzazione sarebbe più chiara dopo.

+0

Questo è un modo abbastanza pratico, semplice ma bello di farlo :-) – Steve