2015-04-07 2 views
9

ho un RDD (possiamo chiamarla myrdd) in cui ogni record nel RDD è nella forma:Creazione di un dataframe Spark da un RDD di liste

[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)] 

vorrei convertire questo in un dataframe in pyspark - qual è il modo più semplice per farlo?

+0

Non è esattamente chiaro dalla tua domanda dove hai problemi . È il fatto che tu abbia così tante colonne? O solo che i record del tuo RDD sono elenchi di tuple? –

risposta

29

Come circa l'uso del metodo toDF? Hai solo bisogno di aggiungere i nomi dei campi.

df = rdd.toDF(['column', 'value']) 
+0

questa risposta funziona, e la soluzione che ho postato qui sotto (in base alla tua risposta) convertirà un rdd come descritto sopra in un DataFrame – mgoldwasser

2

Dai uno sguardo allo DataFrame documentation per fare in modo che questo esempio funzioni per te, ma dovrebbe funzionare. Sto assumendo il tuo RDD si chiama my_rdd

from pyspark.sql import SQLContext, Row 
sqlContext = SQLContext(sc) 

# You have a ton of columns and each one should be an argument to Row 
# Use a dictionary comprehension to make this easier 
def record_to_row(record): 
    schema = {'column{i:d}'.format(i = col_idx):record[col_idx] for col_idx in range(1,100+1)} 
    return Row(**schema) 


row_rdd = my_rdd.map(lambda x: record_to_row(x)) 

# Now infer the schema and you have a DataFrame 
schema_my_rdd = sqlContext.inferSchema(row_rdd) 

# Now you have a DataFrame you can register as a table 
schema_my_rdd.registerTempTable("my_table") 

Non ho lavorato molto con DataFrames in Spark ma questo dovrebbe fare il trucco

+0

potrebbe essere necessario aggiungere una riga dopo aver creato sqlContext per caricare la libreria implicita: "import sqlContext .implicits._". Vedi https://spark.apache.org/docs/1.3.0/sql-programming-guide.html –

+0

Non è una cosa scala-unica? La mia risposta è scritta in Python –

8

La risposta da @dapangmao lo ha ottenuto a questa soluzione:

my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF() 
1

In pyspark, diciamo che avete un dataframe chiamato come userDF.

>>> type(userDF) 
<class 'pyspark.sql.dataframe.DataFrame'> 

consente solo convertirlo in RDD (

userRDD = userDF.rdd 
>>> type(userRDD) 
<class 'pyspark.rdd.RDD'> 

e ora si può fare alcune manipolazioni e chiamare ad esempio mappa delle funzioni:

newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']}) 

Infine, consente di creare un dataframe da resiliente set di dati distribuiti (RDD).

newDF = sqlContext.createDataFrame(newRDD, ["food", "name"]) 

>>> type(ffDF) 
<class 'pyspark.sql.dataframe.DataFrame'> 

Questo è tutto.

sono stato colpito questo messaggio di avviso prima, quando ho provato a chiamare:

newDF = sc.parallelize(newRDD, ["food","name"] : 

.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row inst warnings.warn("Using RDD of dict to inferSchema is deprecated. " 

Quindi nessun bisogno di farlo più ...