2012-03-21 19 views
9

Possiedo un eseguibile che chiamo utilizzando il sottoprocesso.Popen. Quindi, ho intenzione di dargli dei dati via stdin usando un thread che legge il suo valore da una coda che verrà poi popolata in un altro thread. L'output dovrebbe essere letto utilizzando la pipe stdout in un altro thread e nuovamente ordinato in una coda.python: lettura dell'output di sottoprocesso nei thread

Per quanto ho capito dalla mia precedente ricerca, l'utilizzo di thread con Queue è una buona pratica.

L'eseguibile esterno, purtroppo, non mi fornirà rapidamente una risposta per ogni linea inserita, in modo che i cicli di scrittura e di lettura semplici non siano un'opzione. L'eseguibile implementa del multithreading interno e voglio l'output non appena diventa disponibile, quindi il thread del lettore aggiuntivo.

Come esempio per testare l'eseguibile sarà solo mischiare ogni riga (shuffleline.py):

#!/usr/bin/python -u 
import sys 
from random import shuffle 

for line in sys.stdin: 
    line = line.strip() 

    # shuffle line 
    line = list(line) 
    shuffle(line) 
    line = "".join(line) 

    sys.stdout.write("%s\n"%(line)) 
    sys.stdout.flush() # avoid buffers 

Si prega di notare che questo è già come buffer possibile. O non è vero? Questo è il mio ridotta programma di test:

#!/usr/bin/python -u 
import sys 
import Queue 
import threading 
import subprocess 

class WriteThread(threading.Thread): 
    def __init__(self, p_in, source_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_in 
     self.source_queue = source_queue 

    def run(self): 
     while True: 
      source = self.source_queue.get() 
      print "writing to process: ", repr(source) 
      self.pipe.write(source) 
      self.pipe.flush() 
      self.source_queue.task_done() 

class ReadThread(threading.Thread): 
    def __init__(self, p_out, target_queue): 
     threading.Thread.__init__(self) 
     self.pipe = p_out 
     self.target_queue = target_queue 

    def run(self): 
     while True: 
      line = self.pipe.readline() # blocking read 
      if line == '': 
       break 
      print "reader read: ", line.rstrip() 
      self.target_queue.put(line) 

if __name__ == "__main__": 

    cmd = ["python", "-u", "./shuffleline.py"] # unbuffered 
    proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 

    source_queue = Queue.Queue() 
    target_queue = Queue.Queue() 

    writer = WriteThread(proc.stdin, source_queue) 
    writer.setDaemon(True) 
    writer.start() 

    reader = ReadThread(proc.stdout, target_queue) 
    reader.setDaemon(True) 
    reader.start() 

    # populate queue 
    for i in range(10): 
     source_queue.put("string %s\n" %i) 
    source_queue.put("") 

    print "source_queue empty: ", source_queue.empty() 
    print "target_queue empty: ", target_queue.empty() 

    import time 
    time.sleep(2) # expect some output from reader thread 

    source_queue.join() # wait until all items in source_queue are processed 
    proc.stdin.close() # should end the subprocess 
    proc.wait() 

questo dare il seguente output (python2.7):

writing to process: 'string 0\n' 
writing to process: 'string 1\n' 
writing to process: 'string 2\n' 
writing to process: 'string 3\n' 
writing to process: 'string 4\n' 
writing to process: 'string 5\n' 
writing to process: 'string 6\n' 
source_queue empty: writing to process: 'string 7\n' 
writing to process: 'string 8\n' 
writing to process: 'string 9\n' 
writing to process: '' 
True 
target_queue empty: True 

poi nulla per 2 secondi ...

reader read: rgsn0i t 
reader read: nrg1sti 
reader read: tis n2rg 
reader read: snt gri3 
reader read: nsri4 tg 
reader read: stir5 gn 
reader read: gnri6ts 
reader read: ngrits7 
reader read: 8nsrt ig 
reader read: sg9 nitr 

L'interleaving all'inizio è previsto. Tuttavia, l'output del sottoprocesso non viene visualizzato fino a dopo il. Il sottoprocesso termina. Con più linee collegate, ottengo un po 'di output, quindi presumo un problema di cache nella pipe stdout. Secondo le altre domande pubblicate qui lo svuotamento di stdout (nel sottoprocesso) dovrebbe funzionare, almeno su Linux.

risposta

7

Il tuo problema non ha nulla a che fare il modulo subprocess, o fili (problematici così come sono), o anche sottoprocessi di miscelazione e fili (un molto cattiva idea, anche peggio di utilizzare le discussioni per cominciare, se non sei utilizzando il backport del modulo sottoprocesso Python 3.2 di che si può ottenere da code.google.com/p/python-subprocess32) o accedendo le stesse cose da più thread (come le vostre print dichiarazioni fanno.)

quello che succede è che il vostro shuffleline.py buffer di programma. Non in uscita, ma in input. Sebbene non sia molto ovvio, quando si esegue un'iterazione su un fileobject, Python leggerà in blocchi, in genere 8k byte.Dal momento che sys.stdin è un FileObject, il ciclo for sarà tampone fino EOF o un intero isolato:

for line in sys.stdin: 
    line = line.strip() 
    .... 

Se si vuole non fare questo, utilizzare un ciclo while per chiamare sys.stdin.readline() (che restituisce '' per EOF):

while True: 
    line = sys.stdin.readline() 
    if not line: 
     break 
    line = line.strip() 
    ... 

o utilizzare la forma a due argomenti di iter(), che crea un iteratore che chiama il primo argomento finché il secondo argomento (la "sentinella") viene restituito:

for line in iter(sys.stdin.readline, ''): 
    line = line.strip() 
    ... 

Sarei anche negligente se non suggerissi di non usare thread per questo, ma I/O non bloccante sui pipe del sottoprocesso, o anche qualcosa come twisted.reactor.spawnProcess che ha molti modi di aggancio processi e altre cose insieme come consumatori e produttori.

+0

Grazie, questa è la soluzione! – muckl

+1

Posso chiedere perché il mix di sottoprocesso e thread è un approccio così terribile? Sembra più elegante di chiamare l'I/O non bloccante più e più volte mentre nulla sta accadendo. Ovviamente i thread non dovrebbero accedere a nessuna infrastruttura dati non protetta da un blocco, ma solo la lettura e la scrittura da o verso una coda sembra sicura. Le modifiche nel backport Python3.2 sono importanti per un caso così semplice come il mio? – muckl

+3

Il problema con i thread e il sottoprocesso in particolare è il problema della miscelazione di thread e fork. Vedi http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them e altri articoli simili. Il backport di subprocesso di Python 3.2 risolve questi problemi. Per quanto riguarda i thread in generale, il problema principale è che sono difficili da controllare e eseguire il debug. Ad esempio, non puoi ucciderli da "fuori" dal thread, quindi se un thread è bloccato in una lettura o in una scrittura non c'è nulla che tu possa fare al riguardo. –