17

Ho un'applicazione python multi-threaded. Voglio eseguire un ciclo asyncio in un thread e postare calback e coroutine da un altro thread. Dovrebbe essere facile, ma non riesco a capire come funziona il asyncio.python asyncio, come creare e cancellare compiti da un altro thread

sono arrivato fino alla soluzione seguente che fa la metà di quello che voglio, sentitevi liberi di commentare nulla:

import asyncio 
from threading import Thread 

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) #why do I need that?? 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     f = functools.partial(self.loop.create_task, coro) 
     return self.loop.call_soon_threadsafe(f) 

    def cancel_task(self, xx): 
     #no idea 

@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

b.start() 
time.sleep(1) #need to wait for loop to start 
t = b.add_task(test()) 
time.sleep(10) 
#here the program runs fine but how can I cancel the task? 

b.stop() 

Quindi, a partire e l'arresto del ciclo funziona bene. Ho pensato di creare un compito usando create_task, ma quel metodo non è protetto da thread, quindi l'ho spostato in call_soon_threadsafe. Ma mi piacerebbe essere in grado di ottenere l'oggetto compito per poter annullare l'attività. Potrei fare una roba complicata usando Future e Condition, ma ci deve essere un modo più semplice, no?

risposta

13

Penso che potrebbe essere necessario rendere il vostro metodo add_task consapevole della sua chiamata da un thread diverso dal ciclo di eventi. In questo modo, se viene chiamato dallo stesso thread, è possibile chiamare direttamente lo asyncio.async, altrimenti, può eseguire un lavoro aggiuntivo per passare l'attività dal thread del ciclo al thread chiamante. Ecco un esempio:

import time 
import asyncio 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
t = b.add_task(test()) # This is a real task 
time.sleep(10) 
b.stop() 

Innanzitutto, si salva l'ID del ciclo di eventi nel metodo run, in modo da poter capire se chiamate verso add_task provengono da altri thread tardi. Se add_task viene chiamato da un thread del ciclo non evento, si utilizza call_soon_threadsafe per chiamare una funzione che pianificherà la coroutine e quindi utilizzerà un concurrent.futures.Future per passare l'attività al thread chiamante, che attende il risultato dello Future.

Una nota sulla cancellazione di un compito: è quando si chiama cancel su un Task, un CancelledError verrà sollevata nel coroutine la prossima volta che il ciclo viene eseguito evento. Ciò significa che la coroutine che il Task sta eseguendo verrà annullata a causa dell'eccezione la prossima volta che raggiunge un punto di snervamento, a meno che la coroutine non rilevi lo CancelledError e si impedisca di abortire. Si noti inoltre che questo funziona solo se la funzione che si sta avviando è in realtà una coroutine interrompibile; un restituito da BaseEventLoop.run_in_executor, ad esempio, non può davvero essere annullato, perché in realtà è racchiuso attorno a uno concurrent.futures.Future, e quelli non possono essere annullati una volta che la loro funzione sottostante inizia effettivamente l'esecuzione. In questi casi, il asyncio.Future dirà che è stato annullato, ma la funzione attualmente in esecuzione nell'esecutore continuerà a essere eseguita.

Edit: Aggiornato il primo esempio di utilizzare concurrent.futures.Future, al posto di un queue.Queue, per suggerimento di Andrew Svetlov.

Nota: asyncio.async è obsoleto poiché la versione 3.4.4 utilizza invece asyncio.ensure_future.

+0

Grazie per l'esempio mi ha aiutato a risolvere diversi problemi che avevo. Dovevo anche installare Future with Future (loop = self.loop), altrimenti in alcuni casi il futuro avrebbe preso il ciclo sbagliato –

+0

@OlivierRD Dovresti usare 'concurrent.futures.Future', non' asyncio.Future'. 'concurrent.futures.Future' non accetta un arugment di parola chiave' loop'. – dano

+0

la documentazione sembra dire che lo fa: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future –

6

Fai tutto bene. Per compito arresto metodo make

class B(Thread): 
    # ... 
    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 

BTW si hanno impostare un ciclo di eventi per il thread creato esplicitamente per

self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) 

perché asyncio crea ciclo di eventi implicita solo per thread principale.

+0

Il pezzo mancante qui è come ottenere la maniglia per il 'task' in primo luogo. Poiché l'OP deve usare 'call_soon_threadsafe (self.loop.create_task)' nel metodo 'add_task', in realtà non ha un handle per l'attività dopo averlo aggiunto al ciclo. – dano

+1

Capito. Hai ragione. @dano Con BTW puoi usare concurrent.futures.Future invece di Queue nella tua risposta. Penso sia più pulito. –

+0

Sì, sono d'accordo che l'utilizzo di un 'Future' è più bello di un' Queue'. Ho aggiornato la mia risposta per riflettere questo. Grazie! – dano

5

solo per riferimento qui il codice che ho finalmente implementato in base all'aiuto che ho ottenuto su questo sito, è più semplice poiché non ho bisogno di tutte le funzionalità. grazie ancora!

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def _add_task(self, future, coro): 
     task = self.loop.create_task(coro) 
     future.set_result(task) 

    def add_task(self, coro): 
     future = Future() 
     p = functools.partial(self._add_task, future, coro) 
     self.loop.call_soon_threadsafe(p) 
     return future.result() #block until result is available 

    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 
2

Dalla versione 3.4.4 asyncio fornisce una funzione chiamata run_coroutine_threadsafe presentare un oggetto coroutine da un filo ad un ciclo di eventi. Restituisce un concurrent.futures.Future per accedere al risultato o annullare l'attività.

Usando il tuo esempio:

@asyncio.coroutine 
def test(loop): 
    try: 
     while True: 
      print("Running") 
      yield from asyncio.sleep(1, loop=loop) 
    except asyncio.CancelledError: 
     print("Cancelled") 
     loop.stop() 
     raise 

loop = asyncio.new_event_loop() 
thread = threading.Thread(target=loop.run_forever) 
future = asyncio.run_coroutine_threadsafe(test(loop), loop) 

thread.start() 
time.sleep(5) 
future.cancel() 
thread.join() 
+0

Per evitare da una condizione di competizione o deadlock, non chiamare 'future.cancel()' direttamente. Usa invece 'loop.call_soon_threadsafe (future.cancel)'. Vedi [qui] (https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-multithreading). – changyuheng

+1

@ ChangYu-heng Questo è vero per i futures [asyncio.Future] (https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future), ma [run_coroutine_threadsafe] (https: // docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) restituisce un [concurrent.futures.Future] (https://docs.python.org/3.4/library/concurrent.futures.html# concurrent.futures.Future) che è thread-safe e non dipende da alcun loop di eventi. – Vincent

+0

@Vicent Spiacente di non aver letto attentamente la domanda originale. Quindi un ulteriore commento per questo sarebbe: usare 'loop.call_soon_threadsafe (future.cancel)' se si sta per eseguire 'future.cancel()' dal thread che non è il ciclo di eventi in cui si vive. – changyuheng