2016-05-27 27 views
10

questo momento il mio disconnessione da un server WebLogic JMS assomiglia a questoDisconnessione dal weblogic JMS

import java.util.Hashtable; 
import javax.jms.*; 
import javax.naming.*; 
import javax.transaction.*; 
import java.util.Vector; 
import javax.rmi.PortableRemoteObject; 
import clojure.java.api.Clojure; 
import clojure.lang.IFn; 
import org.apache.log4j.Logger; 
import weblogic.jndi.*; 

public class WebLogicListener implements MessageListener, ExceptionListener{ 
    public InitialContext ctx; 
    public TopicConnectionFactory conFactory; 
    public TopicConnection tCon; 
    public TopicSession tSession; 
    public TopicSubscriber tSub; 
    public Boolean development; 
    public Topic topic; 
    /*clojure function objects*/ 
    public IFn publish; 
    public IFn close; 
    public IFn incrementMetric; 
    public IFn logMessage; 
    public IFn resync; 

    public Object channel; 
    public ExceptionListener exception; 
    public String topicName; 
    public String subName; 
    public String username; 
    public String password; 
    public String clientId; 
    public String factoryJNDI; 
    public String topicJNDI; 
    public Vector nms; 
    public Hashtable<Object,Object> env; 
    public boolean running = false; 

    public WebLogicListener (String topicName, String host, String username, String password, String factoryJNDI, 
          String topicJNDI, String clientId, String subName, String ns, String fnName, 
          boolean development, Vector nms){ 
    this.username = username; 
    this.password = password; 
    this.clientId = clientId; 
    this.topicName = topicName; 
    this.subName = subName; 
    this.development = development; 
    this.topicJNDI = topicJNDI; 
    this.factoryJNDI = factoryJNDI; 
    this.nms = nms; 
    /*Clojure interop handlers*/ 
    IFn chan = Clojure.var("clojure.core.async", "chan"); 
    resync = Clojure.var("cenx.baldr.api", "resync!"); 
    publish = Clojure.var(ns, fnName); 
    incrementMetric = Clojure.var(ns, "log-metric"); 
    logMessage = Clojure.var (ns, "log-message"); 
    close = Clojure.var("clojure.core.async","close!"); 
    /*populate envrionment*/ 
    env = new Hashtable<Object,Object>(); 
    env.put(Context.PROVIDER_URL, host); 
    env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); 
    env.put(Context.SECURITY_PRINCIPAL, username); 
    env.put(Context.SECURITY_CREDENTIALS, password); 
    env.put("weblogic.jndi.createIntermediateContexts", "true"); 
    /*open communication channel for clojure daemon*/ 
    channel = chan.invoke(); 
    } 

    private void initListener() throws JMSException, NamingException{ 
    try{ 
     if (!running && !development){ 
     ctx = new InitialContext(env); 
     topic = (Topic) ctx.lookup(topicJNDI); 
     conFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(factoryJNDI), TopicConnectionFactory.class); 
     tCon = (TopicConnection) conFactory.createTopicConnection(); 
     tCon.setExceptionListener(this); 
     tCon.setClientID(clientId); 
     tSession = (TopicSession) tCon.createTopicSession(false, 1); 
     tSub = tSession.createDurableSubscriber(topic, subName); 
     tSub.setMessageListener(this); 
     tCon.start(); 
     running = true; 
     }else{ 
     if (running){ 
      logMessage.invoke("error", String.format("Listener is already running")); 
     } 
     if (development){ 
      logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     } 
    } catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start listener \n %s", e.toString())); 
    } 
    } 

    public void startListener(){ 
    if (!development && env != null){ 
     try { 
     initListener(); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start Listener \n %s", e.toString())); 
     } 
    } else { 
     if (development){ 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     if (env == null){ 
     logMessage.invoke("error", "Environment variable is null"); 
     } 
    } 
    } 

    ///Closes the JMS connection and the channel 
    public void stopListener(){ 
    if (!development){ 
     try{ 
     tSub.close(); 
     tSession.close(); 
     tCon.close(); 
     incrementMetric.invoke("JMS-disconnect-count"); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Error while stopping the listener \n %s", e.toString())); 
     }finally{ 
     running = false; 
     } 
    } else { 
     logMessage.invoke("info", "Listener not started, running in development mode"); 
    } 
    } 

    public Object getChannel(){ 
    return channel; 
    } 

    //re-initializes the channel in case of error 
    public void initializeChannel(){ 
    if (channel == null){ 
     IFn chan = Clojure.var("clojure.core.async", "chan"); 
     channel = chan.invoke(); 
    } else { 
     logMessage.invoke("info", "Channel is already initialized"); 
    } 
    } 
    //accessors for debugging 

    public void closeSubscription(){ 
    try{ 
     tSub.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic subscription"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeSession(){ 
    try{ 
     tSession.unsubscribe(subName); 
     tSession.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic session"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeConnection(){ 
    try{ 
     tCon.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic connection"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeContext(){ 
    try { 
     ctx.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close context"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public Boolean isRunning(){ 
    return running; 
    } 

    public Context getContext(){ 
    return ctx; 
    } 

    public TopicConnectionFactory getFactory(){ 
    return conFactory; 
    } 

    public TopicConnection getTopicConnection(){ 
    return tCon; 
    } 

    public TopicSession getTopicSession(){ 
    return tSession; 
    } 

    public Boolean getDevelopmentMode(){ 
    return development; 
    } 

    public TopicSubscriber getTopicSubscriber(){ 
    return tSub; 
    } 

    public Topic getTopic(){ 
    return topic; 
    } 

    /*Interface methods*/ 

    public void onMessage(Message message){ 
    publish.invoke(channel, message); 
    } 
    /*attempt a resync after an exception connection*/ 
    private void resync(){ 
    resync.invoke(nms); 
    } 

    private void attemptReconnect() throws Exception{ 
    if (!development){ 
     //clean up any portions of the connection that managed to establish 
     stopListener(); 
     //incase of stopListener exceptioning out set running to false 
     running = false; 
     do{ 
     try{ 
      initListener(); 
      if (running){ 
      resync(); 
      } 
     }catch(Exception e){ 
      logMessage.invoke("error", 
          String.format("Unable to establish connection to JMS server \n %s", e.toString())); 
     }finally{ 
      Thread.sleep(30000); 
     } 
     } while (!running); 
    } else { 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
    } 
    } 

    public void onException(JMSException e){ 
    logMessage.invoke("error", 
         String.format("A JMS Exception has occurred, attempting to re-establish topic connection \n %s", e.toString())); 
    try{ 
     incrementMetric.invoke("JMS-disconnect-count"); 
     attemptReconnect(); 
    }catch(Exception g){ 
     logMessage.invoke("error", 
         String.format("Unable to start Listener \n %s", g.toString())); 
    } 
    } 

    /* Test functions */ 
    public void testException() throws JMSException{ 
    onException(new JMSException("testing exception function")); 
    } 

    public void testChannel (String message){ 
    if (development){ 
     publish.invoke(channel, message); 
    } 
    } 
} 

Quando creo il collegamento che uso netstat per verificare se il server è collegato

netstat - un | grep 8001 tcp 0 0 ip-address: 59730
ip-address: 8001 ISTITUITO

Poi io chiamo il mio .stopListener in aggiunta al metodo .closeContext e tornare a controllare di nuovo il mio legame con netstat e ottengo lo stesso risultato

netstat -an | grep 8001 tcp 0 0 ip-address: 59730
ip-address: 8001 STABILITO

Perché sarebbe la chiusura della sessione, abbonato, e connessione non distruggere la connessione al server JMS. La documentazione che ho trovato non mi ha dato alcuna spiegazione del motivo per cui non posso distruggere completamente la connessione.

+0

Qual è il valore del flag "sviluppo"? –

+0

è impostato su falso. Se fosse vero, una connessione non sarebbe mai stata stabilita. Vedo i messaggi di registro dei miei blocchi alla fine della disconnessione. – jrahme

+1

È possibile verificare se la connessione non viene stabilita da un altro componente prima di creare la connessione/sessione jms. Non dimenticare di chiudere anche il contesto jndi. –

risposta

0

Non sono sicuro che vi state avvicinando correttamente. Vedo che hai un listener di eccezione sulla connessione.

Su weblogic, il listener verrà richiamato più volte per ogni evento di errore, quindi non si dovrebbe tentare di disconnettersi su ciascuna chiamata. Sarebbe stato chiamato una volta per ogni utente registrato e una volta per ogni connessione monitorata. È necessario disconnettere solo se l'eccezione rappresenta un ServerConnectionLost.

Inoltre, nel gestore degli errori è sufficiente chiudere la connessione. Se hai fatto connection.close(), ciò avrebbe chiuso anche la sessione e gli ascoltatori. Non c'è bisogno di chiuderli in ordine inverso come fai tu.

E un'altra cosa. Non dovresti avere il codice "sviluppo" o "debug" o "test" nel tuo codice di produzione.

Quella parte che dice "if (! Sviluppo & & env! = Null) {" ... non dovrebbe farlo.

Ora tornando alla tua domanda, perché la connessione effettiva non è chiusa. Vedo che si sta facendo

try{ 
    tSub.close(); 
    tSession.close(); 
    tCon.close(); 
    incrementMetric.invoke("JMS-disconnect-count"); 
} catch... 

Se tSub.close() o tSession.close() sono stati per errore fuori, la connessione sarebbe mai chiusa. Avvolgi ognuno in un try/catch indipendente.