2010-07-25 5 views
23

Ero sicuro che ci fosse qualcosa di simile nella libreria standard, ma sembra che mi sbagliavo.Python: Qualcosa come `map` che funziona sui thread

Ho un sacco di URL che voglio urlopen in parallelo. Voglio qualcosa come la funzione integrata map, tranne che il lavoro è svolto in parallelo da un mucchio di thread.

C'è un buon modulo che fa questo?

+1

vuoi dire che si desidera mappare iniziare le discussioni (come la risposta di pillmuncher, mappa (urlopen, urls)), o inizierai manualmente i thread di apertura e vorresti qualcosa come la mappa per agire sui risultati dell'esecuzione di ogni thread, man mano che diventano disponibili? – rbp

risposta

10

Qualcuno ha consigliato di utilizzare il pacchetto futures per questo. L'ho provato e sembra che funzioni.

http://pypi.python.org/pypi/futures

Ecco un esempio:

"Download many URLs in parallel." 

import functools 
import urllib.request 
import futures 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

with futures.ThreadPoolExecutor(50) as executor: 
    future_list = executor.run_to_futures(
      [functools.partial(load_url, url, 30) for url in URLS]) 
0

avrei avvolgetelo in una funzione (non testata):

import itertools 
import threading 
import urllib2 
import Queue 

def openurl(url, queue): 
    def starter(): 
     try: 
      result = urllib2.urlopen(url) 
     except Ecxeption, exc: 
      def raiser(): 
       raise exc 
      queue.put((url, raiser)) 
     else: 
      queue.put((url, lambda:result)) 
    threadind.Thread(target=starter).start() 

myurls = ... # the list of urls 
myqueue = Queue.Queue() 

map(openurl, myurls, itertools.repeat(myqueue)) 

for each in myurls: 
    url, getresult = queue.get() 
    try: 
     result = getresult() 
    except Exception, exc: 
     print 'exception raised:' + str(exc) 
    else: 
     # do stuff with result 
34

c'è un metodo map in multiprocessing.Pool. Questo fa più processi.

E se più processi non sono il vostro piatto, è possibile utilizzare multiprocessing.dummy che utilizza thread.

import urllib 
import multiprocessing.dummy 

p = multiprocessing.dummy.Pool(5) 
def f(post): 
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post) 

print p.map(f, range(3329361, 3329361 + 5)) 
+0

Questo è ottimo, ma non funzionerà su python2.6 se eseguito da un thread a causa di questo bug: http://bugs.python.org/issue14881 – gregsabo

+0

Funziona alla grande in python 2.79 - attualmente l'ultima versione di 2x, e abbastanza bene anche a questo! – FredTheWebGuy

1

Qui è la mia applicazione della carta filettato:

from threading import Thread 
from queue import Queue 

def thread_map(f, iterable, pool=None): 
    """ 
    Just like [f(x) for x in iterable] but each f(x) in a separate thread. 
    :param f: f 
    :param iterable: iterable 
    :param pool: thread pool, infinite by default 
    :return: list if results 
    """ 
    res = {} 
    if pool is None: 
     def target(arg, num): 
      try: 
       res[num] = f(arg) 
      except: 
       res[num] = sys.exc_info() 

     threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)] 
    else: 
     class WorkerThread(Thread): 
      def run(self): 
       while True: 
        try: 
         num, arg = queue.get(block=False) 
         try: 
          res[num] = f(arg) 
         except: 
          res[num] = sys.exc_info() 
        except Empty: 
         break 

     queue = Queue() 
     for i, arg in enumerate(iterable): 
      queue.put((i, arg)) 

     threads = [WorkerThread() for _ in range(pool)] 

    [t.start() for t in threads] 
    [t.join() for t in threads] 
    return [res[i] for i in range(len(res))] 
+0

È necessario importare 'Vuoto' sulla seconda riga. – speedplane