Ho una sottoclasse asyncio.Protocol
che riceve dati da un server. Sto memorizzando questi dati (ogni riga, perché i dati sono testo) in un asyncio.Queue
.asyncio coda consumatore coroutine
import asyncio
q = asyncio.Queue()
class StreamProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
for message in data.decode().splitlines():
yield q.put(message.rstrip())
def connection_lost(self, exc):
self.loop.stop()
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
'127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Desidero avere un'altra coroutine responsabile del consumo dei dati nella coda e dell'elaborazione.
- Questo dovrebbe essere un
asyncio.Task
? - Cosa succede se la coda si svuota perché per alcuni secondi non viene ricevuto alcun dato? Come posso assicurarmi che il mio cliente non si fermi (
run_until_complete
)? - C'è un modo più pulito rispetto all'utilizzo di una variabile globale per la mia coda?
Il codice è sbagliato, mi dispiace: 'data_received' dovrebbe essere la funzione normale, non un coroutine con 'rendimento' dentro. Inoltre 'asyncio.Queue' richiede' yield from', non solo 'yield'. –
Ah, giusto. L'ho messo lì senza testarlo solo per dare l'idea di cosa volevo fare. – valentin