2013-07-15 8 views
6

Sto scrivendo un server in netty, in cui ho bisogno di effettuare una chiamata a memcached. Sto usando spymemcached e posso facilmente fare la chiamata memcached sincrona. Vorrei che questa chiamata memcached fosse asincrona. È possibile? Gli esempi forniti con netty non sembrano essere utili.Come accedere a memcached asincronicamente in netty

Ho provato a utilizzare i callback: creato un pool ExecutorService nel gestore e inviato un worker di callback a questo pool. Come questo:

public class MyHandler estende ChannelInboundMessageHandlerAdapter <MyPOJO> implementa CallbackInterface {

... 
    private static ExecutorService pool = Executors.newFixedThreadPool(20); 


    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MyPOJO pojo) { 
     ... 
     CallingbackWorker worker = new CallingbackWorker(key, this); 
     pool.submit(worker); 
     ... 
    } 
    public void myCallback() { 
     //get response 
     this.ctx.nextOutboundMessageBuf().add(response); 
    } 

}

CallingbackWorker assomiglia:

public class CallingbackWorker i mplements Callable {

public CallingbackWorker(String key, CallbackInterface c) { 
     this.c = c; 
     this.key = key; 
    } 
    public Object call() { 
    //get value from key 
    c.myCallback(value); 
    } 

Tuttavia, quando faccio questo, this.ctx.nextOutboundMessageBuf() in myCallback si blocca.

Quindi, nel complesso, la mia domanda è: come fare chiamate memcached asincrone in Netty?

+0

Credo che manchi qualcosa di fondamentale su Netty/NIO qui. Effettuare una chiamata di rete in modo asincrono dovrebbe essere semplice in un framework NIO. Qualsiasi suggerimento sarebbe d'aiuto. –

risposta

3

Ci sono due problemi qui: un problema di scarsa importanza con il modo in cui stai provando a codificarlo, e uno più grande con molte librerie che forniscono chiamate di servizio asincrone, ma non è un buon modo per trarne il massimo vantaggio in una struttura asincrona come Netty. Ciò costringe gli utenti a hack non ottimali come questo, o un approccio meno cattivo, ma non ancora ideale, che arriverò tra un momento.

Prima il problema di codifica. Il problema è che stai cercando di chiamare un metodo ChannelHandlerContext da un thread diverso da quello associato al tuo gestore, il che non è permesso. È abbastanza facile da risolvere, come mostrato di seguito. È possibile codificare un paio di altri modi, ma questo è probabilmente il più semplice:

private static ExecutorService pool = Executors.newFixedThreadPool(20); 

public void channelRead(final ChannelHandlerContext ctx, final Object msg) { 
    //... 

    final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder()); 

    // first wait for the response on a pool thread 
    pool.execute(new Runnable() { 
     public void run() { 
      String value; 
      Exception err; 
      try { 
       value = future.get(3, TimeUnit.SECONDS); // or whatever timeout you want 
       err = null; 
      } catch (Exception e) { 
       err = e; 
       value = null; 
      } 
      // put results into final variables; compiler won't let us do it directly above 
      final fValue = value; 
      final fErr = err; 

      // now process the result on the ChannelHandler's thread 
      ctx.executor().execute(new Runnable() { 
       public void run() { 
        handleResult(fValue, fErr); 
       } 
      }); 
     } 
    }); 
// note that we drop through to here right after calling pool.execute() and 
// return, freeing up the handler thread while we wait on the pool thread. 
} 

private void handleResult(String value, Exception err) { 
    // handle it 
} 

che funziona, e potrebbe essere sufficiente per l'applicazione. Ma hai un pool di thread di dimensioni fisse, quindi se hai intenzione di gestire più di 20 connessioni simultanee, questo diventerà un collo di bottiglia. Potresti aumentare le dimensioni del pool o utilizzarne uno illimitato, ma a quel punto potresti anche essere eseguito con Tomcat, poiché il consumo di memoria e il sovraccarico di commutazione del contesto iniziano a diventare problemi e perdi la scalabilità che era l'attrazione di Netty in primo luogo!

E il fatto è che Spymemcached è basato su NIO, basato su eventi e utilizza solo un thread per tutto il suo lavoro, ma non offre alcun modo per sfruttare appieno la sua natura basata sugli eventi. Mi aspetto che lo sistemeranno prima troppo a lungo, proprio come Netty 4 e Cassandra hanno recentemente fornito metodi callback (listener) sugli oggetti del futuro. Nel frattempo, essendo nella tua stessa barca, ho ricercato le alternative, e non essendo troppo felice con quello che ho trovato, ho scritto (ieri) a Future tracker class che può interrogare fino a migliaia di Futures a una velocità configurabile, e chiamare si torna sul thread (Executor) di tua scelta al completamento. Usa solo un thread per farlo. Ho put it up on GitHub se vuoi provarlo, ma ti avverto che è ancora bagnato, come si suol dire.L'ho testato parecchio nel giorno passato, e anche con 10000 oggetti Futuro simulati simultaneamente, eseguendo il polling una volta al millisecondo, il suo utilizzo della CPU è trascurabile, sebbene inizi a salire oltre il 10000. Usandolo, l'esempio sopra appare come questo :

// in some globally-accessible class: 

public static final ForeignFutureTracker FFT = new ForeignFutureTracker(1, TimeUnit.MILLISECONDS); 

// in a handler class: 

public void channelRead(final ChannelHandlerContext ctx, final Object msg) { 
// ... 

    final GetFuture<String> future = memcachedClient().getAsync("foo", stringTranscoder()); 

    // add a listener for the Future, with a timeout in 2 seconds, and pass 
    // the Executor for the current context so the callback will run 
    // on the same thread. 
    Global.FFT.addListener(future, 2, TimeUnit.SECONDS, ctx.executor(), 
    new ForeignFutureListener<String,GetFuture<String>>() { 

     public void operationSuccess(String value) { 
     // do something ... 
     ctx.fireChannelRead(someval); 
     } 
     public void operationTimeout(GetFuture<String> f) { 
     // do something ... 
     } 
     public void operationFailure(Exception e) { 
     // do something ... 
     } 
    }); 
} 

non si vuole più di uno o due FFT istanze attive in qualsiasi momento, oppure potrebbe diventare un salasso per CPU. Ma una singola istanza può gestire migliaia di Futures in circolazione; circa l'unica ragione per avere una seconda sarebbe quella di gestire le chiamate a latenza più alta, come S3, a un ritmo di polling più lento, diciamo 10-20 millisecondi.

Uno svantaggio dell'approccio di polling è che aggiunge una piccola quantità di latenza. Ad esempio, eseguendo il polling una volta al millisecondo, in media verranno aggiunti 500 microsecondi al tempo di risposta. Questo non sarà un problema per la maggior parte delle applicazioni, e penso che sia più che compensato dal risparmio di memoria e CPU rispetto all'approccio del pool di thread.

Mi aspetto che entro un anno questo non sarà un problema, poiché più client asincroni forniscono meccanismi di callback, consentendo di sfruttare appieno NIO e il modello basato sugli eventi.

+0

il tipo di Graal che stavo cercando per interrogare dal blocco della chiamata, sì sulla stessa barca, bella pecora che ci hai spedito THX molto –