2016-04-02 38 views
5

Sto analizzando i record delle prestazioni in tempo reale dei voli nazionali statunitensi a partire dal 2015. Devo raggruppare per numero di coda e archiviare una lista ordinata di tutti i voli per ciascun numero di coda in un database, che sarà recuperata dalla mia domanda. Non sono sicuro di quale delle due opzioni per ottenere questo è la migliore.Qual è il modo più efficiente per eseguire una riduzione ordinata in PySpark?

# Load the parquet file 
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet') 

# Filter down to the fields we need to identify and link to a flight 
flights = on_time_dataframe.rdd.map(lambda x: 
    (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum) 
) 

posso realizzare questo in una sorta di ridurre ...

# Group flights by tail number, sorted by date, then flight number, then 
origin/dest 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4]))) 

O posso raggiungerlo in un lavoro mappa successiva ...

# Do same in a map step, more efficient or does pySpark know how to optimize the above? 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: a + b)\ 
    .map(lambda tuple: 
    (
     tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4]))) 
    ) 

Fare questo nel ridurre sembra davvero inefficiente, ma in realtà entrambi sono molto lenti. sort() sembra il modo per farlo nei documenti di PySpark, quindi mi chiedo se PySpark non rende questo kasher internamente? Qual è l'opzione più efficiente o la scelta migliore per qualche altro motivo?

Il mio codice è anche in un gist qui: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

Se siete curiosi di sapere i dati, è dal Bureau of Transportation Statistics, qui: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

risposta

3

Purtroppo in entrambi i modi sono sbagliati prima di anche iniziare a ordinare e non c'è un modo efficace e semplice per farlo in Spark. Tuttavia, il primo è significativamente peggiore dell'altro.

Perché entrambi i modi sono sbagliati? Perché è solo un altro groupByKey ed è semplicemente un'operazione costosa. Ci sono alcuni modi in cui puoi provare a migliorare le cose (specialmente per evitare la riduzione laterale della mappa) ma alla fine della giornata devi solo pagare il prezzo di una mescolata completa e se non vedi fallimenti probabilmente non ne vale la pena tutto il trambusto.

Ancora, il secondo approccio è molto meglio algoritmicamente *. Se si desidera mantenere la struttura ordinata fino in fondo come nel primo tentativo, è necessario disporre di strumenti dedicati (aggregateByKey con bisect.insort sarebbe una buona scelta) ma non c'è davvero nulla da guadagnare qui.

Se l'output raggruppato è un requisito importante, la cosa migliore che puoi fare è keyBy, groupByKey e ordinare. Non migliorerà le prestazioni sopra la seconda soluzione, ma senza dubbio migliorerà la leggibilità:

(flights 
    .keyBy(lambda x: x[5]) 
    .groupByKey() 
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5]))) 

* Anche se si assume la migliore delle ipotesi per Timsort il primo approccio è N volte O (N) mentre il secondo è O (N log N) nello scenario peggiore.

+0

Volevo solo dire che non mi aspettavo una risposta diretta a questa domanda ed è davvero fantastico che ne abbiate dato uno! – rjurney