2015-09-14 14 views
15

Secondo thisIl lavoro di push down previsto per spark con JDBC?

Catalyst vale ottimizzazioni logici come predicato pushdown. L'ottimizzatore può spingere i predicati del filtro verso il basso nell'origine dati, abilitando l'esecuzione fisica per ignorare i dati irrilevanti.

Spark supporta il push dei predicati verso l'origine dati. Questa funzione è disponibile/prevista anche per JDBC?

(Da ispezionare i registri DB posso vedere che non è il comportamento di default in questo momento - la query completa viene passata al DB, anche se è poi limitato dai filtri scintilla)

MAGGIORI DETTAGLI

Spark 1.5 Correndo con PostgreSQL 9.4

frammento di codice:

from pyspark import SQLContext, SparkContext, Row, SparkConf 
from data_access.data_access_db import REMOTE_CONNECTION 

sc = SparkContext() 
sqlContext = SQLContext(sc) 

url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION) 
sql = "dummy" 

df = sqlContext.read.jdbc(url=url, table=sql) 
df = df.limit(1) 
df.show() 

SQL Trace:

< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3                              
< 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0                             
< 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.a 
drelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.a 
tttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid 
AND a.attnum = vals.attnum)                                            
< 2015-09-15 07:11:40.936 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3                              
< 2015-09-15 07:11:40.964 EDT >LOG: execute <unnamed>: SELECT "id","name" FROM dummy                             

Mi aspetterei che l'ultimo di selezione includerà una clausola limit 1 - ma non

+1

Basta aggiungere il limite (o altri filtri) nel codice SQL stesso –

risposta

15

DataFrames Spark supportano predicato push-down con le fonti JDBC ma termine predicato è usato in modo rigoroso SQL senso. Significa che copre solo la clausola WHERE. Inoltre sembra che sia limitato alla congiunzione logica (no IN e OR temo) e ai predicati semplici.

Tutto il resto, come limiti, conteggi, ordini, gruppi e condizioni viene elaborato sul lato Spark. Un avvertimento, già coperto da SO, è che df.count() o sqlContext.sql("SELECT COUNT(*) FROM df") è tradotto in SELECT 1 FROM df e richiede sia il sostanziale trasferimento di dati sia l'elaborazione mediante Spark.

Significa che è una causa persa? Non esattamente. È possibile utilizzare una subquery arbitraria come argomento table. E 'meno conveniente di un Pushdown predicato ma per il resto funziona abbastanza bene:

n = ... # Number of rows to take 
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n)) 
df = sqlContext.read.jdbc(url=url, table=sql) 

Nota:

Questo comportamento può essere migliorato in futuro, una volta origine dati API v2 è pronto: