2016-04-01 25 views
10

escome ciclicamente ogni fila di dataframe in pyspark

sqlContext = SQLContext(sc) 

sample=sqlContext.sql("select Name ,age ,city from user") 
sample.show() 

L'istruzione precedente stampa intera tabella sul terminale ma voglio accedere a ciascuna riga della tabella utilizzando per o mentre per eseguire ulteriori calcoli.

+0

Credo di aver fornito una risposta corretta. Puoi selezionare o fornire feedback per migliorare? – aaronsteers

risposta

13

Semplicemente non è possibile. DataFrames, come le altre strutture di dati distribuite, non sono iterable e si può accedere utilizzando solo la funzione di ordine superiore dedicata e/oi metodi SQL.

Ovviamente si può collect o convertire toLocalIterator e iterare localmente

for row in df.rdd.collect(): 
    do_something(row) 

ma batte tutti gli usi di utilizzare Spark.

2

Se si desidera eseguire un'operazione su ciascuna riga in un oggetto DataFrame, utilizzare map. Ciò ti consentirà di eseguire ulteriori calcoli su ciascuna riga. È l'equivalente del looping dell'intero set di dati da 0 a len(dataset)-1.

Si noti che questo restituirà un PipelinedRDD, non un DataFrame.

21

Definire una funzione personalizzata e utilizzare la mappa.

def customFunction(row): 

    return (row.name, row.age, row.city) 

sample2 = sample.rdd.map(customFunction) 

o

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city)) 

La funzione personalizzata sarebbe poi essere applicato a tutte le righe della dataframe. Si noti che sample2 sarà un RDD, non un dataframe.

La mappa è necessaria se si eseguono calcoli più complessi. Se hai solo bisogno di aggiungere una colonna derivata, puoi usare withColumn, restituendo un dataframe.

sample3 = sample.withColumn('age2', sample.age + 2) 
2

Utilizzando list comprehension in Python, è possibile raccogliere un'intera colonna di valori in una lista con solo due linee:

df = sqlContext.sql("show tables in default") 
tableList = [x["tableName"] for x in df.rdd.collect()] 

Nell'esempio di cui sopra, torniamo un elenco delle tabelle nel database ' default ', ma lo stesso può essere adattato sostituendo la query utilizzata in sql().

O più abbreviati:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()] 

E per il vostro esempio di tre colonne, possiamo creare una lista di dizionari, e poi scorrere attraverso di loro in un ciclo for.

sql_text = "select name, age, city from user" 
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
      for x in sqlContext.sql(sql_text).rdd.collect()] 
for row in tupleList: 
    print("{} is a {} year old from {}".format(
     row["name"], 
     row["age"], 
     row["city"])) 
0

sopra

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

dovrebbe essere

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

per name, age, e city non sono variabili, ma semplicemente le chiavi del dizionario.