2016-02-28 24 views
7

Dopo aver letto diverse pagine di documentazione di Apache Flink (official documentation, dataartisans), così come gli esempi forniti nel official repository, continuo a vedere esempi in cui usano come fonte di dati per streamming un file già scaricato, collegandoti sempre all'host locale.Ottenere elementi JSON da un web con Apache Flink

Sto cercando di utilizzare Apache Flink per scaricare file JSON che contengono dati dinamici. La mia intenzione è di provare a stabilizzare l'url in cui posso accedere al file JSON come sorgente di input di Apache Flink, invece di scaricarlo con un altro sistema ed elaborare il file scaricato con Apache Flink.

È possibile stabilire questa connessione di rete con Apache Flink?

risposta

4

È possibile definire gli URL che si desidera scaricare come input DataStream e quindi scaricare i documenti dall'interno di uno MapFunction. Il seguente codice dimostra questo:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html"); 

inputURLs.map(new MapFunction<String, String>() { 
    @Override 
    public String map(String s) throws Exception { 
     URL url = new URL(s); 
     InputStream is = url.openStream(); 

     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); 

     StringBuilder builder = new StringBuilder(); 
     String line; 

     try { 
      while ((line = bufferedReader.readLine()) != null) { 
       builder.append(line + "\n"); 
      } 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     try { 
      bufferedReader.close(); 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return builder.toString(); 
    } 
}).print(); 

env.execute("URL download job"); 
+0

Eseguo codice di esempio, ma viene eseguito solo una volta e letto tutto il file. Comunque Iit non è in streaming, ho pensato che contiune leggere quando c'è incease nel file JSON. – zt1983811

+0

Per quello dovresti usare il 'ContinuousFileMonitoringFunction'. Lo streaming di per sé non significa che il lavoro verrà eseguito infinitamente lungo. Questo succede solo se hai una fonte non finita. Ma in questo caso la funzione 'env.fromElements' produce una sorgente di streaming limitata. Una volta che questa fonte raggiunge la sua fine, il programma termina. –