2016-06-08 38 views
5

Sto prendendo la mia prima incursione nel modulo di elaborazione multipla Python e sto incontrando alcuni problemi. Ho molta familiarità con il modulo di threading ma ho bisogno di assicurarmi che i processi che sto eseguendo siano eseguiti in parallelo.Multiprocessing di Python - AssertionError: può unire solo un processo figlio

Ecco uno schema di ciò che sto cercando di fare. Si prega di ignorare le cose come variabili/funzioni non dichiarate perché non posso incollare il mio codice per intero.

import multiprocessing 
import time 

def wrap_func_to_run(host, args, output): 
    output.append(do_something(host, args)) 
    return 

def func_to_run(host, args): 
    return do_something(host, args) 

def do_work(server, client, server_args, client_args): 
    server_output = func_to_run(server, server_args) 
    client_output = func_to_run(client, client_args) 
    #handle this output and return a result 
    return result 

def run_server_client(server, client, server_args, client_args, server_output, client_output): 
    server_process = multiprocessing.Process(target=wrap_func_to_run, args=(server, server_args, server_output)) 
    server_process.start() 
    client_process = multiprocessing.Process(target=wrap_func_to_run, args=(client, client_args, client_output)) 
    client_process.start() 
    server_process.join() 
    client_process.join() 
    #handle the output and return some result  

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_output = client_output = [] 
    server_cmd = "cmd" 
    client_cmd = "cmd" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, server_output, client_output)) 
    process_one.start() 
    #set up second process to run - but this one can run here 
    result = do_work(server, client, "some server args", "some client args") 
    process_one.join() 
    #use outputs above and the result to determine result 
    return final_result 

def main(): 
    #grab client 
    client = client() 
    #grab server 
    server = server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

Ecco l'errore che sto ricevendo:

Error in sys.exitfunc: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/atexit.py", line 24, in _run_exitfuncs 
    func(*targs, **kargs) 
    File "/usr/lib64/python2.7/multiprocessing/util.py", line 319, in _exit_function 
    p.join() 
    File "/usr/lib64/python2.7/multiprocessing/process.py", line 143, in join 
    assert self._parent_pid == os.getpid(), 'can only join a child process' 
AssertionError: can only join a child process 

Ho provato un sacco di cose diverse per risolvere questo ma la mia sensazione è che ci sia qualcosa di sbagliato nel modo sto usando questo modulo.

EDIT:

così ho creato un file che verrà riprodurre questo simulando il client/server e il lavoro che fanno - anche ho perso un punto importante, che era che stavo correndo questo in UNIX. Un altro dato importante è che do_work nel mio caso implica l'uso di os.fork(). Non ero in grado di riprodurre l'errore senza usare anche os.fork() quindi presumo che il problema ci sia. Nel mio caso del mondo reale, quella parte del codice non era mia, quindi la trattavo come una scatola nera (probabilmente un errore da parte mia). In ogni modo ecco il codice per riprodurre -

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 

class Host(): 
    def __init__(self): 
     self.name = "host" 

    def work(self): 
     #override - use to simulate work 
     pass 

class Server(Host): 
    def __init__(self): 
     self.name = "server" 

    def work(self): 
     x = 0 
     for i in range(10000): 
      x+=1 
     print x 
     time.sleep(1) 

class Client(Host): 
    def __init__(self): 
     self.name = "client" 

    def work(self): 
     x = 0 
     for i in range(5000): 
      x+=1 
     print x 
     time.sleep(1) 

def func_to_run(host, args): 
    print host.name + " is working" 
    host.work() 
    print host.name + ": " + args 
    return "done" 

def do_work(server, client, server_args, client_args): 
    print "in do_work" 
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
     server_output = func_to_run(server, server_args) 
     sys.exit(server_output) 
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 

def run_server_client(server, client, server_args, client_args): 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 

def run_in_parallel(server, client): 
    #set up commands for first process 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    result = do_work(server, client, "server args from do work", "client args from do work") 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return result 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    return run_in_parallel(server, client) 

if __name__ == "__main__": 
    main() 

Se rimuovo l'uso di os.fork() in do_work non ottengo l'errore e il codice si comporta come mi sarei aspettato prima (tranne che per il passaggio di uscite che ho ho accettato come mio errore/incomprensione). Posso cambiare il vecchio codice per non usare os.fork() ma vorrei anche sapere perché questo ha causato questo problema e se c'è una soluzione praticabile.

EDIT 2:

ho iniziato a lavorare su una soluzione che omette os.fork() prima della risposta accettata. Ecco quello che ho con qualche ritocco per la quantità di lavoro simulato che può essere fatto -

#!/usr/bin/python 

import multiprocessing 
import time 
import os 
import signal 
import sys 
from Queue import Empty 

class Host(): 
    def __init__(self): 
     self.name = "host" 

    def work(self, w): 
     #override - use to simulate work 
     pass 

class Server(Host): 
    def __init__(self): 
     self.name = "server" 

    def work(self, w): 
     x = 0 
     for i in range(w): 
      x+=1 
     print x 
     time.sleep(1) 

class Client(Host): 
    def __init__(self): 
     self.name = "client" 

    def work(self, w): 
     x = 0 
     for i in range(w): 
      x+=1 
     print x 
     time.sleep(1) 

def func_to_run(host, args, w, q): 
    print host.name + " is working" 
    host.work(w) 
    print host.name + ": " + args 
    q.put("ZERO") 
    return "done" 

def handle_queue(queue): 
    done = False 
    results = [] 
    return_val = 0 
    while not done: 
     #try to grab item from Queue 
     tr = None 
     try: 
      tr = queue.get_nowait() 
      print "found element in queue" 
      print tr 
     except Empty: 
      done = True 
     if tr is not None: 
      results.append(tr) 
    for el in results: 
     if el != "ZERO": 
      return_val = 1 
    return return_val 

def do_work(server, client, server_args, client_args): 
    print "in do_work" 
    server_output = client_output = "" 
    child_pid = os.fork() 
    if child_pid == 0: 
     server_output = func_to_run(server, server_args) 
     sys.exit(server_output) 
    time.sleep(1) 

    client_output = func_to_run(client, client_args) 
    # kill and wait for server to finish 
    os.kill(child_pid, signal.SIGTERM) 
    (pid, status) = os.waitpid(child_pid, 0) 

    return (server_output == "done" and client_output =="done") 



def run_server_client(server, client, server_args, client_args, w, mq): 
    local_queue = multiprocessing.Queue() 
    server_process = multiprocessing.Process(target=func_to_run, args=(server, server_args, w, local_queue)) 
    print "Starting server process" 
    server_process.start() 
    client_process = multiprocessing.Process(target=func_to_run, args=(client, client_args, w, local_queue)) 
    print "Starting client process" 
    client_process.start() 
    print "joining processes" 
    server_process.join() 
    client_process.join() 
    print "processes joined and done" 
    if handle_queue(local_queue) == 0: 
     mq.put("ZERO") 

def run_in_parallel(server, client): 
    #set up commands for first process 
    master_queue = multiprocessing.Queue() 
    server_cmd = "server command for run_server_client" 
    client_cmd = "client command for run_server_client" 
    process_one = multiprocessing.Process(target=run_server_client, args=(server, client, server_cmd, client_cmd, 400000000, master_queue)) 
    print "Starting process one" 
    process_one.start() 
    #set up second process to run - but this one can run here 
    print "About to do work" 
    #result = do_work(server, client, "server args from do work", "client args from do work") 
    run_server_client(server, client, "server args from do work", "client args from do work", 5000, master_queue) 
    print "Joining process one" 
    process_one.join() 
    #use outputs above and the result to determine result 
    print "Process one has joined" 
    return_val = handle_queue(master_queue) 
    print return_val 
    return return_val 

def main(): 
    #grab client 
    client = Client() 
    #grab server 
    server = Server() 
    val = run_in_parallel(server, client) 
    if val: 
     print "failed" 
    else: 
     print "passed" 
    return val 

if __name__ == "__main__": 
    main() 

Questo codice ha alcune stampe tweaked solo per vedere esattamente cosa sta succedendo. Ho usato un multiprocessing.Queue per archiviare e condividere gli output tra i processi e tornare al thread principale da gestire. Penso che questo risolva la parte python del mio problema ma ci sono ancora alcuni problemi nel codice su cui sto lavorando. L'unica altra cosa che posso dire è che l'equivalente a func_to_run implica l'invio di un comando su ssh e l'acquisizione di qualsiasi errore insieme all'output. Per qualche ragione, questo funziona perfettamente bene per un comando che ha un tempo di esecuzione basso, ma non per un comando che ha un tempo/uscita di esecuzione molto più grande. Ho provato a simulare questo con i valori di lavoro drasticamente diversi nel mio codice qui, ma non sono stato in grado di riprodurre risultati simili.

EDIT 3 codice Biblioteca sto usando (ancora una volta non la mia) utilizza Popen.wait() per i comandi ssh e ho appena letto questo:

Popen.wait() Wait for child process to terminate. Set and return returncode attribute.

Warning This will deadlock when using stdout=PIPE and/or stderr=PIPE and the >child process generates enough output to a pipe such that it blocks waiting for >the OS pipe buffer to accept more data. Use communicate() to avoid that.

ho regolato il codice per non tamponare e basta stampare così com'è ricevuto e tutto funziona.

+2

Hai più problemi qui. Prima di tutto: 'output.append()' probabilmente non fa quello che vuoi quando usi il modulo 'multiprocessing'. In secondo luogo, il problema da te segnalato è come descritto: non puoi chiamare '.join()' su un 'Processo' che non è di proprietà del' Processo' attualmente in esecuzione. Confessi che il tuo esempio è sintetico, quindi è difficile dire dove si trova il problema. Sei sicuro di assegnare il risultato di 'Process()' a una variabile locale di breve durata come in questo esempio? O stai invece utilizzando una variabile globale o di istanza ('self.process_one', ad es.)? –

+0

Per il primo problema - hai ragione. Funziona quando ho usato i thread ma probabilmente a causa dello spazio di memoria condiviso. Le variabili per i processi sono locali. Questi processi non sono di proprietà del processo di chiamata? Sto solo unendo i processi nelle funzioni in cui sono stati creati, quindi presumo che la proprietà sia ok a meno che non ci sia un problema di programmazione. –

+0

Ah! Lo vedo ora, stai chiamando 'join' in un gestore' atexit', ma solo come 'multiprocessing' sta cercando di ripulire se stesso. Speculerò che questo è il risultato del passaggio di un'istanza 'Process' a un altro processo. Se questo è il caso, è IMO un bug sottile in CPython. 'Processo' dovrebbe rifiutarsi di essere sottaceto e darti un'eccezione quando provi a passarlo. So che non è facile creare un riproduttore minimale, ma penso che scoprirai che vale il tuo tempo. Aiuterà te e noi a identificare l'elemento chiave del bug. –

risposta

3

I can change the old code to not use os.fork() but I'd also like to know why this caused this problem and if there's a workable solution.

La chiave per capire il problema è sapere esattamente che cosa fork() fa. I documenti CPython indicano "Fork a child process." ma questo presume che tu abbia capito la chiamata alla libreria C fork().

Ecco cosa dice manpage di glibc su di esso:

fork() creates a new process by duplicating the calling process. The new process, referred to as the child, is an exact duplicate of the calling process, referred to as the parent, except for the following points: ...

E 'fondamentalmente come se hai preso il vostro programma e fatto una copia del suo stato del programma (heap, stack, puntatore all'istruzione, ecc), con piccole differenze e lasciare Esegue indipendentemente dall'originale. Quando questo processo figlio si chiude in modo naturale, utilizzerà exit() e questo attiverà i gestori atexit() registrati dal modulo multiprocessing.

Cosa si può fare per evitarlo?

  • omettere os.fork(): utilizzare multiprocessing invece, come si stanno esplorando ora
  • probabilmente efficace: dopo l'esecuzione import multiprocessingfork(), solo nel bambino o genitore, se necessario.
  • uso _exit() nel bambino (stato documenti CPython, "Nota Il metodo standard per uscire è sys.exit (n). _exit() dovrebbe normalmente essere utilizzato solo nel processo figlio dopo una fork().")

https://docs.python.org/2/library/os.html#os._exit

+0

Ho intenzione di accettare questo come risposta. Grazie mille per il tuo aiuto! Ho modificato la domanda un'ultima volta con alcune note in più e l'implementazione che avevo iniziato per il tuo primo suggerimento. –

0

Mi sembra che lo stiate facendo una volta di più.Non lo inserisco da run_in_parallel, ma semplicemente chiamando lo run_server_client con gli argomenti appropriati, perché verranno inseriti all'interno.

+0

Ma questo blocco non sarà completato? Ho bisogno sia di 'run_server_client' che di' do_work' per essere eseguiti nello stesso momento, motivo per cui creo qui un processo separato. –