2012-01-10 7 views
136

Mi dispiace che non riesca a riprodurre l'errore con un esempio più semplice e che il mio codice sia troppo complicato da pubblicare. Se eseguo il programma nella shell IPython invece del normale python, le cose funzionano bene.Errore di decolorazione multiprocessing in Python

Ho cercato alcune note precedenti su questo problema. Sono stati tutti causati dall'utilizzo di pool per chiamare la funzione definita all'interno di una funzione di classe. Ma questo non è il mio caso.

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib64/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Apprezzerei qualsiasi aiuto.

UPDATE: La funzione I pickle è definita al livello più alto del modulo. Sebbene chiami una funzione che contiene una funzione nidificata. i.e, f() chiama g() chiama h() che ha una funzione nidificata i(), e sto chiamando pool.apply_async (f). f(), g(), h() sono tutti definiti al livello superiore. Ho provato un esempio più semplice con questo modello e funziona comunque.

+1

La risposta di livello superiore/accettata è buona, ma potrebbe significare che è necessario ri-strutturare il codice, che potrebbe essere doloroso. Vorrei raccomandare a chiunque abbia questo problema di leggere anche le risposte aggiuntive utilizzando 'dill' e' pathos'. Tuttavia, non ho fortuna con nessuna delle soluzioni quando lavoro con vtkobjects :(Chiunque è riuscito a eseguire il codice Python nell'elaborazione parallela vtkPolyData? – Chris

risposta

178

Questo è un list of what can be pickled. In particolare, le funzioni sono selezionabili solo se sono definite al livello più alto di un modulo.

questo pezzo di codice:

import multiprocessing as mp 

class Foo(): 
    @staticmethod 
    def work(self): 
     pass 

pool = mp.Pool() 
foo = Foo() 
pool.apply_async(foo.work) 
pool.close() 
pool.join() 

cede un errore quasi identico a quello che hai postato:

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Il problema è che i pool metodi tutti usano un queue.Queue di passare compiti al processi di lavoro. Tutto ciò che passa attraverso il queue.Queue deve essere selezionabile, e foo.work non è selezionabile poiché non è definito al livello più alto del modulo.

Esso può essere risolto definendo una funzione al livello superiore, che chiama foo.work():

def work(foo): 
    foo.work() 

pool.apply_async(work,args=(foo,)) 

noti che foo è pickable, poiché Foo è definita al livello superiore e foo.__dict__ è serializzabili.

+0

Grazie per la tua risposta Ho aggiornato la mia domanda Non riesco a capire che sia la causa, anche se – CodeNoob

+5

Per ottenere un PicklingError qualcosa deve essere messo sulla Coda che non è selezionabile, potrebbe essere la funzione oi suoi argomenti.Per saperne di più sul problema, ti suggerisco di fare una copia del tuo programma, e iniziare a ridimensionarlo, facendolo più semplice e più semplice, ogni volta riesegui il programma per vedere se il problema persiste. Quando diventa davvero semplice, avrai scoperto il problema da solo o avresti qualcosa da poter postare qui. – unutbu

+1

Inoltre: se si definisce una funzione al livello superiore di un modulo, ma è decorata, quindi il riferimento sarà all'output del decoratore e si otterrà comunque questo errore – bobpoekert

1

Stai passando una serie numerica di stringhe per caso?

Ho avuto lo stesso errore esatto quando passo una matrice che contiene una stringa vuota. Penso che possa essere dovuto a questo bug: http://projects.scipy.org/numpy/ticket/1658

13

Ho scoperto che posso anche generare esattamente quell'uscita di errore su un pezzo di codice perfettamente funzionante tentando di usare il profiler su di esso.

Si noti che questo era su Windows (dove il biforcarsi è un po 'meno elegante).

stavo correndo:

python -m profile -o output.pstats <script> 

e ha scoperto che rimuovendo il profiling rimosso l'errore e mettendo il profiling restaurato esso. Mi stava facendo impazzire anche perché sapevo che il codice funzionava. Stavo controllando per vedere se qualcosa aveva aggiornato pool.py ... poi aveva una sensazione di affondamento ed eliminato la profilazione e basta.

Pubblicazione qui per gli archivi nel caso in cui qualcun altro vi si incontri.

+2

WOW, grazie per averlo menzionato! Mi ha fatto impazzire per l'ultima ora o giù di lì; Ho provato tutto con un esempio molto semplice: niente sembrava funzionare. Ma ho anche eseguito il profiler attraverso il mio batchfile :( – tim

+0

Questo è quello che è successo a me! –

50

Vorrei utilizzare pathos.multiprocesssing, anziché multiprocessing. pathos.multiprocessing è un fork di multiprocessing che utilizza dill. dill può serializzare praticamente qualsiasi cosa in python, quindi puoi inviare molto di più in parallelo. Il fork pathos ha anche la capacità di lavorare direttamente con funzioni a più argomenti, come è necessario per i metodi di classe.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> p = Pool(4) 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> p.map(t.plus, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Foo(object): 
... @staticmethod 
... def work(self, x): 
...  return x+1 
... 
>>> f = Foo() 
>>> p.apipe(f.work, f, 100) 
<processing.pool.ApplyResult object at 0x10504f8d0> 
>>> res = _ 
>>> res.get() 
101 

Get pathos (e se ti piace, dill) qui: https://github.com/uqfoundation

+2

ha funzionato a meraviglia.Per chiunque altro, ho installato entrambe le librerie tramite: 'sudo pip install git + https: // github.com/uqfoundation/dill.git @ master' e ' sudo pip install git + https: //github.com/ uqfoundation/pathos.git @ master' –

+3

@AlexanderMcFarlane Non installerei pacchetti python con 'sudo' (da fonti esterne come github in particolare). Invece, vorrei raccomandare di eseguire: 'pip install --user git + ...' – Chris

+0

L'uso di 'pip install pathos' non funziona male e dà questo messaggio:' Impossibile trovare una versione che soddisfi il requisito pp == 1.5 .7-pathos (from pathos) ' – xApple

15

Come altri hanno detto multiprocessing può trasferire solo Python si oppone a processi di lavoro che possono essere in salamoia. Se non è possibile riorganizzare il codice come descritto da unutbu, è possibile utilizzare le funzionalità di decapaggio/disimpegno estese di dill per il trasferimento dei dati (in particolare i dati del codice) come mostrato di seguito.

Questa soluzione richiede solo l'installazione di dill e non altre librerie come pathos:

import os 
from multiprocessing import Pool 

import dill 


def run_dill_encoded(payload): 
    fun, args = dill.loads(payload) 
    return fun(*args) 


def apply_async(pool, fun, args): 
    payload = dill.dumps((fun, args)) 
    return pool.apply_async(run_dill_encoded, (payload,)) 


if __name__ == "__main__": 

    pool = Pool(processes=5) 

    # asyn execution of lambda 
    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
    print 

    # async execution of static method 

    class O(object): 

     @staticmethod 
     def calc(): 
      return os.getpid() 

    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, O.calc,()) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
+3

Sono l'autore di 'dill' e' pathos' ... e mentre hai ragione, non è tanto più bello e pulito e più flessibile anche per usare 'pathos' come nella mia risposta? O forse sono un po 'prevenuto ... –

+3

Non ero a conoscenza dello stato di '' pathos'' al momento della scrittura e volevo presentare una soluzione che è molto vicina alla risposta. Ora che ho visto la tua soluzione sono d'accordo che questa è la strada da percorrere. – rocksportrocker

+0

Ho letto la tua soluzione e ho pensato "Doh ... non ho nemmeno pensato di farlo in quel modo". –

4

This solution requires only the installation of dill and no other libraries as pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),): 
    """ 
    Unpack dumped function as target function and call it with arguments. 

    :param (dumped_function, item, args, kwargs): 
     a tuple of dumped function and its arguments 
    :return: 
     result of target function 
    """ 
    target_function = dill.loads(dumped_function) 
    res = target_function(item, *args, **kwargs) 
    return res 


def pack_function_for_map(target_function, items, *args, **kwargs): 
    """ 
    Pack function and arguments to object that can be sent from one 
    multiprocessing.Process to another. The main problem is: 
     «multiprocessing.Pool.map*» or «apply*» 
     cannot use class methods or closures. 
    It solves this problem with «dill». 
    It works with target function as argument, dumps it («with dill») 
    and returns dumped function with arguments of target function. 
    For more performance we dump only target function itself 
    and don't dump its arguments. 
    How to use (pseudo-code): 

     ~>>> import multiprocessing 
     ~>>> images = [...] 
     ~>>> pool = multiprocessing.Pool(100500) 
     ~>>> features = pool.map(
     ~...  *pack_function_for_map(
     ~...   super(Extractor, self).extract_features, 
     ~...   images, 
     ~...   type='png' 
     ~...   **options, 
     ~... ) 
     ~...) 
     ~>>> 

    :param target_function: 
     function, that you want to execute like target_function(item, *args, **kwargs). 
    :param items: 
     list of items for map 
    :param args: 
     positional arguments for target_function(item, *args, **kwargs) 
    :param kwargs: 
     named arguments for target_function(item, *args, **kwargs) 
    :return: tuple(function_wrapper, dumped_items) 
     It returs a tuple with 
      * function wrapper, that unpack and call target function; 
      * list of packed target function and its' arguments. 
    """ 
    dumped_function = dill.dumps(target_function) 
    dumped_items = [(dumped_function, item, args, kwargs) for item in items] 
    return apply_packed_function_for_map, dumped_items 

Si lavora anche per gli array numpy.

0
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Questo errore arriverà anche se si dispone di una funzione incorporata all'interno dell'oggetto modello passato al lavoro asincrono.

Quindi assicuratevi di controllare gli oggetti modello che sono passati non ha funzioni integrate. (Nel nostro caso, stavamo usando la funzione FieldTracker() di django-model-utils all'interno del modello per tracciare un determinato campo). Ecco il numero link relativo al problema relativo a GitHub.