2015-06-17 20 views
20

Il mio problema è che non riesco a ottenere lo streaming infinito con Retrofit. Dopo aver ottenuto le credenziali per la richiesta di poll() iniziale - faccio la richiesta di polling() iniziale. Ogni richiesta di poll() risponde in 25 secondi se non ci sono cambiamenti, o prima se ci sono cambiamenti - return changed_data []. Ogni risposta contiene i dati timestamp necessari per la successiva richiesta di polling: dovrei fare una nuova richiesta di poll() dopo ogni risposta poll(). Qui è il mio codice:RxJava + Retrofit polling lungo

getServerApi().getLongPollServer() 
    .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
    .take(1) 
    .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
    .retry() 
    .subscribe(longPollEnvelope1 -> { 
    processUpdates(longPollEnvelope1.getUpdates()); 
}); 

Sono nuovo di RxJava, forse non capisco qualcosa, ma non riesco a ottenere flusso infinito. Ottengo 3 chiamate, quindi onNext e onComplete.

P.S. Forse c'è una soluzione migliore per implementare il polling lungo su Android?

+0

Nel tuo caso, prenderei in considerazione l'implementazione del mio 'Osservabile' con' Observable.create() ' –

risposta

11

Sebbene non sia l'ideale, credo che potresti utilizzare gli effetti collaterali di RX per ottenere un risultato desiderato (operazioni "doOn").

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation 

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> { 
    // side effect variable 
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value) 
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid/onError, the later retry() will be called for new credentials 
      .flatMap(credentials -> api.query("request", credentials, timestamp.get())) // this will use the value from previous doOnNext 
      .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp())) 
      .repeat(); 
}) 
     .retry() 
     .share(); 

private static class CredentialsWithTimestamp { 

    public final String credentials; 
    public final long timestamp; // I assume this is necessary for you from the first request 

    public CredentialsWithTimestamp(String credentials, long timestamp) { 
     this.credentials = credentials; 
     this.timestamp = timestamp; 
    } 
} 

Quando si sottoscrive a "o", l'osservabile interno si ripeterà. Se si verifica un errore, "o" riproverà e richiederà nuovamente il flusso di credenziali.

Nell'esempio, lo sterzo computazionale viene ottenuto aggiornando la variabile timestamp, che è necessaria per la richiesta successiva.

+0

grazie per la tua risposta. Tuttavia, ottengo un timestamp da API e dovrei inviarlo di nuovo con una nuova chiamata sondaggio(). – localhost

+0

Ho aggiornato la risposta, si spera, più vicino alla tua situazione. Si può vedere che quando si ottiene una risposta del server, si sta semplicemente impostando una variabile. "doOnNext" rende esplicito l'effetto collaterale. La mia preoccupazione è che questo non è bello e avremmo bisogno di vedere il tuo codice per dare una risposta migliore. – snodnipper

+0

Ho un problema simile e ho risolto l'uso del codice ma nel mio caso voglio memorizzare anche il valore alla prima volta. Dove posso inserire quel codice? – Krutik