Possiedo un eseguibile che chiamo utilizzando il sottoprocesso.Popen. Quindi, ho intenzione di dargli dei dati via stdin usando un thread che legge il suo valore da una coda che verrà poi popolata in un altro thread. L'output dovrebbe essere letto utilizzando la pipe stdout in un altro thread e nuovamente ordinato in una coda.python: lettura dell'output di sottoprocesso nei thread
Per quanto ho capito dalla mia precedente ricerca, l'utilizzo di thread con Queue è una buona pratica.
L'eseguibile esterno, purtroppo, non mi fornirà rapidamente una risposta per ogni linea inserita, in modo che i cicli di scrittura e di lettura semplici non siano un'opzione. L'eseguibile implementa del multithreading interno e voglio l'output non appena diventa disponibile, quindi il thread del lettore aggiuntivo.
Come esempio per testare l'eseguibile sarà solo mischiare ogni riga (shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Si prega di notare che questo è già come buffer possibile. O non è vero? Questo è il mio ridotta programma di test:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
questo dare il seguente output (python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
poi nulla per 2 secondi ...
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
L'interleaving all'inizio è previsto. Tuttavia, l'output del sottoprocesso non viene visualizzato fino a dopo il. Il sottoprocesso termina. Con più linee collegate, ottengo un po 'di output, quindi presumo un problema di cache nella pipe stdout. Secondo le altre domande pubblicate qui lo svuotamento di stdout (nel sottoprocesso) dovrebbe funzionare, almeno su Linux.
Grazie, questa è la soluzione! – muckl
Posso chiedere perché il mix di sottoprocesso e thread è un approccio così terribile? Sembra più elegante di chiamare l'I/O non bloccante più e più volte mentre nulla sta accadendo. Ovviamente i thread non dovrebbero accedere a nessuna infrastruttura dati non protetta da un blocco, ma solo la lettura e la scrittura da o verso una coda sembra sicura. Le modifiche nel backport Python3.2 sono importanti per un caso così semplice come il mio? – muckl
Il problema con i thread e il sottoprocesso in particolare è il problema della miscelazione di thread e fork. Vedi http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them e altri articoli simili. Il backport di subprocesso di Python 3.2 risolve questi problemi. Per quanto riguarda i thread in generale, il problema principale è che sono difficili da controllare e eseguire il debug. Ad esempio, non puoi ucciderli da "fuori" dal thread, quindi se un thread è bloccato in una lettura o in una scrittura non c'è nulla che tu possa fare al riguardo. –