2013-04-29 3 views
15

Desidero qualcosa di simile a executor.map, tranne quando eseguo iterazioni sui risultati, voglio eseguirne l'iterazione in base all'ordine di completamento, ad es. l'elemento di lavoro che è stato completato per primo dovrebbe apparire prima nell'iterazione, ecc. In questo modo l'iterazione bloccherà se non è ancora finito ogni singolo elemento di lavoro nella sequenza.`concurrent.futures` di Python: Iterate su futures in base all'ordine di completamento

So come implementarlo da solo utilizzando le code, ma mi chiedo se sia possibile utilizzare il framework futures.

(Io per lo più usati esecutori filo-based, quindi mi piacerebbe una risposta che si applica a questi, ma una risposta generale sarebbe il benvenuto pure.)

AGGIORNAMENTO: Grazie per le risposte! Puoi spiegare come posso usare as_completed con executor.map? executor.map è lo strumento più utile e conciso per me quando utilizzo i future e sarei riluttante a iniziare a utilizzare gli oggetti Future manualmente.

+0

Sei fortunato! – damzam

risposta

25

executor.map(), come il comando incorporato map(), restituisce solo i risultati nell'ordine del iterabile, in modo purtroppo non si può utilizzare per determinare l'ordine di completamento. concurrent.futures.as_completed() è quello che stai cercando - ecco un esempio:

import time 
import concurrent.futures 

times = [3, 1, 2] 

def sleeper(secs): 
    time.sleep(secs) 
    print('I slept for {} seconds'.format(secs)) 
    return secs 

# returns in the order given 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    print(list(executor.map(sleeper, times))) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [3, 1, 2] 

# returns in the order completed 
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: 
    futs = [executor.submit(sleeper, secs) for secs in times] 
    print([fut.result() for fut in concurrent.futures.as_completed(futs)]) 

# I slept for 1 seconds 
# I slept for 2 seconds 
# I slept for 3 seconds 
# [1, 2, 3] 

Naturalmente se si sono tenuti a utilizzare un'interfaccia mappa, è possibile creare il proprio map_as_completed() funzione che incapsula il sopra (forse aggiungerlo a una sottoclasse Executor()), ma penso che la creazione di istanze future tramite executor.submit() sia un modo più semplice/pulito di andare (consente anche di fornire no-args, kwargs).

0

From python doc

concurrent.futures.as_completed(fs, timeout=None)¶ 

restituisce un iteratore sopra le istanze future (eventualmente creati da diverso esecutore istanze) fornite dal fs che i rendimenti dei futures in quanto completa (finito o sono stati annullati). Tutti i futures completati prima che venga chiamato as_completed() verranno restituiti per primi. L'iteratore restituito genera un errore Timeout se viene chiamato successivo() e il risultato non è disponibile dopo un timeout di secondi dalla chiamata originale a as_completed(). Il timeout può essere int o float. Se timeout non è specificato o None, non c'è limite al tempo di attesa.

Si avrebbe bisogno di capire differenza tra executor.map() e executor.submit(). Il primo mappa una funzione in un vettore di argomenti. È molto simile a map, ma avvia le attività in modo asincrono. submit(func, arg) avvia un'attività ad ogni chiamata. In questa attività, func viene applicato a arg.

Ecco un esempio per l'utilizzo di as_completed() con submit() che è possibile eseguire su python 3.0

from concurrent import futures 
import urllib.request 

URLS = ['http://www.foxnews.com/', 
     'http://www.cnn.com/', 
     'http://europe.wsj.com/', 
     'http://www.bbc.co.uk/', 
     'http://some-made-up-domain.com/'] 

def load_url(url, timeout): 
    return urllib.request.urlopen(url, timeout=timeout).read() 

def main(): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = dict(
      (executor.submit(load_url, url, 60), url) 
      for url in URLS) 

     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      try: 
       print('%r page is %d bytes' % (
          url, len(future.result()))) 
      except Exception as e: 
       print('%r generated an exception: %s' % (
          url, e)) 

if __name__ == '__main__': 
    main() 

senza map() è usato qui, le attività vengono eseguite con submit e as_completed()

restituisce un iteratore sopra le istanze future fornite dal fs che i rendimenti futuro in quanto completa (finito o sono stati cancellati) .