2016-02-18 16 views
5

Come terminare le richieste concatenate in Rx Vert.X?Come terminare le richieste http concatenate in RxJava Vert.x?

HttpClient client = Vertx.vertx().createHttpClient(); 
     HttpClientRequest request = client.request(HttpMethod.POST, 
       "someURL") 
       .putHeader("content-type", "application/x-www-form-urlencoded") 
       .putHeader("content-length", Integer.toString(jsonData.length())).write(jsonData); 
     request.toObservable(). 
       //flatmap HttpClientResponse -> Observable<Buffer> 
         flatMap(httpClientResponse -> { //something 
        return httpClientResponse.toObservable(); 
       }). 
         map(buffer -> {return buffer.toString()}). 
       //flatmap data -> Observable<HttpClientResponse> 
         flatMap(postData -> client.request(HttpMethod.POST, 
         someURL") 
         .putHeader("content-type", "application/x-www-form-urlencoded") 
         .putHeader("content-length", Integer.toString(postData.length())).write(postData).toObservable()). 
       //flatmap HttpClientResponse -> Observable<Buffer> 
         flatMap(httpClientResponse -> { 
        return httpClientResponse.toObservable(); 
       })......//other operators 
request.end(); 

Avviso che ho. end() per la richiesta principale. Come posso terminare la richiesta che si trova all'interno di. flatmap? Ho anche bisogno di finirlo?

risposta

1

Penso che si possa fare qualcosa come il seguente codice.

L'idea principale è che non si utilizzi direttamente lo HttpClientRequest come ottenuto dal client Vertx. Invece si crea un altro flowable che invocherà end() non appena viene ricevuto il primo abbonamento.

Qui, ad esempio, è possibile ottenere la richiesta tramite un paio di metodi personalizzati: in questo caso request1() e request2(). Entrambi usano doOnSubscribe() per attivare lo end() necessario. Read its description on the ReactiveX page.

Questo examle utilizza VertX e reactivex, spero si potrebbe usare questo set up.

import io.reactivex.Flowable; 
import io.vertx.core.http.HttpMethod; 
import io.vertx.reactivex.core.Vertx; 
import io.vertx.reactivex.core.buffer.Buffer; 
import io.vertx.reactivex.core.http.HttpClient; 
import io.vertx.reactivex.core.http.HttpClientRequest; 
import io.vertx.reactivex.core.http.HttpClientResponse; 
import org.junit.Test; 

public class StackOverflow { 

    @Test public void test(){ 

     Buffer jsonData = Buffer.buffer("..."); // the json data. 

     HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client. 

     request1(client) 
      .flatMap(httpClientResponse -> httpClientResponse.toFlowable()) 
      .map(buffer -> buffer.toString()) 
      .flatMap(postData -> request2(client, postData)) 
      .forEach(httpResponse -> { 
       // do something with returned data); 
      }); 

    } 

    private Flowable<HttpClientResponse> request1(HttpClient client) { 
     HttpClientRequest request = client.request(HttpMethod.POST,"someURL"); 
     return request 
       .toFlowable() 
       .doOnSubscribe(subscription -> request.end()); 
    } 

    private Flowable<HttpClientResponse> request2(HttpClient client, String postData) { 
     HttpClientRequest request = client.request(HttpMethod.POST,"someURL"); 
     // do something with postData 
     return request 
       .toFlowable() 
       .doOnSubscribe(subscription -> request.end()); 
    } 

} 
2

Ci sono diversi modi per garantire di chiamare request.end(). Ma vorrei scavare nella documentazione di Vert.x o semplicemente open source code se ce n'è uno, per vedere se chiama end() per te. Altrimenti uno potrebbe essere

final HttpClientRequest request = ... 
request.toObservable() 
     .doOnUnsubscribe(new Action0() { 
      @Override 
      public void call() { 
       request.end(); 
      } 
     });