2013-02-28 3 views
38

dire che ho un elenco molto grande e sto eseguendo un'operazione in questo modo:Come multi-thread un'operazione all'interno di un ciclo in Python

for item in items: 
    try: 
     api.my_operation(item) 
    except: 
     print 'error with item' 

Il mio problema è duplice:

  • ci sono un sacco di elementi
  • api.my_operation prende per sempre per tornare

mi piacerebbe usare il multi-threading per girare un Bunc h di api.my_operations in una sola volta in modo da poter elaborare forse 5 o 10 o anche 100 elementi contemporaneamente.

Se my_operation() restituisce un'eccezione (perché forse ho già elaborato quell'elemento) - è OK. Non romperà nulla. Il ciclo può continuare con l'elemento successivo.

Nota: questo è per Python 2.7.3

+1

Provare ad abilitare il mio codice (per il ciclo) per eseguire in questo modo ma non sono sicuro da dove proviene l'API. NameError: nome 'api' non definito – radtek

risposta

66

In primo luogo, in Python, se il codice è CPU-bound, multithreading sarà non aiuta, perché solo un thread può tenere il Global Interpreter Lock, e quindi esegui il codice Python alla volta. Quindi, è necessario utilizzare i processi, non i thread.

Questo non è vero se l'operazione "richiede sempre un ritorno" perché è legata all'IO, ovvero, in attesa sulla rete o su copie del disco o simili. Tornerò su dopo.


Avanti, il modo di trattare 5 o 10 o 100 elementi in una sola volta è quello di creare un pool di 5 o 10 o 100 lavoratori, e mettere gli elementi in una coda che il servizio lavoratori. Fortunatamente, le librerie stdlib multiprocessing e concurrent.futures racchiudono automaticamente la maggior parte dei dettagli.

Il primo è più potente e flessibile per la programmazione tradizionale; il secondo è più semplice se devi comporre l'attesa del futuro; per casi banali, non importa quale scegli tu. (In questo caso, l'implementazione più evidente con ogni prende 3 linee con futures, 4 linee con multiprocessing.)

Se stai usando 2,6-2,7 o 3,0-3,1, futures non è integrato, ma è possibile installarlo da PyPI (pip install futures).


Infine, di solito è molto più semplice per parallelizzare le cose se si può trasformare l'intera iterazione del ciclo in una chiamata di funzione (qualcosa che si potrebbe, ad esempio, passare a map), quindi cerchiamo di fare quel primo:

def try_my_operation(item): 
    try: 
     api.my_operation(item) 
    except: 
     print('error with item') 

Mettere tutto insieme:

executor = concurrent.futures.ProcessPoolExecutor(10) 
futures = [executor.submit(try_my_operation, item) for item in items] 
concurrent.futures.wait(futures) 

Se si dispone di molti lavori relativamente piccoli, l'overhead del multiprocessing potrebbe sommergere i guadagni. Il modo per risolvere questo è far ripartire il lavoro in lavori più grandi.Per esempio (utilizzando grouper dal itertools recipes, che è possibile copiare e incollare nel vostro codice, o ottenere dal progetto more-itertools su PyPI):

def try_multiple_operations(items): 
    for item in items: 
     try: 
      api.my_operation(item) 
     except: 
      print('error with item') 

executor = concurrent.futures.ProcessPoolExecutor(10) 
futures = [executor.submit(try_multiple_operations, group) 
      for group in grouper(5, items)] 
concurrent.futures.wait(futures) 

Infine, che cosa se il codice è legato IO? Quindi i thread sono ugualmente buoni come i processi e con meno overhead (e meno limitazioni, ma queste limitazioni di solito non influiranno su casi come questo). A volte questo "meno overhead" è sufficiente a significare che non è necessario eseguire il batching con i thread, ma lo si fa con i processi, che è una bella vittoria.

Quindi, come si utilizzano i thread anziché i processi? Basta cambiare ProcessPoolExecutor a ThreadPoolExecutor.

Se non sei sicuro che il tuo codice sia limitato dalla CPU o legato all'IO, provalo in entrambi i modi.


Can I do this for multiple functions in my python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize. Is it possible to do two multi threaded functions in the same script?

Sì. In realtà, ci sono due modi diversi per farlo.

In primo luogo, è possibile condividere lo stesso esecutore (thread o processo) e utilizzarlo da più posizioni senza problemi. L'intero punto dei compiti e dei futures è che sono autonomi; non ti importa dove corrono, solo che li metti in fila e alla fine ottieni la risposta.

In alternativa, è possibile avere due esecutori nello stesso programma senza problemi. Questo ha un costo in termini di prestazioni, se si usano entrambi gli esecutori contemporaneamente, si finirà per provare ad eseguire (ad esempio) 16 thread occupati su 8 core, il che significa che ci sarà un cambio di contesto. Ma a volte vale la pena farlo perché, ad esempio, i due esecutori sono raramente occupati allo stesso tempo e rende il tuo codice molto più semplice. O forse un executor sta eseguendo attività molto grandi che possono richiedere del tempo per essere completate, mentre l'altro sta eseguendo attività molto piccole che devono essere completate il più rapidamente possibile, perché la reattività è più importante della velocità effettiva per una parte del programma.

Se non sai quale è appropriato per il tuo programma, di solito è il primo.

+0

Devo installare simultaneamente? Come? Sto usando Python 2.7.3 e non riesce a trovare il modulo simultaneo. * modifica * sembra concomitante è disponibile solo in 3.2. Bummer. – doremi

+1

@doremi: pensavo avessi usato 3.x, perché stai chiamando 'print' come una funzione piuttosto che usarla come una dichiarazione. Ma se si utilizza 2.x, è possibile installare 'futures' da [PyPI] (https://pypi.python.org/pypi/futures) (ad esempio,' pip install futures') e solo 'import futures'. invece di "import concurrent.futures". (Oppure puoi usare il multiprocessing, che non è molto più complicato, significa solo 4 linee di codice anziché 3) – abarnert

+0

Questa è una risposta molto approfondita @abarnert, come al solito :) Vedi anche la mia risposta per un versione estremamente portatile dell'implementazione (utilizzando tutto ciò che è disponibile dalla 2.7.x). – woozyking

6

È possibile dividere la trasformazione in un determinato numero di thread con un approccio simile a questo:

import threading                 

def process(items, start, end):             
    for item in items[start:end]:            
     try:                  
      api.my_operation(item)            
     except Exception:              
      print('error with item')            


def split_processing(items, num_splits=4):          
    split_size = len(items) // num_splits          
    threads = []                 
    for i in range(num_splits):             
     # determine the indices of the list this thread will handle    
     start = i * split_size             
     # special case on the last chunk to account for uneven splits   
     end = None if i+1 == num_splits else (i+1) * split_size     
     # create the thread              
     threads.append(               
      threading.Thread(target=process, args=(items, start, end)))   
     threads[-1].start() # start the thread we just created     

    # wait for all threads to finish            
    for t in threads:               
     t.join()                 



split_processing(items) 
16

Edit 2018/02/06: revisione basata su this comment

Edit: dimenticato per menzionare che questo funziona su Python 2.7.x

C'è multiprocesing.pool e il seguente esempio illustra come utilizzare uno di essi:

from multiprocessing.pool import ThreadPool as Pool 
# from multiprocessing import Pool 

pool_size = 5 # your "parallelness" 

# define worker function before a Pool is instantiated 
def worker(item): 
    try: 
     api.my_operation(item) 
    except: 
     print('error with item') 

pool = Pool(pool_size) 

for item in items: 
    pool.apply_async(worker, (item,)) 

pool.close() 
pool.join() 

Ora, se davvero identificare che il processo è destinato CPU come @abarnert menzionato, il cambiamento ThreadPool alla realizzazione pool di processi (commentato in regime di ThreadPool). È possibile trovare ulteriori dettagli qui: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

+2

Si noti che 'multiprocessing.ThreadPool' non è affatto documentato. Tuttavia, 'multiprocessing.dummy.Pool' è la stessa classe, ed è _is_ [documentato] (https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy). È discutibile che il primo sia più esplicitamente significativo, e vale la pena utilizzarlo anche se non è tecnicamente garantito funzionare. Ad ogni modo, potresti volere un commento (spiegando che quest'ultimo è un threadpool nonostante il nome, o il primo è presente in tutte le versioni di CPython 2.7 anche se non è documentato). – abarnert

+1

Per tutti gli altri noob di Python come me: se questo non funziona, assicurati che 'worker' sia definito prima di creare il tuo pool! (per https://stackoverflow.com/a/2783017/398316) – M2X

+0

@ M2X buona chiamata. Revisionerò la risposta di conseguenza – woozyking