2014-07-03 7 views
8

Stiamo cercando di utilizzare Apache Storm per l'elaborazione di grandi quantità di messaggi (falsi). Esempio Messaggio:Apache Storm java.nio.channels.ClosedChannelException: null

"{"clientName":"Sergey Bakulin","sum":12925,"group":"property","suspicious":false,"clientId":2,"dt":1404387303764,"coord":{"lat":55.767842588357645,"lon":37.46920361823332}}". 

Stiamo usando Apache Kafka come fonte di messaggi per il nostro gruppo Storm. Il nostro scopo è essere in grado di elaborare almeno 50k msg/sec/nodo. Nel caso in cui si usa più di un nodo costantemente bloccati con l'errore (frammento di log è da lavoratore - * log.):

2014-07-03 15:14:47 b.s.m.n.Client [INFO] failed to send requests to ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701: java.nio.channels.ClosedChannelException: null 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:381) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:349) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.6.3.Final.jar:na] 
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.6.3.Final.jar:na] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51] 
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] 
2014-07-03 15:14:47 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701 

nostra attuale configurazione tempesta:

########### These MUST be filled in for a storm configuration 
storm.zookeeper.servers: 
    - "172.31.*.*" 

storm.local.dir: "/home/*/storm/data" 
nimbus.host: "127.0.0.1" 
supervisor.slots.ports: 
    - 6701 
    - 6702 

ui.port: 8090 

worker.childopts: "-Xmx6g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1%ID% -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun$ 

supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true" 
supervisor.worker.start.timeout.secs: 10 
supervisor.worker.timeout.secs: 10 
supervisor.monitor.frequency.secs: 3 
supervisor.heartbeat.frequency.secs: 5 
supervisor.enable: true 

storm.messaging.netty.server_worker_threads: 2 
storm.messaging.netty.client_worker_threads: 2 
storm.messaging.netty.buffer_size: 5242880 
storm.messaging.netty.max_retries: 25 
storm.messaging.netty.max_wait_ms: 1000 

nostro topologia tempesta:

Properties conf = Util.readProperties(ClientTopology.class, "storm.properties"); 

prepareRedisDB(conf); 

TopologyBuilder builder = new TopologyBuilder(); 

builder.setSpout("kafka_trans_spout", getKafkaSpout(conf, conf.getProperty("kafka_trans_topic")), 3); 
builder.setSpout("kafka_socevent_spout", getKafkaSpout(conf, conf.getProperty("kafka_socevent_topic")), 3); 

builder.setBolt("json_to_tuple_trans_bolt", new JSONToTupleBolt(Transaction.class), 6) 
     .shuffleGrouping("kafka_trans_spout"); 
builder.setBolt("json_to_tuple_socevent_bolt", new JSONToTupleBolt(SocialEvent.class), 3) 
     .shuffleGrouping("kafka_socevent_spout"); 

builder.setBolt("alert_bolt", new AlertBolt(conf), 3) 
     .fieldsGrouping("json_to_tuple_trans_bolt", new Fields("cl_id")) 
     .fieldsGrouping("json_to_tuple_socevent_bolt", new Fields("cl_id")); 
builder.setBolt("offer_bolt", new NearestOfferBolt(conf), 3) 
     .shuffleGrouping("json_to_tuple_trans_bolt"); 

run(builder, args, 6); 

private static KafkaSpout getKafkaSpout(Properties conf, String topic) { 
    SpoutConfig spoutConfig = new SpoutConfig(
      new ZkHosts(conf.getProperty("zk_host"), "/brokers"), 
      topic, 
      "/brokers", 
      conf.getProperty("kafka_consumer_group_id")); 
    List<String> zkServers = new ArrayList<String>(); 
    zkServers.add(conf.getProperty("zk_host")); 
    spoutConfig.zkServers = zkServers; 
    spoutConfig.zkPort = Integer.valueOf(conf.getProperty("zk_port")); 
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
    spoutConfig.forceFromStart = true; 
    spoutConfig.fetchSizeBytes = 5*1024*1024; 
    spoutConfig.bufferSizeBytes = 5*1024*1024; 
    storm.kafka.KafkaSpout kafkaSpout = new storm.kafka.KafkaSpout(spoutConfig); 
    return kafkaSpout; 
} 

Utilizziamo macchine c3.2xlarge AWS, Apache tempesta 0.9.2-incubazione, Apache Kafka 2.9.2-0.8.1.1.

+0

è possibile verificare che effettivamente si dispone di un servizio di ascolto su 172.31.23.123:6701 provare netstat -antp | grep 6701 su questa macchina – Pixou

+0

Hai trovato una soluzione per questo? Sto ottenendo lo stesso errore ora. – gjain

+0

L'eccezione sembra che la porta del supervisore non sia accessibile dal mondo esterno. Dai un'occhiata a questo link: https://gist.github.com/amontalenti/8ff0c31a7b95a6dea3d2 Hai provato Telnet a quella porta host? – Shams

risposta

1

test Ping e Telnet: assicurarsi che ogni macchina che corre tempesta ha il collegamento a tutte le altre macchine con Ping (tutti i lavoratori, Nimbus e zookeeper). prova a eseguire il ping tramite IP, nome host e FQDN e, se non funziona, modifica i file host (/ etc/hosts) in questo modo.

inoltre, telnet le macchine per controllare le porte aperte in storm.yaml (6701, 6702). Zookeeper (2181).

nel mio ambiente di test, le impostazioni storm.yaml funziona con le seguenti impostazioni Netty:

storm.messaging.netty.buffer_size: 5242880 
storm.messaging.netty.client_worker_threads: 1 
storm.messaging.netty.max_retries: 100 
storm.messaging.netty.max_wait_ms: 1000 
storm.messaging.netty.min_wait_ms: 100 
storm.messaging.netty.server_worker_threads: 1 
storm.messaging.transport: backtype.storm.messaging.netty.Context 
0

tenta di aggiungere il carico e poi eseguire la topologia, succede paio di volte con me come tema era nuovo e carico era assente.