2015-12-29 19 views
6
Aggiornamento

: Sembra che i miei errori siano probabilmente dovuti a come ho installato Spark e/o Hive. Lavorare con le funzioni della finestra sembra piuttosto semplice in un notebook Databricks (ospitato). Devo capire come installarlo localmente.Come posso ottenere un DataSource PySpark creato usando HiveContext in Spark 1.5.2?

Possiedo uno Spark DataFrame su cui ho bisogno di utilizzare una funzione Window attiva. * Ho provato a seguire le istruzioni su here, ma ho riscontrato alcuni problemi.

Impostazione mio ambiente:

import os 
import sys 
import datetime as dt 

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2' 
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip' 
sys.path.append('/usr/bin/spark-1.5.2/python') 
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip') 

import pyspark 
sc = pyspark.SparkContext() 
hiveContext = pyspark.sql.HiveContext(sc) 
sqlContext = pyspark.sql.SQLContext(sc) 
from pyspark.sql import Row 
from pyspark.sql.functions import struct 
from pyspark.sql import DataFrame 
from collections import OrderedDict 

Impostazione miei dati:

test_ts = {'adminDistrict': None, 
'city': None, 
'country': {'code': 'NA', 'name': 'UNKNOWN'}, 
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89}, 
    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44}, 
    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3}, 
    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6}, 
    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84}, 
    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74}, 
    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5}, 
    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79}, 
    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3}, 
    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0}, 
    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35}, 
    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82}, 
    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24}, 
    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61}, 
    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14}, 
    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0}, 
    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82}, 
    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11}, 
    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46}, 
    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8}, 
    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74}, 
    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63}, 
    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64}, 
    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}], 
'maxDate': '2015-12-28T00:00:00Z', 
'minDate': '2005-08-25T00:00:00Z', 
'name': 'S&P GSCI Crude Oil Spot', 
'offset': 0, 
'resolution': 'DAY', 
'sources': ['trf'], 
'subtype': 'Index', 
'type': 'Commodities', 
'uid': 'TRF_INDEX_Z39824_PI'} 

Una funzione a sua volta che JSON in un dataframe:

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value'])) 
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')]) 

Ottenere un dataframe e tenendo uno sguardo a cosa c'è dentro:

test_df = ts_to_df(test_ts) 
test_df.show() 

Questo mi mostra questo:

+----------+----------------------+ 
|  Date|SP_GSCI_Crude_Oil_Spot| 
+----------+----------------------+ 
|2005-08-25|    369.89| 
|2005-08-26|    362.44| 
|2005-08-29|     368.3| 
|2005-08-30|     382.6| 
|2005-08-31|    377.84| 
|2005-09-01|    380.74| 
|2005-09-02|    370.33| 
|2005-09-05|    370.33| 
|2005-09-06|     361.5| 
|2005-09-07|    352.79| 
|2005-09-08|     354.3| 
|2005-09-09|     353.0| 
|2005-09-12|    349.35| 
|2005-09-13|    348.82| 
|2005-09-14|    360.24| 
|2005-09-15|    357.61| 
|2005-09-16|    347.14| 
|2005-09-19|     370.0| 
|2005-09-20|    362.82| 
|2005-09-21|    366.11| 
+----------+----------------------+ 

E qui è dove ho idea di quello che sto facendo e tutto comincia ad andare storto:

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

che mi dà questo errore:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

Quindi sembra che abbia bisogno di un HiveContext, giusto? Devo creare il mio DataFrame usando un HiveContext? Allora mi permetta di provare a creare un dataframe esplicitamente utilizzando HiveContext:

def ts_to_hive_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
    return hiveContext.createDataFrame(temp_rdd) 

test_df = ts_to_hive_df(test_ts) 
test_df.show() 

Ma che mi dà questo errore:

TypeError: 'JavaPackage' object is not callable

così come faccio a utilizzare le funzioni della finestra? Devo creare i DataFrames usando un HiveContext? Se è così, allora come faccio? Qualcuno può dirmi cosa sto sbagliando?

* Ho bisogno di sapere se ci sono lacune nei miei dati. Ho la colonna 'Data' e per ogni riga, ordinata per Data, voglio sapere cosa c'è nella riga successiva, e se ho giorni mancanti o dati cattivi, allora voglio usare i dati dell'ultimo giorno su quella riga. Se conosci un modo migliore per farlo, fammi sapere. Ma mi piacerebbe ancora sapere come far funzionare queste funzioni di Windows.

+0

spiacenti. Aggiunto codice specifico. Spero che ci porti da qualche parte. Grazie per dare un'occhiata. – Nathaniel

+1

Va bene, sembra che qualcosa potrebbe essere incasinato con come ho Spark (o Hive?) Installato localmente, perché posso farlo funzionare in un notebook DataBricks. DataBricks non vuole che creiamo i nostri HiveContexts o SQLContexts. Per farlo funzionare, ho omesso la creazione dei miei contesti personali e ho utilizzato la funzione ts_to_hive_df di cui sopra, sostituendo il mio hiveContext con il loro sqlContext. Dovrò farlo funzionare nella mia installazione alla fine. Tornerò e scriverò una soluzione quando la scoprirò. – Nathaniel

+1

Sembra che i binari Spark siano stati creati senza supporto Hive. – zero323

risposta

0

Questa è una domanda più vecchia e quindi discutibile visto che probabilmente ci si è trasferiti sulle nuove versioni di Spark. Sto eseguendo la scintilla 2.0 da solo, quindi questo potrebbe essere un imbroglio.

Ma a seguire: 2 possibili problemi. Nel primo esempio, penso che lo .toDF() sia di default in SQLContext poiché entrambi avete chiamato. Nel secondo, quando refactored, potrebbe essere che stai chiamando il hivecontext all'interno della funzione?

Se refactoring la tua seconda funzione ts_to_df per avere hivecontext chiamato all'esterno della funzione, tutto va bene.

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    return data 

data = ts_to_df(test_ts) 
test_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
test_df = hiveContext.createDataFrame(test_rdd) 

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

ottengo l'uscita

+----------+ 
| Next_Date| 
+----------+ 
|2005-08-26| 
|2005-08-29| 
|2005-08-30| 
|2005-08-31| 
|2005-09-01| 
|2005-09-02| 
.....