2015-01-25 12 views
7

Sto tentando di utilizzare un flusso di eventi fornito dall'API Kubernetes utilizzando il modulo requests. Ho riscontrato un problema di buffering : il modulo requests sembra ritardato di un evento.Lettura della risposta HTTP in streaming con la libreria "richieste" Python

Ho codice che sembra qualcosa di simile:

r = requests.get('http://localhost:8080/api/v1beta1/watch/services', 
       stream=True) 

for line in r.iter_lines(): 
    print 'LINE:', line 

Come kubernetes emette le notifiche degli eventi, questo codice verrà visualizzato solo l'ultimo evento emessa quando un nuovo evento entra, che lo rende quasi del tutto inutile per il codice che deve rispondere al servizio aggiungere/eliminare eventi.

Ho risolto questo deposizione delle uova curl in un sottoprocesso invece di utilizzare la libreria requests:

p = subprocess.Popen(['curl', '-sfN', 
         'http://localhost:8080/api/watch/services'], 
        stdout=subprocess.PIPE, 
        bufsize=1) 

for line in iter(p.stdout.readline, b''): 
    print 'LINE:', line 

Questo funziona, ma a scapito di una certa flessibilità. C'è un modo per evitare questo problema di buffering con la libreria requests?

risposta

5

Questo comportamento è dovuto a un'implementazione bacata del metodo iter_lines nella libreria requests.

iter_lines itera il contenuto della risposta in chunk_size blocchi dei dati utilizzando il iter_content iteratore. Se ci sono meno di chunk_size byte di dati disponibili per la lettura dalla server remoto (che sarà tipicamente il caso quando si legge l'ultima riga uscita), l'operazione di lettura si bloccherà fino chunk_size byte di dati sono disponibili.

ho scritto il mio iter_lines routine che funziona correttamente:

import os 


def iter_lines(fd, chunk_size=1024): 
    '''Iterates over the content of a file-like object line-by-line.''' 

    pending = None 

    while True: 
     chunk = os.read(fd.fileno(), chunk_size) 
     if not chunk: 
      break 

     if pending is not None: 
      chunk = pending + chunk 
      pending = None 

     lines = chunk.splitlines() 

     if lines and lines[-1]: 
      pending = lines.pop() 

     for line in lines: 
      yield line 

    if pending: 
     yield(pending) 

Questo funziona perché os.read restituirà meno di chunk_size byte dei dati piuttosto che in attesa di un buffer da riempire.

+0

Si può sostenere che l'implementazione è corretta: la tua inserirà una "interruzione logica" falsa se saranno disponibili più dati. L'approccio corretto sembra essere quello di scoprire la dimensione totale dei dati (specificando uno è un requisito per le comunicazioni TCP) e impiegare solo letture parziali all'estremità nota. –

+0

Non penso che si possa sostenere che l'implementazione esistente sia corretta. Il mio non ha subito prove rigorose, ma sicuramente funziona meglio. Un'implementazione più corretta, idealmente presentata come patch upstream, sarebbe super utile. – larsks

+0

@ivan_pozdeev * "L'approccio corretto sembra essere quello di scoprire la dimensione totale dei dati (specificando uno è un requisito per le comunicazioni TCP)" * - No, TCP è un * stream * e potrebbe avere una lunghezza infinita. Non sono sicuro di dove lo hai sentito, ma fondamentalmente non è vero. –