2016-02-01 14 views
7

Ho una sottoclasse asyncio.Protocol che riceve dati da un server. Sto memorizzando questi dati (ogni riga, perché i dati sono testo) in un asyncio.Queue.asyncio coda consumatore coroutine

import asyncio 

q = asyncio.Queue() 

class StreamProtocol(asyncio.Protocol): 
    def __init__(self, loop): 
     self.loop = loop 
     self.transport = None 

    def connection_made(self, transport): 
     self.transport = transport 

    def data_received(self, data): 
     for message in data.decode().splitlines(): 
      yield q.put(message.rstrip()) 

    def connection_lost(self, exc): 
     self.loop.stop() 

loop = asyncio.get_event_loop() 
coro = loop.create_connection(lambda: StreamProtocol(loop), 
           '127.0.0.1', '42') 
loop.run_until_complete(coro) 
loop.run_forever() 
loop.close() 

Desidero avere un'altra coroutine responsabile del consumo dei dati nella coda e dell'elaborazione.

  • Questo dovrebbe essere un asyncio.Task?
  • Cosa succede se la coda si svuota perché per alcuni secondi non viene ricevuto alcun dato? Come posso assicurarmi che il mio cliente non si fermi (run_until_complete)?
  • C'è un modo più pulito rispetto all'utilizzo di una variabile globale per la mia coda?
+0

Il codice è sbagliato, mi dispiace: 'data_received' dovrebbe essere la funzione normale, non un coroutine con 'rendimento' dentro. Inoltre 'asyncio.Queue' richiede' yield from', non solo 'yield'. –

+0

Ah, giusto. L'ho messo lì senza testarlo solo per dare l'idea di cosa volevo fare. – valentin

risposta

5

Questo dovrebbe essere un asyncio.Task?

Sì, crearlo utilizzando asyncio.ensure_future o loop.create_task.

Cosa succede se la coda si svuota perché per alcuni secondi non viene ricevuto alcun dato?

Basta usare queue.get aspettare fino a quando un articolo è disponibile:

async def consume(queue): 
    while True: 
     item = await queue.get() 
     print(item) 

C'è un modo più pulito rispetto all'utilizzo di una variabile globale per la mia coda?

Sì, semplicemente passarlo come argomento al protocollo coroutine consumatori e flusso:

class StreamProtocol(asyncio.Protocol): 
    def __init__(self, loop, queue): 
     self.loop = loop 
     self.queue = queue 

    def data_received(self, data): 
     for message in data.decode().splitlines(): 
      self.queue.put_nowait(message.rstrip()) 

    def connection_lost(self, exc): 
     self.loop.stop() 

Come posso assicurarmi che la mia consumatore non si ferma (run_until_complete)?

Una volta chiusa la connessione, utilizzare queue.join per attendere fino a quando la coda è vuota.


esempio completa:

loop = asyncio.get_event_loop() 
queue = asyncio.Queue() 
# Connection coroutine 
factory = lambda: StreamProtocol(loop, queue) 
connection = loop.create_connection(factory, '127.0.0.1', '42') 
# Consumer task 
consumer = asyncio.ensure_future(consume(queue)) 
# Set up connection 
loop.run_until_complete(connection) 
# Wait until the connection is closed 
loop.run_forever() 
# Wait until the queue is empty 
loop.run_until_complete(queue.join()) 
# Cancel the consumer 
consumer.cancel() 
# Let the consumer terminate 
loop.run_until_complete(consumer) 
# Close the loop 
loop.close() 

In alternativa, è anche possibile utilizzare streams:

async def tcp_client(host, port, loop=None): 
    reader, writer = await asyncio.open_connection(host, port, loop=loop) 
    async for line in reader: 
     print(line.rstrip()) 
    writer.close() 

loop = asyncio.get_event_loop() 
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop)) 
loop.close() 
+0

Grazie! Sembra il modo giusto per farlo. Penso che ci sia un problema con il tuo esempio completo, la variabile 'coro' non esiste – valentin

+0

@toogy True, l'ho appena corretto. – Vincent

+0

Perfetto. Solo un'ultima cosa. Cosa succede se voglio che il mio consumatore sia più di una funzione (intendo una classe)? Dovrei semplicemente ereditare la classe 'asyncio.Task'? – valentin