2016-06-03 31 views
9

Sto usando logstash jdbc per mantenere le cose sincronizzate tra mysql e elasticsearch. Funziona bene per un tavolo. Ma ora voglio farlo per più tavoli. Ho bisogno di aprire più nel terminaleinput multipli su logstash jdbc

logstash agent -f /Users/logstash/logstash-jdbc.conf 

ciascuno con una query di selezione o abbiamo un modo migliore di farlo in modo che possiamo avere più tabelle in fase di aggiornamento.

mio file di configurazione

input { 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table1" 
    } 
} 
output { 
    elasticsearch { 
     index => "testdb" 
     document_type => "table1" 
     document_id => "%{table_id}" 
     hosts => "localhost:9200" 
    } 
} 
+0

si può avere una sola configurazione con più 'ingresso jdbc' e quindi parametrizzare il' 'index' e document_type' nel vostro' uscita elasticsearch' a seconda di quale tavolo l'evento proviene. – Val

+0

qualche esempio o campione che hai? – Autolycus

risposta

21

Si può sicuramente avere un unico configurazione con più jdbc input e quindi parametrizzare il index e document_type nella vostra elasticsearch uscita a seconda di quale tavolo l'evento proviene.

input { 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table1" 
    type => "table1" 
    } 
    jdbc { 
    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver" 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name" 
    jdbc_user => "root" 
    jdbc_password => "password" 
    schedule => "* * * * *" 
    statement => "select * from table2" 
    type => "table2" 
    } 
    # add more jdbc inputs to suit your needs 
} 
output { 
    elasticsearch { 
     index => "testdb" 
     document_type => "%{type}" # <- use the type from each input 
     hosts => "localhost:9200" 
    } 
} 
+0

hmm ... Penso che il problema sarebbe document_id => "% {table_id}" a meno che non riesca a generare automaticamente un documento_id unico – Autolycus

+0

Puoi elaborare per favore? Disponi di campi ID diversi per tabella che desideri utilizzare come ID documento? – Val

+0

sì, quindi ho ID diversi e preferisco creare ID di elasticsearch invece di provare a utilizzare l'ID mysql già esistente – Autolycus

0

Questo non creerà dati duplicati. e compatibile logstash 6x.

# YOUR_DATABASE_NAME : test 
# FIRST_TABLE : place 
# SECOND_TABLE : things  
# SET_DATA_INDEX : test_index_1, test_index_2 

input { 
    jdbc { 
     # The path to our downloaded jdbc driver 
     jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar" 
     jdbc_driver_class => "com.mysql.jdbc.Driver" 
     # Postgres jdbc connection string to our database, YOUR_DATABASE_NAME 
     jdbc_connection_string => "jdbc:mysql://localhost:3306/test" 
     # The user we wish to execute our statement as 
     jdbc_user => "root" 
     jdbc_password => "" 
     schedule => "* * * * *" 
     statement => "SELECT @slno:[email protected]+1 aut_es_1, es_qry_tbl.* FROM (SELECT * FROM `place`) es_qry_tbl, (SELECT @slno:=0) es_tbl" 
     type => "place" 
     add_field => { "queryFunctionName" => "getAllDataFromFirstTable" } 
     use_column_value => true 
     tracking_column => "aut_es_1" 
    } 

    jdbc { 
     # The path to our downloaded jdbc driver 
     jdbc_driver_library => "/mysql-connector-java-5.1.44-bin.jar" 
     jdbc_driver_class => "com.mysql.jdbc.Driver" 
     # Postgres jdbc connection string to our database, YOUR_DATABASE_NAME 
     jdbc_connection_string => "jdbc:mysql://localhost:3306/test" 
     # The user we wish to execute our statement as 
     jdbc_user => "root" 
     jdbc_password => "" 
     schedule => "* * * * *" 
     statement => "SELECT @slno:[email protected]+1 aut_es_2, es_qry_tbl.* FROM (SELECT * FROM `things`) es_qry_tbl, (SELECT @slno:=0) es_tbl" 
     type => "things" 
     add_field => { "queryFunctionName" => "getAllDataFromSecondTable" } 
     use_column_value => true 
     tracking_column => "aut_es_2" 
    } 
} 

# install uuid plugin 'bin/logstash-plugin install logstash-filter-uuid' 
# The uuid filter allows you to generate a UUID and add it as a field to each processed event. 

filter { 

    mutate { 
      add_field => { 
        "[@metadata][document_id]" => "%{aut_es_1}%{aut_es_2}" 
      } 
    } 

    uuid { 
     target => "uuid" 
     overwrite => true 
    }  
} 

output { 
    stdout {codec => rubydebug} 
    if [type] == "place" { 
     elasticsearch { 
      hosts => "localhost:9200" 
      index => "test_index_1_12" 
      #document_id => "%{aut_es_1}" 
      document_id => "%{[@metadata][document_id]}" 
     } 
    } 
    if [type] == "things" { 
     elasticsearch { 
      hosts => "localhost:9200" 
      index => "test_index_2_13" 
      document_id => "%{[@metadata][document_id]}" 
      # document_id => "%{aut_es_2}" 
      # you can set document_id . otherwise ES will genrate unique id. 
     } 
    } 
}