2013-03-25 13 views
6

Ho un servizio JSON-RPC che per una delle richieste restituisce un flusso continuo di oggetti JSON.Flusso di pacchetto continuo HTTP con Indy

I.e. :

{id:'1'} 
{id:'2'} 
//30 minutes of no data 
{id:'3'} 
//... 

Naturalmente, non c'è Lunghezza del contenuto perché lo streaming è infinito.

Sto utilizzando un discendente TStream personalizzato per ricevere e analizzare i dati. Ma internamente TIdHttp memorizza i dati e non me li trasmette fino a quando non vengono ricevuti i byte RecvBufferSize.

Questo si traduce in:

{id:'1'} //received 
{id:'2'} //buffered by Indy but not received 
//30 minutes of no data 
{id:'3'} //this is where Indy commits {id:'2'} to me 

Ovviamente questo non lo farà perché il messaggio che contava 30 minuti fa avrebbe dovuto essere consegnato 30 minuti fa.

Mi piacerebbe che Indy faccia solo quello che fanno i socket: leggi fino a RecvBufferSize o meno se ci sono dati disponibili e torna immediatamente.

Ho trovato this discussion dal 2005 dove qualche povera anima ha cercato di spiegare il problema agli sviluppatori di Indy ma non lo hanno capito. (Leggilo, è uno spettacolo triste)

In ogni caso, ha lavorato attorno a questo scrivendo discendente personalizzato di IOHandler, ma questo era nel 2005, forse ci sono alcune soluzioni pronte oggi?

risposta

2

Durante l'utilizzo del flusso TCP era un'opzione, alla fine sono andato con la soluzione originale di scrittura discendente personalizzata TIdIOHandlerStack.

La motivazione era che con TIdHTTP so che cosa non funziona e devo solo correggerlo, mentre passando a TCP di livello inferiore si possono verificare nuovi problemi.

Here's the code that I'm using, e ho intenzione di discutere i punti chiave qui.

Nuovo TIdStreamIoHandler deve ereditare da TIdIOHandlerStack.

due funzioni devono essere riscritta: ReadBytes e ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer; 
    AAppend: Boolean = True): integer; virtual; 
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; 
    AReadUntilDisconnect: Boolean = False); override; 

Entrambi sono modificate funzioni Indy che possono essere trovati in IdIOHandler.TIdIOHandler. In ReadBytes la clausola while deve essere sostituita con una richiesta singola ReadFromSource(), in modo che TryReadBytes ritorni dopo aver letto fino a AByteCount byte in una volta.

Sulla base di questo, ReadStream deve gestire tutte le combinazioni di AByteCount (> 0, < 0) e ReadUntilDisconnect (true, false) per leggere ciclicamente e quindi scrivere per trasmettere blocchi di dati che arrivano dalla presa.

Nota che ReadStream non deve terminare prematuramente anche in questa versione di flusso se solo una parte dei dati richiesti è disponibile nel socket. Deve solo scrivere quella parte sullo stream immediatamente invece di memorizzarla nella cache in FInputBuffer, quindi bloccare e attendere la successiva parte di dati.

+0

come Indy è open source, le fonti modificate possono (e, se utili per gli altri, dovrebbero) essere rese pubbliche – mjn

+0

@mjn: Non lo sapevo, grazie. Aggiunto il codice. – himself

2

Non è necessario scrivere un discendente IOHandler, è già possibile con la classe TIdTCPClient. Espone un oggetto TIdIOHandler, che ha metodi per leggere dal socket. Questi metodi ReadXXX bloccano fino a quando i dati richiesti non sono stati letti o si verifica un timeout. Finché esiste la connessione, ReadXXX può essere eseguito in un ciclo e ogni volta che riceve un nuovo oggetto JSON, passarlo alla logica dell'applicazione.

L'esempio mostra che tutti gli oggetti JSON hanno una sola linea. Gli oggetti JSON possono tuttavia essere multi-linea, in questo caso il codice cliente deve sapere come sono separati.


Update: in una domanda StackOverflow simili (per Net) per un 'streaming servizio Web HTTP JSON, la soluzione più upvoted utilizzato un client TCP di livello inferiore, invece di un client HTTP: Reading data from an open HTTP stream

4

Mi sembra un compito WebSocket, dal momento che la tua connessione non è più orientata verso domande/risposte HTTP, ma un flusso di contenuti.

Vedere WebSocket server implementations for Delphi per alcuni codici.

C'è at least one based on Indy, dall'autore di AsmProfiler.

AFAIK ci sono due tipi di streaming in websocket: binario e testo. Sospetto che il tuo stream JSON sia un contenuto di testo, dal punto di vista di Websocket.

Un'altra opzione è quella di utilizzare long-pooling o alcuni protocolli più vecchi, che sono più compatibili con il rooter - quando la connessione passa alla modalità websocket, non è più HTTP standard, quindi alcuni "sensibili" strumenti di ispezione dei pacchetti (su rete aziendale) potrebbe identificarlo come un attacco di sicurezza (ad esempio, DoS), quindi potrebbe interrompere la connessione.

+0

Se ho capito bene, entrambe le soluzioni richiedono la riscrittura del servizio? Perché non ho accesso ad esso. – himself

+0

@himself Se la tua richiesta è di avere la connessione aperta e non usare intestazioni Content-Length, questo non è più HTTP, quindi suppongo che dovrai cambiare il lato del servizio! –

+0

Mhm, indovina cosa dirà il servizio? "Da nessuna parte nello standard HTTP si dice che il middleware HTTP può bufferizzare i dati per lunghi periodi di tempo, quindi il nostro servizio va bene, suppongo che dovrai correggere il tuo codice client HTTP". Torna al punto di partenza. – himself

0

In realtà c'è un dato di lunghezza proprio prima del contenuto del pacchetto che è stato trasferito in modalità di trasferimento codifica chunked. Usando questi dati di lunghezza, IOhandler di idhttp legge un pacchetto di un pacchetto per lo streaming. L'unità minima significativa è un pacchetto, quindi non ci dovrebbe essere bisogno di leggere i caratteri uno per uno da un pacchetto e quindi non c'è bisogno di cambiare le funzioni di IOHandler. L'unico problema è che idhttp non fermerebbe un turno i dati del flusso al passo successivo a causa dell'infinità dei dati del flusso: non c'è un pacchetto finale. Quindi la soluzione sta usando idhttp onwork evento per innescare una lettura dal flusso e l'impostazione della posizione di flusso a zero al fine di evitare di overflow .like questo:

//add a event handler to idhttp  
    IdHTTP.OnWork := IdHTTPWork; 


    procedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    begin 
     ..... 
     ResponseStringStream.Position :=0; 
     s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten 
     ResponseStringStream.Clear; 
     ... 
    end; 

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject); 
var 
begin 
    .....  
    source := RatesWorker.RatesURL+'EUR_USD'; 
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream); 
end; 

Eppure utilizzare una scrittura personalizzato() funzione del TStream può essere un soluzione migliore per questo tipo di esigenza.