2015-07-20 34 views
8

Amazon SQS Java SDK stops after consuming 500 messages

I am consuming messages from an Amazon SQS queue, which holds thousands of messages. When I start the application (written in Java with the Spring framework), it starts polling messages from the queue, and after receiving 500 messages it stops. If I restart the application, it consumes another 500 messages.

My code looks like this:

Connection factories

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

@Bean(name = "sqsJmsListenerContainerFactory") 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(sqsConnectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

Listener

@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue") 
public void onMessage(Message message) { 
    //Processing message 
} 

Is there anything I need to configure on the Amazon queue or in the connection factory beans?
Thanks :-)

Update: added thread dump

While the application is consuming messages, the DefaultMessageListenerContainer thread in the thread dump looks like this:

"[email protected]" prio=5 tid=0x18 nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x2230> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x2231> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


The ConsumerPrefetchThread in the thread dump looks like this:

"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x23a7> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x23a8> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

When the application stops consuming messages, the DefaultMessageListenerContainer thread in the thread dump looks like this:

"[email protected]" prio=5 tid=0x18 nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
     at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
     at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
     at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
     at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
     at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 
     at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


The ConsumerPrefetchThread in the thread dump looks like this:

"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
+0

Try taking a thread dump when the consumer stops, to see what the container threads are doing. Maybe it is stuck in your code?

+0

@GaryRussell I have added the thread dump. In this application I am also using ActiveMQ: I push the messages I get from SQS to ActiveMQ queues. – Piyush
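
For readers who have not taken a thread dump before: the usual route is the JDK's `jstack <pid>` tool. As an alternative, here is a minimal in-process sketch using only the standard library. The class name `ThreadDumpSketch` is made up for illustration, and the output only loosely imitates the dump format shown in the question (it omits lock information, for example).

```java
import java.util.Map;

public class ThreadDumpSketch {

    /** Builds a plain-text dump of every live thread: name, state, and stack frames. */
    public static String dumpAllThreads() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append('"')
               .append(thread.isDaemon() ? " daemon" : "")
               .append(" prio=").append(thread.getPriority())
               .append('\n')
               .append("    java.lang.Thread.State: ").append(thread.getState())
               .append('\n');
            // One line per stack frame, innermost call first.
            for (StackTraceElement frame : entry.getValue()) {
                out.append("     at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

A listener thread that stays in `WAITING` with your own classes on its stack, as in the dump above, is the thing to look for.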

Answer

3

It looks like some kind of pool exhaustion in your code ...

at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
    at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
    at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
    at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
    at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
    at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 

The container thread is blocked trying to get a session from the PooledConnection.

Perhaps you are not returning sessions to the pool?
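
To illustrate why a leaked session stops consumption at a fixed message count, here is a minimal standard-library sketch. `BoundedPool` and `PooledSession` are hypothetical stand-ins, not the ActiveMQ classes, and the pool size of 500 is chosen only to mirror the number in the question (it appears to match the default per-connection session limit of ActiveMQ's pooled connection factory, but treat that as an assumption to verify against your version). Unlike the real pool, which blocks the caller, this one returns `null` when empty so the demo can finish.

```java
import java.util.concurrent.Semaphore;

public class PoolExhaustionSketch {

    /** Hypothetical bounded pool: at most maxActive sessions may be borrowed at once. */
    static final class BoundedPool {
        private final Semaphore permits;

        BoundedPool(int maxActive) {
            this.permits = new Semaphore(maxActive);
        }

        /** Returns a session, or null when the pool is exhausted (a real pool would block). */
        PooledSession borrow() {
            return permits.tryAcquire() ? new PooledSession(this) : null;
        }

        void giveBack() {
            permits.release();
        }
    }

    /** Closing the session hands its permit back to the pool. */
    static final class PooledSession implements AutoCloseable {
        private final BoundedPool owner;
        private boolean closed;

        PooledSession(BoundedPool owner) {
            this.owner = owner;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                owner.giveBack();
            }
        }
    }

    /** Borrows one session per message and never closes it; returns how many sends succeeded. */
    public static int sendWithoutClosing(int poolSize, int messages) {
        BoundedPool pool = new BoundedPool(poolSize);
        int sent = 0;
        for (int i = 0; i < messages; i++) {
            PooledSession session = pool.borrow();
            if (session == null) {
                break; // the real listener thread would wait here indefinitely
            }
            sent++; // session leaks: close() is never called
        }
        return sent;
    }

    /** Same loop, but try-with-resources returns each session to the pool. */
    public static int sendAndClose(int poolSize, int messages) {
        BoundedPool pool = new BoundedPool(poolSize);
        int sent = 0;
        for (int i = 0; i < messages; i++) {
            try (PooledSession session = pool.borrow()) {
                if (session == null) {
                    break;
                }
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("without closing: " + sendWithoutClosing(500, 2000)); // 500
        System.out.println("with closing:    " + sendAndClose(500, 2000));       // 2000
    }
}
```

In real JMS code the equivalent of `close()` here is closing the `Session` (and the pooled `Connection`) in a `finally` block after each send.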

Consider using JmsTemplate instead of your own code to talk to JMS. It avoids such problems.

+1

I solved this problem. I was not closing the session and connection after sending the message. I also learned how to use a thread dump. Thanks @GaryRussell :) – Piyush