2015-06-15 22 views
6

Sto usando il HTTPSource in Flume per ricevere POST eventi in formato json come segue:Come inserire JSON in HDFS utilizzando Flume correttamente

{"username":"xyz","password":"123"} 

La mia domanda è: Devo modificare la fonte di gli eventi (intendo quella che invia il JSON al Flume) in modo che il JSON, ha il seguente formato:

[{ 
    "headers" : { 
      "timestamp" : "434324343", 
      "host" : "random_host.example.com" 
      }, 
    "body" : "{"username":"xyz","password":"123"}" 
}] 

Questo è il modo migliore per farlo? O posso modificarlo ovunque?

Il mio file conf per il flume agent è:

## Componentes 
SomeAgent.sources = SomeHTTP 
SomeAgent.channels = MemChannel 
SomeAgent.sinks = SomeHDFS 

## Fuente e Interceptores 
SomeAgent.sources.SomeHTTP.type = http 
SomeAgent.sources.SomeHTTP.port = 5140 
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler 
SomeAgent.sources.SomeHTTP.channels = MemChannel 
SomeAgent.sources.SomeHTTP.interceptors = i1 i2 

## Interceptores 
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp 
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host 
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname 

## Canal 
SomeAgent.channels.MemChannel.type = memory 
SomeAgent.channels.MemChannel.capacity = 10000 
SomeAgent.channels.MemChannel.transactionCapacity = 1000 

## Sumidero 
SomeAgent.sinks.SomeHDFS.type = hdfs 
SomeAgent.sinks.SomeHDFS.channel = MemChannel 
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d 
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream 
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs- 
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text 
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100 
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0 
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000 
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600 
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true 

Esecuzione del cat di hadoop fs

$ hadoop fs -ls -R /raw/logs/somes 
drwxr-xr-x - flume-agent supergroup   0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16 
-rw-r--r-- 3 flume-agent supergroup  3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369 
-rw-r--r-- 3 flume-agent supergroup  3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774 


$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head 




$ 

(si guarda in modo corretto, righe vuote)

Se ora guardo il file (usando la vista binaria di HUE per esempio):

0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 

risposta

4

Se ho capito bene, si desidera serializzare sia i dati che le intestazioni. In tal caso, non è necessario modificare l'origine dati, ma utilizzare alcuni elementi Flume standard e creare il serializzatore personalizzato per HDFS.

Il primo passo è ottenere Flume per creare la struttura JSON desiderata, ovvero intestazioni + corpo. Flume è in grado di farlo per voi, basta usare JSONHandler a vostra HTTPSource, in questo modo:

a1.sources = r1 
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler 

In realtà, non è necessario configurare il gestore JSON dal momento che è quello di default per HTTPSource.

Quindi, utilizzare sia Timestamp Interceptor e Host Interceptor per aggiungere le intestazioni desiderate. L'unico trucco è l'agente Flume deve essere eseguito nella stessa macchina che il processo mittente, necessario l'host intercettata è lo stesso di quello del mittente:

a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i2.type = host 
a1.sources.r1.interceptors.i2.hostHeader = hostname 

A questo punto, si avrà l'evento desiderato. Tuttavia, i serializzatori standard per HDFS salvano solo il corpo, non le intestazioni. Quindi creare un serializzatore personalizzato che implementa org.apache.flume.serialization.EventSerializer. Esso si configura come:

a1.sinks = k1 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.serializer = my_custom_serializer 

HTH

+0

Ho trovato questo collegamento (http://grokbase.com/t/flume/user/128nspvnfg/can-hdfssink-write-headers-as-well) dicendo che i serializzatori di cuatom per HDFSSink possono essere creati solo per ' fileType = CompressedStream' o 'DataStream'. Non so se è attualmente fissato per 'SequenceFiles'. – frb

+0

frb grazie per la vostra risposta, ho appena incollato il file di configurazione, ma quando guardo (usando 'hadoop fs -cat/raw/log/2015-06-15/SomeLog-.1434410388430') non vedo nulla (a molte linee di mazzo vuote, che sospetto siano in binario) Potresti vedere l'errore? – nanounanue

+0

Ho aggiunto l'output come binario nella domanda ... Non sta registrando nulla ': (' – nanounanue

3

La risposta inviato da @frb era corretta, l'unico punto che manca è che il generatore JSON deve inviare la parte body (devo ammettere/lamentano del fatto che la docs non sono chiare in quel punto), quindi, il corretto modo di distacco del json è

[body:"{'username':'xyz','password':'123'}"] 

si prega di non te che il json di dati è ora una stringa.

Con questa modifica, lo json è ora visibile nello hdfs.

+0

Buon punto! Hai dovuto finalmente modificare la fonte degli eventi per inviare il JSON come stringa? O hai modificato in qualche modo il JSONHandler per stringificare i dati ricevuti? – frb

+0

Ho modificato la sorgente. La modifica era stringificare il json e aggiungere la chiave 'body' – nanounanue

1

Il Flume HTTPSource che utilizza il JSONHandler predefinito prevede un elenco di eventi Flume completamente formati nella rappresentazione JSON [{ headers: ..., body: ... }] da inviare all'endpoint; per creare un endpoint agente che può accettare una struttura a livello di applicazione come {"username":"xyz", "password":"123"}, è possibile sovrascrivere il gestore con una classe alternativa che implementa HTTPSourceHandler; vedi la fonte JSONHandler - non c'è molto da fare.

public List<Event> getEvents(HttpServletRequest request) throws ... 

In una consuetudine JSONHandler si potrebbe anche aggiungere intestazioni per l'evento in base alla richiesta HTTP, come ad esempio l'IP di origine, User-Agent, ecc (un Interceptor non avrà il contesto per questo). Si consiglia di convalidare il JSON fornito dall'applicazione a questo punto (anche se il gestore predefinito non lo fa).

Anche se, come hai trovato, è possibile passare solo la parte [{body: ...}], tale gestore personalizzato potrebbe anche essere utile se si vuole evitare un generatore di iniettare intestazioni per l'evento.