2012-05-02 4 views
61

Nel seguente codice di esempio, mi piacerebbe recuperare il valore restituito dalla funzione worker. Come posso fare questo? Dove è memorizzato questo valore?Come posso recuperare il valore restituito da una funzione passata a multiprocessing.Process?

Esempio di codice:

import multiprocessing 

def worker(procnum): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return procnum 


if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print jobs 

uscita:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>] 

io non riesco a trovare l'attributo rilevante gli oggetti memorizzati in jobs.

Grazie in anticipo, BLZ

risposta

64

Usa shared variable per comunicare. Ad esempio come questo:

import multiprocessing 

def worker(procnum, return_dict): 
    '''worker function''' 
    print str(procnum) + ' represent!' 
    return_dict[procnum] = procnum 


if __name__ == '__main__': 
    manager = multiprocessing.Manager() 
    return_dict = manager.dict() 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,return_dict)) 
     jobs.append(p) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    print return_dict.values() 
+16

Ti consiglio di utilizzare una (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue), piuttosto che un 'manager [multiprocessing.Queue''] 'qui. L'uso di un 'Manager' richiede di generare un processo completamente nuovo, che è eccessivo quando un' Queue' farebbe. – dano

+1

@dano: Mi chiedo, se utilizziamo l'oggetto Queue(), non possiamo garantire l'ordine quando ogni processo restituisce il valore. Voglio dire se abbiamo bisogno dell'ordine nel risultato, per fare il prossimo lavoro. Come possiamo sapere esattamente da quale output proviene il processo – Catbuilts

+4

@Catbuilts Puoi restituire una tupla da ciascun processo, in cui un valore è il valore di ritorno effettivo a cui tieni, e l'altro è un identificatore univoco dal processo. Ma mi chiedo anche perché è necessario sapere quale processo sta restituendo quale valore. Se questo è ciò che devi sapere sul processo, o hai bisogno di correlare tra il tuo elenco di input e l'elenco di output? In tal caso, consiglierei di usare 'multiprocessing.Pool.map' per elaborare il tuo elenco di oggetti di lavoro. – dano

36

I pensa che l'approccio suggerito da @sega_sai sia il migliore. Ma in realtà ha bisogno di un esempio di codice, quindi ecco qui:

import multiprocessing 
from os import getpid 

def worker(procnum): 
    print 'I am number %d in process %d' % (procnum, getpid()) 
    return getpid() 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes = 3) 
    print pool.map(worker, range(5)) 

che stamperà i valori di ritorno:

I am number 0 in process 19139 
I am number 1 in process 19138 
I am number 2 in process 19140 
I am number 3 in process 19139 
I am number 4 in process 19140 
[19139, 19138, 19140, 19139, 19140] 

Se si ha familiarità con map (Python built-in), questo non dovrebbe essere troppo impegnativo Altrimenti dai un'occhiata a sega_Sai's link.

Nota quanto poco codice è necessario. (Nota anche come i processi vengono riutilizzati).

+0

Qualche idea per cui il mio 'getpid()' restituisce tutti lo stesso valore? Sono in esecuzione Python3 – zelusp

+0

Non sono sicuro di come Pool distribuisca le attività sui lavoratori. Forse possono finire allo stesso lavoratore se sono veramente veloci? Succede costantemente? Anche se aggiungi un ritardo? – Mark

+0

Ho anche pensato che fosse una cosa legata alla velocità, ma quando nutro 'pool.map' un intervallo di 1.000.000 utilizzando più di 10 processi vedo al massimo due diversi pid. – zelusp

5

È possibile utilizzare il exit integrato per impostare il codice di uscita di un processo.Esso può essere ottenuto dal l'attributo del processo exitcode:

import multiprocessing 

def worker(procnum): 
    print str(procnum) + ' represent!' 
    exit(procnum) 

if __name__ == '__main__': 
    jobs = [] 
    for i in range(5): 
     p = multiprocessing.Process(target=worker, args=(i,)) 
     jobs.append(p) 
     p.start() 

    result = [] 
    for proc in jobs: 
     proc.join() 
     result.append(proc.exitcode) 
    print result 

uscita:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
[0, 1, 2, 3, 4] 
+2

Sappiate che questo approccio potrebbe creare confusione. I processi dovrebbero generalmente uscire con il codice di uscita 0 sono completati senza errori. Se si dispone di qualcosa che controlla i codici di uscita del processo del sistema, è possibile che vengano visualizzati come errori. – ferrouswheel

3

Per chiunque altro che sta cercando come ottenere un valore da un Process utilizzando Queue:

import multiprocessing 

ret = {'foo': False} 

def worker(queue): 
    ret = queue.get() 
    ret['foo'] = True 
    queue.put(ret) 

if __name__ == '__main__': 
    queue = multiprocessing.Queue() 
    queue.put(ret) 
    p = multiprocessing.Process(target=worker, args=(queue,)) 
    p.start() 
    print queue.get() # Prints {"foo": True} 
    p.join() 
+0

quando inserisco qualcosa in una coda nel mio processo di lavoro, il mio join non viene mai raggiunto. Qualche idea su come potrebbe venire? –

+0

@LaurensKoppenol vuoi dire che il tuo codice principale si blocca permanentemente su p.join() e non continua mai? Il tuo processo ha un ciclo infinito? –

+3

Sì, si blocca lì all'infinito. I miei lavoratori finiscono tutti (il ciclo all'interno della funzione di lavoro termina, l'istruzione di stampa in seguito viene stampata, per tutti i lavoratori). Il join non fa nulla. Se rimuovo 'Queue' dalla mia funzione, mi permette di passare' join() ' –

7

Questo esempio mostra come utilizzare un elenco di istanze multiprocessing.Pipe s per tornare stringhe da un numero arbitrario di processi:

import multiprocessing 

def worker(procnum, send_end): 
    '''worker function''' 
    result = str(procnum) + ' represent!' 
    print result 
    send_end.send(result) 

def main(): 
    jobs = [] 
    pipe_list = [] 
    for i in range(5): 
     recv_end, send_end = multiprocessing.Pipe(False) 
     p = multiprocessing.Process(target=worker, args=(i, send_end)) 
     jobs.append(p) 
     pipe_list.append(recv_end) 
     p.start() 

    for proc in jobs: 
     proc.join() 
    result_list = [x.recv() for x in pipe_list] 
    print result_list 

if __name__ == '__main__': 
    main() 

uscita:

0 represent! 
1 represent! 
2 represent! 
3 represent! 
4 represent! 
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!'] 

Questa soluzione utilizza meno risorse di un multiprocessing.Queue che utilizza

  • un tubo
  • almeno un lucchetto
  • un buffer
  • un filo

o un multiprocessing.SimpleQueue che utilizza

  • un tubo
  • almeno una serratura

È molto istruttivo guardare la fonte di ognuno di questi tipi.

+0

Quale sarebbe il modo migliore per farlo senza rendere i tubi una variabile globale? – Nickpick

+0

Ho inserito tutti i dati globali e il codice in una funzione principale e funziona allo stesso modo. Questo risponde alla tua domanda? –

+0

la pipe deve sempre essere letta prima che qualsiasi nuovo valore possa essere aggiunto (inviato) ad esso? – Nickpick

0

Ho modificato la risposta di vartec un po 'da quando avevo bisogno di ottenere i codici di errore dalla funzione. (Grazie vertec !!! è un trucco fantastico)

Questo può anche essere fatto con un manager.list ma penso che sia meglio averlo in un ditt e memorizzare una lista al suo interno. In questo modo, manteniamo la funzione e i risultati poiché non possiamo essere sicuri dell'ordine in cui verrà compilata la lista.

from multiprocessing import Process 
import time 
import datetime 
import multiprocessing 


def func1(fn, m_list): 
    print 'func1: starting' 
    time.sleep(1) 
    m_list[fn] = "this is the first function" 
    print 'func1: finishing' 
    # return "func1" # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list): 
    print 'func2: starting' 
    time.sleep(3) 
    m_list[fn] = "this is function 2" 
    print 'func2: finishing' 
    # return "func2" 

def func3(fn, m_list): 
    print 'func3: starting' 
    time.sleep(9) 
    # if fail wont join the rest because it never populate the dict 
    # or do a try/except to get something in return. 
    raise ValueError("failed here") 
    # if we want to get the error in the manager dict we can catch the error 
    try: 
     raise ValueError("failed here") 
     m_list[fn] = "this is third" 
    except: 
     m_list[fn] = "this is third and it fail horrible" 
     # print 'func3: finishing' 
     # return "func3" 


def runInParallel(*fns): # * is to accept any input in list 
    start_time = datetime.datetime.now() 
    proc = [] 
    manager = multiprocessing.Manager() 
    m_list = manager.dict() 
    for fn in fns: 
     # print fn 
     # print dir(fn) 
     p = Process(target=fn, name=fn.func_name, args=(fn, m_list)) 
     p.start() 
     proc.append(p) 
    for p in proc: 
     p.join() # 5 is the time out 

    print datetime.datetime.now() - start_time 
    return m_list, proc 

if __name__ == '__main__': 
    manager, proc = runInParallel(func1, func2, func3) 
    # print dir(proc[0]) 
    # print proc[0]._name 
    # print proc[0].name 
    # print proc[0].exitcode 

    # here you can check what did fail 
    for i in proc: 
     print i.name, i.exitcode # name was set up in the Process line 53 

    # here will only show the function that worked and where able to populate the 
    # manager dict 
    for i, j in manager.items(): 
     print dir(i) # things you can do to the function 
     print i, j 
1

Per qualche ragione, non riuscivo a trovare un esempio generale di come fare questo con Queue ovunque (esempi doc anche di Python non deporre le uova più processi), ecco quello che ho avuto a lavorare dopo come 10 tentativi :

def add_helper(queue, arg1, arg2): # the func called in child processes 
    ret = arg1 + arg2 
    queue.put(ret) 

def multi_add(): # spawns child processes 
    q = Queue() 
    processes = [] 
    rets = [] 
    for _ in range(0, 100): 
     p = Process(target=add_helper, args=(q, 1, 2)) 
     processes.append(p) 
     p.start() 
    for p in processes: 
     ret = q.get() # will block 
     rets.append(ret) 
    for p in processes: 
     p.join() 
    return rets 

Queue è un blocco, coda di thread-safe che è possibile utilizzare per memorizzare i valori di ritorno dai processi figlio. Quindi devi passare la coda ad ogni processo. Qualcosa di meno ovvio qui è che devi fare get() dalla coda prima di join allo Process oppure la coda si riempie e blocca tutto.

Aggiornamento per coloro che sono orientati agli oggetti (testato in Python 3.4):

from multiprocessing import Process, Queue 

class Multiprocessor(): 

    def __init__(self): 
     self.processes = [] 
     self.queue = Queue() 

    @staticmethod 
    def _wrapper(func, queue, args, kwargs): 
     ret = func(*args, **kwargs) 
     queue.put(ret) 

    def run(self, func, *args, **kwargs): 
     args2 = [func, self.queue, args, kwargs] 
     p = Process(target=self._wrapper, args=args2) 
     self.processes.append(p) 
     p.start() 

    def wait(self): 
     rets = [] 
     for p in self.processes: 
      ret = self.queue.get() 
      rets.append(ret) 
     for p in self.processes: 
      p.join() 
     return rets 

# tester 
if __name__ == "__main__": 
    mp = Multiprocessor() 
    num_proc = 64 
    for _ in range(num_proc): # queue up multiple tasks running `sum` 
     mp.run(sum, [1, 2, 3, 4, 5]) 
    ret = mp.wait() # get all results 
    print(ret) 
    assert len(ret) == num_proc and all(r == 15 for r in ret)