Ho un attore scritto in Java che è responsabile della logica filtro/riprova su una risorsa esterna che potrebbe essere temporaneamente non disponibile. campi dell'attore e metodi comuni sono:Attesa indefinitamente per un messaggio che potrebbe non arrivare mai
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
L'attore ha diverse operazioni che hanno tutti più o meno la stessa logica di tentativi/tampone; ogni operazione ha i propri campi [op]Wait
e [op]Buffer
.
- L'attore padre chiama
void operation(OpInput opInput)
- Il metodo precedente richiede
void operation(OpInput opInput, long currentWait)
utilizzandoDEFAULTWAIT
per il secondo parametro - Se il parametro
currentWait
non è ugualeopWait
allora l'ingresso viene memorizzato inopBuffer
, altrimenti l'ingresso viene inviato la risorsa esterna. - Se la risorsa esterna restituisce un esito positivo, quindi
opWait
è impostato suDEFAULTWAIT
e il contenuto diopBuffer
viene rinviato tramite il metodooperation(opInput)
. Se la risorsa esterna (o più probabilmente la rete) restituisce un errore, aggiornoopWait = updateWait(opWait)
e programmaoperation(opInput, opWait)
sullo scheduler di sistema degli attori utilizzando un ritardo diopWait
ms.
I.e. Sto usando lo scheduler del sistema degli attori per implementare il backoff esponenziale; Sto usando il parametro currentWait
per identificare il messaggio che sto riprovando, e sto bufferizzando gli altri messaggi finché il messaggio principale non viene elaborato correttamente dalla risorsa esterna.
Il problema è che se il messaggio programmato viene perso, memorizzerò i messaggi per sempre perché la protezione currentWait == opWait
avrà esito negativo per tutti gli altri messaggi. Potrei usare qualcosa come spring-retry per implementare il backoff esponenziale ma non vedo un modo per unire i loop di tentativi delle operazioni, il che significa che potrei usare un thread per ciclo retry (mentre usare lo scheduler del sistema degli attori non mette molto di più di una tensione sul sistema).
Sto cercando un modo più fault tolerant per implementare il buffering e il backoff esponenziale sull'interfaccia tra un attore e una risorsa esterna senza dover allocare troppe risorse all'attività.
Per "chiamato esternamente" intendevo che il metodo era chiamato da un altro attore, mentre i metodi "internamente" sono chiamati solo tramite "self.operation (...)". E grazie per il suggerimento di utilizzare un futuro anziché un messaggio programmato, che semplifica enormemente il codice –