Sono nuovo di RxJS e mi chiedevo se qualcuno potesse aiutarmi.Stream sincrono di risposte da un flusso di richieste con RxJS
Desidero creare un flusso sincrono di risposte (preferibilmente con le richieste corrispondenti) da un flusso di richieste (dati di carico utile).
Fondamentalmente voglio che le richieste vengano inviate una alla volta, ognuna in attesa della risposta dall'ultima.
Ho provato questo, ma manda tutto in una volta (jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
i seguenti lavori, in una certa misura, ma non usa flusso per i dati della richiesta (jsbin).
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
Grazie!
EDIT:
Giusto per chiarire, questo è quello che volevo ottenere:
"Invia A, quando si riceve la risposta per A, inviare B, quando si riceve la risposta per la B, inviare C, ecc ..."
Utilizzando concatMap e rinviare, come suggerito da user3743222, sembra farlo (jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
Grazie per la risposta. Ho provato la tua soluzione, ma le richieste sono ancora tutte inviate immediatamente. La documentazione suggerisce che flatMap può causare interleaving mentre concatMap no. Sembra che la differenza sia nell'ordinare. Ha senso usare concatMap ma non produce ancora il comportamento desiderato: Invia A, quando ricevi risposta per A, invia B, quando ricevi risposta per B, invia C, ecc. – jamesref
Forse ho frainteso ciò che volevi. Puoi provare in questo caso "differire"? Aggiornerò il codice – user3743222
Grazie! Sembra che funzioni. – jamesref