Sto prendendo la mia prima incursione nel modulo di elaborazione multipla Python e sto incontrando alcuni problemi. Ho molta familiarità con il modulo di threading ma ho bisogno di assicurarmi che i processi che sto eseguendo siano eseguiti in parallelo.Multiprocessing di Python - AssertionError: può unire solo un processo figlio
Ecco uno schema di ciò che sto cercando di fare. Si prega di ignorare le cose come variabili/funzioni non dichiarate perché non posso incollare il mio codice per intero.
import multiprocessing
import time
def wrap_func_to_run(host, args, output):
output.append(do_something(host, args))
return
def func_to_run(host, args):
return do_something(host, args)
def do_work(server, client, server_args, client_args):
server_output = func_to_run(server, server_args)
client_output = func_to_run(client, client_args)
#handle this output and return a result
return result
def run_server_client(server, client, server_args, client_args, server_output, client_output):
server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output))
server_process.start()
client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output))
client_process.start()
server_process.join()
client_process.join()
#handle the output and return some result
def run_in_parallel(server, client):
#set up commands for first process
server_output = client_output = []
server_cmd = "cmd"
client_cmd = "cmd"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output))
process_one.start()
#set up second process to run - but this one can run here
result = do_work(server, client, "some server args", "some client args")
process_one.join()
#use outputs above and the result to determine result
return final_result
def main():
#grab client
client = client()
#grab server
server = server()
return run_in_parallel(server, client)
if __name__ == "__main__":
main()
Ecco l'errore che sto ricevendo:
Error in sys.exitfunc:
Traceback (most recent call last):
File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
Ho provato un sacco di cose diverse per risolvere questo ma la mia sensazione è che ci sia qualcosa di sbagliato nel modo sto usando questo modulo.
EDIT:
così ho creato un file che verrà riprodurre questo simulando il client/server e il lavoro che fanno - anche ho perso un punto importante, che era che stavo correndo questo in UNIX. Un altro dato importante è che do_work
nel mio caso implica l'uso di os.fork()
. Non ero in grado di riprodurre l'errore senza usare anche os.fork()
quindi presumo che il problema ci sia. Nel mio caso del mondo reale, quella parte del codice non era mia, quindi la trattavo come una scatola nera (probabilmente un errore da parte mia). In ogni modo ecco il codice per riprodurre -
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
class Host():
def __init__(self):
self.name = "host"
def work(self):
#override - use to simulate work
pass
class Server(Host):
def __init__(self):
self.name = "server"
def work(self):
x = 0
for i in range(10000):
x+=1
print x
time.sleep(1)
class Client(Host):
def __init__(self):
self.name = "client"
def work(self):
x = 0
for i in range(5000):
x+=1
print x
time.sleep(1)
def func_to_run(host, args):
print host.name + " is working"
host.work()
print host.name + ": " + args
return "done"
def do_work(server, client, server_args, client_args):
print "in do_work"
server_output = client_output = ""
child_pid = os.fork()
if child_pid == 0:
server_output = func_to_run(server, server_args)
sys.exit(server_output)
time.sleep(1)
client_output = func_to_run(client, client_args)
# kill and wait for server to finish
os.kill(child_pid, signal.SIGTERM)
(pid, status) = os.waitpid(child_pid, 0)
return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args):
server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args))
print "Starting server process"
server_process.start()
client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args))
print "Starting client process"
client_process.start()
print "joining processes"
server_process.join()
client_process.join()
print "processes joined and done"
def run_in_parallel(server, client):
#set up commands for first process
server_cmd = "server command for run_server_client"
client_cmd = "client command for run_server_client"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd))
print "Starting process one"
process_one.start()
#set up second process to run - but this one can run here
print "About to do work"
result = do_work(server, client, "server args from do work", "client args from do work")
print "Joining process one"
process_one.join()
#use outputs above and the result to determine result
print "Process one has joined"
return result
def main():
#grab client
client = Client()
#grab server
server = Server()
return run_in_parallel(server, client)
if __name__ == "__main__":
main()
Se rimuovo l'uso di os.fork()
in do_work
non ottengo l'errore e il codice si comporta come mi sarei aspettato prima (tranne che per il passaggio di uscite che ho ho accettato come mio errore/incomprensione). Posso cambiare il vecchio codice per non usare os.fork() ma vorrei anche sapere perché questo ha causato questo problema e se c'è una soluzione praticabile.
EDIT 2:
ho iniziato a lavorare su una soluzione che omette os.fork() prima della risposta accettata. Ecco quello che ho con qualche ritocco per la quantità di lavoro simulato che può essere fatto -
#!/usr/bin/python
import multiprocessing
import time
import os
import signal
import sys
from Queue import Empty
class Host():
def __init__(self):
self.name = "host"
def work(self, w):
#override - use to simulate work
pass
class Server(Host):
def __init__(self):
self.name = "server"
def work(self, w):
x = 0
for i in range(w):
x+=1
print x
time.sleep(1)
class Client(Host):
def __init__(self):
self.name = "client"
def work(self, w):
x = 0
for i in range(w):
x+=1
print x
time.sleep(1)
def func_to_run(host, args, w, q):
print host.name + " is working"
host.work(w)
print host.name + ": " + args
q.put("ZERO")
return "done"
def handle_queue(queue):
done = False
results = []
return_val = 0
while not done:
#try to grab item from Queue
tr = None
try:
tr = queue.get_nowait()
print "found element in queue"
print tr
except Empty:
done = True
if tr is not None:
results.append(tr)
for el in results:
if el != "ZERO":
return_val = 1
return return_val
def do_work(server, client, server_args, client_args):
print "in do_work"
server_output = client_output = ""
child_pid = os.fork()
if child_pid == 0:
server_output = func_to_run(server, server_args)
sys.exit(server_output)
time.sleep(1)
client_output = func_to_run(client, client_args)
# kill and wait for server to finish
os.kill(child_pid, signal.SIGTERM)
(pid, status) = os.waitpid(child_pid, 0)
return (server_output == "done" and client_output =="done")
def run_server_client(server, client, server_args, client_args, w, mq):
local_queue = multiprocessing.Queue()
server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue))
print "Starting server process"
server_process.start()
client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue))
print "Starting client process"
client_process.start()
print "joining processes"
server_process.join()
client_process.join()
print "processes joined and done"
if handle_queue(local_queue) == 0:
mq.put("ZERO")
def run_in_parallel(server, client):
#set up commands for first process
master_queue = multiprocessing.Queue()
server_cmd = "server command for run_server_client"
client_cmd = "client command for run_server_client"
process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue))
print "Starting process one"
process_one.start()
#set up second process to run - but this one can run here
print "About to do work"
#result = do_work(server, client, "server args from do work", "client args from do work")
run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue)
print "Joining process one"
process_one.join()
#use outputs above and the result to determine result
print "Process one has joined"
return_val = handle_queue(master_queue)
print return_val
return return_val
def main():
#grab client
client = Client()
#grab server
server = Server()
val = run_in_parallel(server, client)
if val:
print "failed"
else:
print "passed"
return val
if __name__ == "__main__":
main()
Questo codice ha alcune stampe tweaked solo per vedere esattamente cosa sta succedendo. Ho usato un multiprocessing.Queue per archiviare e condividere gli output tra i processi e tornare al thread principale da gestire. Penso che questo risolva la parte python del mio problema ma ci sono ancora alcuni problemi nel codice su cui sto lavorando. L'unica altra cosa che posso dire è che l'equivalente a func_to_run
implica l'invio di un comando su ssh e l'acquisizione di qualsiasi errore insieme all'output. Per qualche ragione, questo funziona perfettamente bene per un comando che ha un tempo di esecuzione basso, ma non per un comando che ha un tempo/uscita di esecuzione molto più grande. Ho provato a simulare questo con i valori di lavoro drasticamente diversi nel mio codice qui, ma non sono stato in grado di riprodurre risultati simili.
EDIT 3 codice Biblioteca sto usando (ancora una volta non la mia) utilizza Popen.wait()
per i comandi ssh e ho appena letto questo:
Popen.wait()
Wait for child process to terminate. Set and return returncode attribute.Warning This will deadlock when using stdout=PIPE and/or stderr=PIPE and the >child process generates enough output to a pipe such that it blocks waiting for >the OS pipe buffer to accept more data. Use communicate() to avoid that.
ho regolato il codice per non tamponare e basta stampare così com'è ricevuto e tutto funziona.
Hai più problemi qui. Prima di tutto: 'output.append()' probabilmente non fa quello che vuoi quando usi il modulo 'multiprocessing'. In secondo luogo, il problema da te segnalato è come descritto: non puoi chiamare '.join()' su un 'Processo' che non è di proprietà del' Processo' attualmente in esecuzione. Confessi che il tuo esempio è sintetico, quindi è difficile dire dove si trova il problema. Sei sicuro di assegnare il risultato di 'Process()' a una variabile locale di breve durata come in questo esempio? O stai invece utilizzando una variabile globale o di istanza ('self.process_one', ad es.)? –
Per il primo problema - hai ragione. Funziona quando ho usato i thread ma probabilmente a causa dello spazio di memoria condiviso. Le variabili per i processi sono locali. Questi processi non sono di proprietà del processo di chiamata? Sto solo unendo i processi nelle funzioni in cui sono stati creati, quindi presumo che la proprietà sia ok a meno che non ci sia un problema di programmazione. –
Ah! Lo vedo ora, stai chiamando 'join' in un gestore' atexit', ma solo come 'multiprocessing' sta cercando di ripulire se stesso. Speculerò che questo è il risultato del passaggio di un'istanza 'Process' a un altro processo. Se questo è il caso, è IMO un bug sottile in CPython. 'Processo' dovrebbe rifiutarsi di essere sottaceto e darti un'eccezione quando provi a passarlo. So che non è facile creare un riproduttore minimale, ma penso che scoprirai che vale il tuo tempo. Aiuterà te e noi a identificare l'elemento chiave del bug. –