2015-09-13 15 views
19

Ho un attore scritto in Java che è responsabile della logica filtro/riprova su una risorsa esterna che potrebbe essere temporaneamente non disponibile. campi dell'attore e metodi comuni sono:Attesa indefinitamente per un messaggio che potrebbe non arrivare mai

public class MyActorImpl implements MyActor { 
    private static final long MINWAIT = 50; 
    private static final long MAXWAIT = 1000; 
    private static final long DEFAULTWAIT = 0; 
    private static final double BACKOFFMULTIPLIER = 1.5; 

    private long updateWait(long currentWait) { 
     return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT); 
    } 

    // mutable 
    private long opWait = DEFAULTWAIT; 
    private final Queue<OpInput> opBuffer = new ArrayDeque<>(); 

    // called from external actor 
    public void operation(OpInput opInput) { 
     operation(opInput, DEFAULTWAIT); 
    } 

    // called internally 
    public void operation(OpInput opInput, long currentWait); 
} 

L'attore ha diverse operazioni che hanno tutti più o meno la stessa logica di tentativi/tampone; ogni operazione ha i propri campi [op]Wait e [op]Buffer.

  1. L'attore padre chiama void operation(OpInput opInput)
  2. Il metodo precedente richiede void operation(OpInput opInput, long currentWait) utilizzando DEFAULTWAIT per il secondo parametro
  3. Se il parametro currentWait non è uguale opWait allora l'ingresso viene memorizzato in opBuffer, altrimenti l'ingresso viene inviato la risorsa esterna.
  4. Se la risorsa esterna restituisce un esito positivo, quindi opWait è impostato su DEFAULTWAIT e il contenuto di opBuffer viene rinviato tramite il metodo operation(opInput). Se la risorsa esterna (o più probabilmente la rete) restituisce un errore, aggiorno opWait = updateWait(opWait) e programma operation(opInput, opWait) sullo scheduler di sistema degli attori utilizzando un ritardo di opWait ms.

I.e. Sto usando lo scheduler del sistema degli attori per implementare il backoff esponenziale; Sto usando il parametro currentWait per identificare il messaggio che sto riprovando, e sto bufferizzando gli altri messaggi finché il messaggio principale non viene elaborato correttamente dalla risorsa esterna.

Il problema è che se il messaggio programmato viene perso, memorizzerò i messaggi per sempre perché la protezione currentWait == opWait avrà esito negativo per tutti gli altri messaggi. Potrei usare qualcosa come spring-retry per implementare il backoff esponenziale ma non vedo un modo per unire i loop di tentativi delle operazioni, il che significa che potrei usare un thread per ciclo retry (mentre usare lo scheduler del sistema degli attori non mette molto di più di una tensione sul sistema).

Sto cercando un modo più fault tolerant per implementare il buffering e il backoff esponenziale sull'interfaccia tra un attore e una risorsa esterna senza dover allocare troppe risorse all'attività.

risposta

8

Se ti sto comprensione correttamente, se l'unico problema sta perdendo il messaggio in programma, perché non basta usare qualcosa come il Reliable Proxy Pattern per quel particolare messaggio, quindi se non riesce opWait = DEFAULTWAIT;

Ci sono qualcosa sul tuo codice che ottengo, non capisco cosa intendi quando dici che public void operation(OpInput opInput) è chiamato esternamente. Intendi che questo metodo sta interagendo con la rete, che utilizza risorse che a volte non sono disponibili?

Se posso posso suggerire un'alternativa. Da quello che capisco il tuo problema principale è che hai una risorsa che non è disponibile a volte, quindi hai qualche sorta di que/buffer che implementa con qualche sorta di logica di attesa in modo che il messaggio venga elaborato una volta che è di nuovo disponibile, che purtroppo comporta alcuni messaggi che potrebbero essere persi e provocare un'attesa infinita. Penso che potresti ottenere ciò che vuoi usando Futures con timeout. Quindi riprova se il futuro non è completato in quel certo periodo di tempo fino a dire 3 re-trys. È anche possibile regolare questa volta in base al carico del server e quanto tempo ci vuole per completare un messaggio. Spero possa aiutare.

+0

Per "chiamato esternamente" intendevo che il metodo era chiamato da un altro attore, mentre i metodi "internamente" sono chiamati solo tramite "self.operation (...)". E grazie per il suggerimento di utilizzare un futuro anziché un messaggio programmato, che semplifica enormemente il codice –