2016-07-02 74 views
5

La documentazione di Elasticsaerch riguarda solo il caricamento di un indice completo su Spark.Come interrogare un indice Elasticsearch utilizzando Pyspark e Dataframes

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") 
df.printSchema() 

Come si può eseguire una query per restituire i dati da un indice elasticsearch e caricarli a Spark come dataframe utilizzando pyspark?

risposta

4

Di seguito è come lo faccio.

impostazioni dell'ambiente generale e comando:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6 
export PYSPARK_DRIVER_PYTHON=ipython2 

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar 

Codice:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("ESTest") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

q ="""{ 
    "query": { 
    "filtered": { 
     "filter": { 
     "exists": { 
      "field": "label" 
     } 
     }, 
     "query": { 
     "match_all": {} 
     } 
    } 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "titanic/passenger", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

È inoltre possibile definire le colonne di dati-struttura. Per ulteriori informazioni, consultare Here.

Spero che sia d'aiuto!

+0

Questo è quello che sto facendo in questo momento, speravo che ci fosse un modo per recuperare direttamente un DataFrame filtrato –

+1

Non sono sicuro che sia possibile con l'ultima API del connettore ES-Hadoop Spark. –

+1

C'è un modo per scrivere un dataframe su elasticsearch usando anche questa API? –

0

Sto eseguendo il mio codice in un cluster EMR da Amazon utilizzando pyspark. Quindi, il modo in cui l'ho fatta io lavoro è stato la seguente procedura:

1) Inserisci questa azione bootstrap nella creazione di cluster (per creare server di elasticsearch localhost):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2) ho eseguito questi comandi per popolare il database elasticsearch con alcuni dati:

curl -XPUT "http://localhost:9200/movies/movie/1" -d' { 
    "title": "The Godfather", 
    "director": "Francis Ford Coppola", 
    "year": 1972 
    }' 

È inoltre possibile eseguire altri comandi ricciolo se lo si desidera, come:

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}} 

3) I Inited pyspark utilizzando i seguenti parametri:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

avevo scaricato il client elasticsearch pitone precedentemente

4) ho eseguito il seguente codice:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

q ="""{ 
    "query": { 
    "match_all": {} 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "movies/movie", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

Poi finalmente ho potuto risultato positivo dal comando.

0

Ho riscontrato un problema simile a questo per ottenere dati geo-filtrati in un DataSource PySpark. Sto usando elasticsearch-spark-20_2.11-5.2.2.jar con Spark versione 2.1.1 e ES versione 5.2. Sono stato in grado di caricare i dati in un dataframe specificando la mia domanda come opzione durante la creazione del dataframe

mio geo-query

q ="""{ 
    "query": { 
     "bool" : { 
      "must" : { 
       "match_all" : {} 
      }, 
      "filter" : { 
       "geo_distance" : { 
        "distance" : "100km", 
        "location" : { 
         "lat" : 35.825, 
         "lon" : -87.99 
        } 
       } 
      } 
     } 
    } 
}""" 

Ho usato il seguente comando per caricare i dati in dataframe

spark_df = spark.read.format("es").option("es.query", q).load("index_name")