Ero sicuro che ci fosse qualcosa di simile nella libreria standard, ma sembra che mi sbagliavo.Python: Qualcosa come `map` che funziona sui thread

Ho un sacco di URL che voglio urlopen in parallelo. Voglio qualcosa come la funzione integrata map, tranne che il lavoro è svolto in parallelo da un mucchio di thread.

C'è un buon modulo che fa questo?


vuoi dire che si desidera mappare iniziare le discussioni (come la risposta di pillmuncher, mappa (urlopen, urls)), o inizierai manualmente i thread di apertura e vorresti qualcosa come la mappa per agire sui risultati dell'esecuzione di ogni thread, man mano che diventano disponibili? – rbp



Qualcuno ha consigliato di utilizzare il pacchetto futures per questo. L'ho provato e sembra che funzioni.


Ecco un esempio:

"Download many URLs in parallel." 

import functools 
import urllib.request 
import futures 

URLS = ['http://www.foxnews.com/', 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

with futures.ThreadPoolExecutor(50) as executor: 
    future_list = executor.run_to_futures(
      [functools.partial(load_url, url, 30) for url in URLS]) 

avrei avvolgetelo in una funzione (non testata):

import itertools 
import threading 
import urllib2 
import Queue 

def openurl(url, queue): 
    def starter(): 
      result = urllib2.urlopen(url) 
     except Ecxeption, exc: 
      def raiser(): 
       raise exc 
      queue.put((url, raiser)) 
      queue.put((url, lambda:result)) 

myurls = ... # the list of urls 
myqueue = Queue.Queue() 

map(openurl, myurls, itertools.repeat(myqueue)) 

for each in myurls: 
    url, getresult = queue.get() 
     result = getresult() 
    except Exception, exc: 
     print 'exception raised:' + str(exc) 
     # do stuff with result 

c'è un metodo map in multiprocessing.Pool. Questo fa più processi.

E se più processi non sono il vostro piatto, è possibile utilizzare multiprocessing.dummy che utilizza thread.

import urllib 
import multiprocessing.dummy 

p = multiprocessing.dummy.Pool(5) 
def f(post): 
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post) 

print p.map(f, range(3329361, 3329361 + 5)) 

Questo è ottimo, ma non funzionerà su python2.6 se eseguito da un thread a causa di questo bug: http://bugs.python.org/issue14881 – gregsabo


Funziona alla grande in python 2.79 - attualmente l'ultima versione di 2x, e abbastanza bene anche a questo! – FredTheWebGuy


Qui è la mia applicazione della carta filettato:

from threading import Thread 
from queue import Queue 

def thread_map(f, iterable, pool=None): 
    Just like [f(x) for x in iterable] but each f(x) in a separate thread. 
    :param f: f 
    :param iterable: iterable 
    :param pool: thread pool, infinite by default 
    :return: list if results 
    res = {} 
    if pool is None: 
     def target(arg, num): 
       res[num] = f(arg) 
       res[num] = sys.exc_info() 

     threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)] 
     class WorkerThread(Thread): 
      def run(self): 
       while True: 
         num, arg = queue.get(block=False) 
          res[num] = f(arg) 
          res[num] = sys.exc_info() 
        except Empty: 

     queue = Queue() 
     for i, arg in enumerate(iterable): 
      queue.put((i, arg)) 

     threads = [WorkerThread() for _ in range(pool)] 

    [t.start() for t in threads] 
    [t.join() for t in threads] 
    return [res[i] for i in range(len(res))] 

È necessario importare 'Vuoto' sulla seconda riga. – speedplane