2012-03-09 11 views
6

Ho scritto un semplice server UDP con Netty che stampa semplicemente nei log i messaggi (frame) ricevuti. Per fare ciò, ho creato un semplice decodificatore di decodificatore di frame e un semplice gestore di messaggi. Ho anche un client che può inviare più richieste in sequenza e/o in parallelo.Lotto di richieste UDP perse nel server UDP con Netty

Quando configuro il tester del client per inviare ad esempio alcune centinaia di richieste in sequenza con un piccolo ritardo tra di esse, il mio server scritto con Netty li riceve tutti correttamente. Ma al momento aumento il numero di richieste simultanee nel mio client (100 per esempio) insieme a quelle sequenziali e poche ripetizioni, il mio server inizia a perdere molte richieste. Quando invio 50000 richieste, ad esempio, il mio server riceve solo circa 49000 quando si utilizza solo il semplice ChannelHandler che stampa il messaggio ricevuto.

E quando aggiungo il decodificatore di frame semplice (che stampa il frame e lo copia in un altro buffer) davanti a questo gestore, il server gestisce solo metà delle richieste !!

Ho notato che, indipendentemente dal numero di lavoratori che ho specificato per il NioDatagramChannelFactory creato, c'è sempre un solo thread che gestisce le richieste (sto usando l'Executors.newCachedThreadPool() consigliato come altro parametro).

Ho anche creato un altro server UDP semplice simile basato sul DatagramSocket che viene fornito con JDK e gestisce perfettamente ogni richiesta con 0 (zero) perso !! Quando invio 50000 richieste nel mio client (con 1000 thread, ad esempio), ho ricevuto 50000 richieste nel mio server.

Sto facendo qualcosa di sbagliato durante la configurazione del mio server UDP usando Netty? O forse Netty non è semplicemente progettata per supportare tale carico ?? Perché è presente un solo thread utilizzato dal pool di thread memorizzato nella cache (ho notato che solo un thread e sempre lo stesso è utilizzato guardando in jmonsole JMX e controllando il nome del thread nei log di output)? Penso che se più thread fossero usati come previsto, il server sarebbe in grado di gestire facilmente tale carico perché posso farlo senza problemi quando non utilizzo Netty!

Vedi il mio codice di inizializzazione di seguito:

... 

lChannelfactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool(), nbrWorkers); 
lBootstrap = new ConnectionlessBootstrap(lChannelfactory); 

lBootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
    @Override 
    public ChannelPipeline getPipeline() 
    { 
     ChannelPipeline lChannelPipeline = Channels.pipeline(); 
     lChannelPipeline.addLast("Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder(null));    
     lChannelPipeline.addLast("Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler(lOuterFrameStatsCollector));    
     return lChannelPipeline; 
    } 
}); 

bindChannel = lBootstrap.bind(socketAddress); 

... 

E il contenuto del metodo decode() nel mio decoder:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception 
{ 
    ChannelBuffer lDuplicatedChannelBuffer = null; 
    sLogger.debug("Decode method called."); 

    if (iBuffer.readableBytes() < 8) return null; 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (iBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(iBuffer.array(), iBuffer.readableBytes()));      
     lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer(iBuffer.readableBytes());    
     iBuffer.readBytes(lDuplicatedChannelBuffer); 
    } 

    return lDuplicatedChannelBuffer; 
} 

E il contenuto del messageReceived() metodo nel mio gestore:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception 
{ 
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage(); 
    if (outerFrameStatsCollector != null) outerFrameStatsCollector.incrementNbrRequests(); 

    if (lMessageBuffer.readable()) 
    {   
     sLogger.debug(convertToAsciiHex(lMessageBuffer.array(), lMessageBuffer.readableBytes()));    
     lMessageBuffer.discardReadBytes(); 
    } 
} 
+0

Siete consapevoli che UDP non ha garanzie di consegna a destra? –

+0

Sì, so che non ci sono garanzie di consegna di questo tipo, ma i miei test di carico sono eseguiti localmente e non perdo nulla con il mio semplice server che utilizza DatagramSocket al posto di Netty. Sto anche analizzando le richieste con WireShark per verificare che non si perda nulla in un caso (senza netty) e che i pacchetti vengano persi quando si utilizza netty. – The4Summers

+0

@ The4Summers Se riesci a vedere che i pacchetti vanno perduti con Netty, Netty non sta elaborando i pacchetti in entrata abbastanza velocemente, o non lo è, quindi il buffer di ricezione socket si riempie, quindi i pacchetti in entrata vengono scartati, non avendo nessun altro andare. Accelerare la ricezione o rallentare l'invio. – EJP

risposta

7

Non è stata configurata correttamente l'istanza ConnectionlessBootstrap.

  1. È necessario configurare i seguenti valori ottimali.

    dimensioni SO_SNDBUF, dimensioni SO_RCVBUF e ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSize", 1048576); 
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
    new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE)); 
    

    classe di controllo DefaultNioDatagramChannelConfig per maggiori dettagli.

  2. La pipeline sta facendo tutto utilizzando il thread di lavoro di Netty.Se il thread di lavoro è sovraccarico, ritarderà l'esecuzione del loop evento selettore e ci sarà un collo di bottiglia nella lettura/scrittura del canale . È necessario aggiungere un gestore di esecuzione come segue nella pipeline . Libererà il thread di lavoro per fare il proprio lavoro.

    ChannelPipeline lChannelPipeline = Channels.pipeline(); 
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
        new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 
    
    //add rest of the handlers here 
    
+0

Grazie per il suggerimento sulle opzioni della dimensione del buffer! Tuttavia, per quanto riguarda il gestore di esecuzione, l'ho provato dopo questo post e, sì, aiuta un po ', ma stavo ancora perdendo metà delle richieste quando viene utilizzato un decoder e un gestore. Sono sicuro che aumentare il buffer aiuterà molto di più. – The4Summers

+0

Tuttavia, non riesco ancora a capire perché è necessario specificare un executor del pool di thread e un numero di worker durante l'istanziazione della classe NioDatagramChannelFactory se non vengono mai utilizzati !! ?? – The4Summers

+0

Sono contento che abbia funzionato per te. Gli execution del pool di thread vengono utilizzati per creare thread di lavoro nelle pipeline di NioDatagramChannel. Un thread di lavoro può essere assegnato a un DatagramChannel per non bloccare la lettura/scrittura. Se l'applicazione è in ascolto su più di una porta, molti thread di lavoro (il valore predefinito è cpu size * 2) verranno creati e assegnati a DatagramChannel in modalità round robin. Se si desidera avere più thread di lavoro, è possibile specificare nel costruttore NioDatagramChannelFactory. –