2010-06-19 1 views
14

Ho bisogno di un modo per leggere tutti i caratteri attualmente disponibili nello stream creato da Popen o per scoprire quanti caratteri sono rimasti nel buffer.Come posso leggere tutti i dati disponibili dal sottoprocesso.Popen.stdout (non bloccante)?

Backround: Voglio controllare a distanza un'applicazione interattiva in Python. Finora ho usato Popen per creare un nuovo sottoprocesso:

process=subprocess.Popen(["python"],shell=True,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE, cwd=workingDir) 

(python non sto davvero iniziando, ma l'interfaccia attuale interattivo è simile.) Al momento ho letto 1 byte fino a rilevare che il processo di ha raggiunto il richiamo di ordine:

output = "" 
while output[-6:]!="SCIP> ": 
    output += process.stdout.read(1) 
    sys.stdout.write(output[-1]) 
return output 

Poi avviare un calcolo in via process.stdin.write("command\n"). Il mio problema è che non posso controllare se il calcolo è finito o meno, perché non posso controllare, se gli ultimi caratteri nel flusso sono il prompt o no. read() o read(n) blocca il mio thread fino a quando non raggiunge EOF, cosa che non accadrà mai, perché il programma interattivo non si esaurirà finché non verrà comunicato. Cercare il prompt nel modo in cui il ciclo sopra non funzionerà, perché il prompt si verificherà solo dopo il calcolo.

La soluzione ideale mi consentirebbe di leggere tutti i caratteri disponibili dallo stream e restituire immediatamente una stringa vuota, se non c'è nulla da leggere.

+0

Potrebbe pexpect http://www.noah.org/wiki/Pexpect fare ciò che ti serve? – Mark

+1

Ho già esaminato quello, e sì, potrebbe. Ma se possibile vorrei una soluzione che funzionasse senza moduli esterni. – Perseids

+1

Non sono sicuro che sia possibile fornire: http://www.python.org/dev/peps/pep-3145/ –

risposta

3

Rovistando ho trovato questo davvero bella soluzione

Persistent python subprocess

che evita il problema di blocco tutti insieme utilizzando fcntl per impostare attributi di file sulle tubazioni di processo parziali in modalità non-blocking, nessun thread ausiliari o polling necessario. Potrei mancare qualcosa ma ha risolto il mio problema di controllo di processo interattivo.

+5

fcntl è solo sistemi Unix. No worky su Windows == non una soluzione "Python". :) – ddotsenko

+0

Ho poca esperienza con la programmazione di Windows ma questo [collegamento] (http://stackoverflow.com/questions/5309262/is-there-way-to-make-file-descriptor-non-blocking-in-windows) sembra suggerire che esiste anche un equivalente di I/O non bloccante in Windows. –

+0

E questo [collegamento] (http://eduunix.ccut.edu.cn/index2/html/python/OReilly%20-%20Python%20Programming%20on%20Win32/pythonwin32_snode133.html) mostra come utilizzare il modulo Python win32file per creare pipe in modalità sovrapposta (Windows slang per non bloccanti). –

0

Non penso che readline() bloccherà il processo.

line = process.stdout.readline() 

All'inizio ho cercato di usare

for line in process.stdout: 
    print(line) 

ma che sembra appendere fino a quando il processo termina.

+0

Sfortunatamente readline() blocca il processo. Inoltre non leggerebbe il prompt, perché non c'è una nuova riga alla fine. – Perseids

1

Non è corretto che read() blocchi fino a EOF - blocca fino a quando non ottiene abbastanza dati di cui ha bisogno - e dall'altra parte è possibile che alcuni dati siano conservati nei buffer (non viene svuotato solo perché è terminato stampare con una nuova riga).

perché non provare a stampare qualcosa come "### OVER ###\n" bambino e poi stdout.flush(), poi sul lato genitore raccogliere fino a vedere il su Token, dice con ''.join(i for i in iter(process.stdout.readline, '### OVER ###\n'))

+0

Questo non sarebbe d'aiuto, perché voglio fare il contrario: non aspettare che sia pronto, ma scopri che non lo è. Leggendo il prompt una volta che è lì funziona abbastanza affidabile. (Nota a margine: il programma che carico nel sottoprocesso è un binario precompilato e il motivo per cui utilizzo l'interfaccia interattiva è soprattutto che voglio risparmiarmi la fatica di usare l'interfaccia C++, quindi non posso modificare l'output.) – Perseids

11

analisi incrementale di stdout di Popen non è un problema davvero. Basta inserire un tubo in un filo e farlo scorrere attraverso l'output, cercando i delimitatori. In base alle proprie preferenze, può essere collegato a un altro pipe/file o inserire i "pezzi" analizzati sullo "stack" in modalità asincrona. Ecco un esempio di asincrono "chunking" di stdout basata su delimitatore personalizzato:

import cStringIO 
import uuid 
import threading 
import os 

class InputStreamChunker(threading.Thread): 
    ''' 
    Threaded object/code that mediates reading output from a stream, 
    detects "separation markers" in the stream and spits out chunks 
    of original stream, split when ends of chunk are encountered. 

    Results are made available as a list of filled file-like objects 
    (your choice). Results are accessible either "asynchronously" 
    (you can poll at will for results in a non-blocking way) or 
    "synchronously" by exposing a "subscribe and wait" system based 
    on threading.Event flags. 

    Usage: 
    - instantiate this object 
    - give our input pipe as "stdout" to other subprocess and start it: 
     Popen(..., stdout = th.input, ...) 
    - (optional) subscribe to data_available event 
    - pull resulting file-like objects off .data 
     (if you are "messing" with .data from outside of the thread, 
     be curteous and wrap the thread-unsafe manipulations between: 
     obj.data_unoccupied.clear() 
     ... mess with .data 
     obj.data_unoccupied.set() 
     The thread will not touch obj.data for the duration and will 
     block reading.) 

    License: Public domain 
    Absolutely no warranty provided 
    ''' 
    def __init__(self, delimiter = None, outputObjConstructor = None): 
     ''' 
     delimiter - the string that will be considered a delimiter for the stream 
     outputObjConstructor - instanses of these will be attached to self.data array 
     (intantiator_pointer, args, kw) 
     ''' 
     super(InputStreamChunker,self).__init__() 

     self._data_available = threading.Event() 
     self._data_available.clear() # parent will .wait() on this for results. 
     self._data = [] 
     self._data_unoccupied = threading.Event() 
     self._data_unoccupied.set() # parent will set this to true when self.results is being changed from outside 
     self._r, self._w = os.pipe() # takes all inputs. self.input = public pipe in. 
     self._stop = False 
     if not delimiter: delimiter = str(uuid.uuid1()) 
     self._stream_delimiter = [l for l in delimiter] 
     self._stream_roll_back_len = (len(delimiter)-1) * -1 
     if not outputObjConstructor: 
      self._obj = (cStringIO.StringIO,(), {}) 
     else: 
      self._obj = outputObjConstructor 
    @property 
    def data_available(self): 
     '''returns a threading.Event instance pointer that is 
     True (and non-blocking to .wait()) when we attached a 
     new IO obj to the .data array. 
     Code consuming the array may decide to set it back to False 
     if it's done with all chunks and wants to be blocked on .wait()''' 
     return self._data_available 
    @property 
    def data_unoccupied(self): 
     '''returns a threading.Event instance pointer that is normally 
     True (and non-blocking to .wait()) Set it to False with .clear() 
     before you start non-thread-safe manipulations (changing) .data 
     array. Set it back to True with .set() when you are done''' 
     return self._data_unoccupied 
    @property 
    def data(self): 
     '''returns a list of input chunkes (file-like objects) captured 
     so far. This is a "stack" of sorts. Code consuming the chunks 
     would be responsible for disposing of the file-like objects. 
     By default, the file-like objects are instances of cStringIO''' 
     return self._data 
    @property 
    def input(self): 
     '''This is a file descriptor (not a file-like). 
     It's the input end of our pipe which you give to other process 
     to be used as stdout pipe for that process''' 
     return self._w 
    def flush(self): 
     '''Normally a read on a pipe is blocking. 
     To get things moving (make the subprocess yield the buffer, 
     we inject our chunk delimiter into self.input 

     This is useful when primary subprocess does not write anything 
     to our in pipe, but we need to make internal pipe reader let go 
     of the pipe and move on with things. 
     ''' 
     os.write(self._w, ''.join(self._stream_delimiter)) 
    def stop(self): 
     self._stop = True 
     self.flush() # reader has its teeth on the pipe. This makes it let go for for a sec. 
     os.close(self._w) 
     self._data_available.set() 
    def __del__(self): 
     try: 
      self.stop() 
     except: 
      pass 
     try: 
      del self._w 
      del self._r 
      del self._data 
     except: 
      pass 
    def run(self): 
     ''' Plan: 
     - We read into a fresh instance of IO obj until marker encountered. 
     - When marker is detected, we attach that IO obj to "results" array 
      and signal the calling code (through threading.Event flag) that 
      results are available 
     - repeat until .stop() was called on the thread. 
     ''' 
     marker = ['' for l in self._stream_delimiter] # '' is there on purpose 
     tf = self._obj[0](*self._obj[1], **self._obj[2]) 
     while not self._stop: 
      l = os.read(self._r, 1) 
      print('Thread talking: Ordinal of char is:%s' %ord(l)) 
      trash_str = marker.pop(0) 
      marker.append(l) 
      if marker != self._stream_delimiter: 
       tf.write(l) 
      else: 
       # chopping off the marker first 
       tf.seek(self._stream_roll_back_len, 2) 
       tf.truncate() 
       tf.seek(0) 
       self._data_unoccupied.wait(5) # seriously, how much time is needed to get your items off the stack? 
       self._data.append(tf) 
       self._data_available.set() 
       tf = self._obj[0](*self._obj[1], **self._obj[2]) 
     os.close(self._r) 
     tf.close() 
     del tf 

def waitforresults(ch, answers, expect): 
    while len(answers) < expect: 
     ch.data_available.wait(0.5); ch.data_unoccupied.clear() 
     while ch.data: 
      answers.append(ch.data.pop(0)) 
     ch.data_available.clear(); ch.data_unoccupied.set() 
     print('Main talking: %s answers received, expecting %s\n' % (len(answers), expect)) 

def test(): 
    ''' 
    - set up chunker 
    - set up Popen with chunker's output stream 
    - push some data into proc.stdin 
    - get results 
    - cleanup 
    ''' 

    import subprocess 

    ch = InputStreamChunker('\n') 
    ch.daemon = True 
    ch.start() 

    print('starting the subprocess\n') 
    p = subprocess.Popen(
     ['cat'], 
     stdin = subprocess.PIPE, 
     stdout = ch.input, 
     stderr = subprocess.PIPE) 

    answers = [] 

    i = p.stdin 
    i.write('line1 qwer\n') # will be in results 
    i.write('line2 qwer\n') # will be in results 
    i.write('line3 zxcv asdf') # will be in results only after a ch.flush(), 
           # prepended to other line or when the pipe is closed 
    waitforresults(ch, answers, expect = 2) 

    i.write('line4 tyui\n') # will be in results 
    i.write('line5 hjkl\n') # will be in results 
    i.write('line6 mnbv') # will be in results only after a ch.flush(), 
           # prepended to other line or when the pipe is closed 
    waitforresults(ch, answers, expect = 4) 

    ## now we will flush the rest of input (that last line did not have a delimiter) 
    i.close() 
    ch.flush() 
    waitforresults(ch, answers, expect = 5) 

    should_be = ['line1 qwer', 'line2 qwer', 
     'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv'] 
    assert should_be == [i.read() for i in answers] 

    # don't forget to stop the chunker. It it closes the pipes 
    p.terminate() 
    ch.stop() 
    del p, ch 

if __name__ == '__main__': 
    test() 

Edit: rimosso il verbosità erronea di "scrivendo a stdin di proc è un one-time-cosa"

+0

Credo che mi piaccia la tua versione migliore di quella che ho effettivamente implementato (vedi la risposta di Brian). Anche se leggi anche solo un personaggio alla volta, l'approccio multithread è più pulito. Tuttavia non sono d'accordo con la tua conclusione sui flussi di input. Almeno se lo svuotate e date al processo il tempo necessario per leggere i dati, il risultato vi tornerà sopra il flusso di output. Vedi http://codepad.org/Yu7SoORS righe 172-188 come esempio (ho sostituito grep con cat). Altrimenti il ​​programma di cui ho bisogno fallirebbe miseramente. – Perseids

+0

@Persidi Molto interessante. Sono corretto sulle mie supposizioni affrettate riguardo al sottoprocesso. Lo STDIN di Popen. Regoleremo l'esempio di codice – ddotsenko

2

C'è un'altra possibile soluzione, ma potrebbe essere necessario riordinare un po 'il programma.

Se si dispone di più origini di I/O (descrittori di file, socket, ecc.) E si desidera attendere tutte in una volta, utilizzare il modulo Python select. È possibile (ad esempio) inserire l'input standard (per la lettura dal terminale) e la pipe (dal sottoprocesso) in un elenco e attendere che l'input sia pronto su uno di essi.select blocchi finché I/O non è disponibile su uno qualsiasi dei descrittori nell'elenco. Quindi si scansiona l'elenco, cercando quelli che hanno dati disponibili.

Questo approccio risulta essere abbastanza efficiente, molto più del polling di un descrittore di file per vedere se ci sono dati. Ha anche la virtù della semplicità; cioè, puoi realizzare ciò che vuoi con un minimo di codice. Il codice più semplice significa meno opportunità per i bug.

+0

Ho capito che funziona usando il polling dal modulo select. Grazie. My readAllAvailableData() ora è simile a questo: http://codepad.org/ArYdEc3s. L'implementazione non è affatto efficiente, ma è abbastanza veloce per il mio scopo. Immagino che la soluzione più elegante avrebbe usato Pexpect come suggerito da Mark (se è possibile utilizzare moduli esterni). – Perseids

1

Ho provato un sacco di approcci come fare un non-blocking stdout dal seguente:

fd = output.fileno() 
fl = fcntl.fcntl(fd, fcntl.F_GETFL) 
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) 

Ma l'unica soluzione di lavoro viene descritto here:

master, slave = pty.openpty() 

proc = subprocess.Popen(
    shlex.split(command), 
    stdout=slave, 
    stderr=slave, 
    close_fds=True, 
    bufsize=0 
) 

stdout = os.fdopen(master) 

E poi:

while True: 
    out = stdout.readline() 
    output_result = proc.poll() 
    if out == '' and output_result is not None: 
     break 
    if out != '': 
     print(out)