Mi sto abituando ad asyncio e trovo la gestione delle attività abbastanza piacevole, ma può essere difficile mescolare le librerie asincrone con le librerie io tradizionali. Il problema che sto affrontando attualmente è come decodificare correttamente uno StreamReader asincrono.Asyncio decodifica utf-8 con StreamReader
La soluzione più semplice è quella di read()
blocchi di stringhe di byte e quindi decodificare ogni blocco, vedere il codice riportato di seguito. (Nel mio programma, non vorrei stampare ogni pezzo, ma decodificarlo in una stringa e mando in un altro metodo per l'elaborazione):
import asyncio
import aiohttp
async def get_data(port):
url = 'http://localhost:{}/'.format(port)
r = await aiohttp.get(url)
stream = r.content
while not stream.at_eof():
data = await stream.read(4)
print(data.decode('utf-8'))
Questo funziona bene, fino a quando v'è una caratteri UTF-8, che è diviso tra troppi pezzi. Ad esempio se la risposta è b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
, quindi lettura blocchi di 3 funzioneranno, ma blocchi di 4 non (come \xc3
e \x9f
sono in diversi pezzi e decodificare il pezzo termina con \xc3
genererà il seguente errore:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
Ho cercato soluzioni appropriate a questo problema e, almeno nel mondo dei blocchi, sembra sia io.TextIOWrapper o codecs.treamReaderWriter (le cui differenze sono discusse in PEP 0400). Tuttavia, entrambi si basano su tipici flussi bloccanti
Ho trascorso 30 minuti Sto cercando esempi con asyncio e continuando a trovare la mia soluzione decode(). Qualcuno sa di una soluzione migliore o questa è una caratteristica mancante nell'asyncio di Python?
Per riferimento, ecco i risultati dell'utilizzo dei due decodificatori "standard" con flussi asincroni.
Utilizzo del lettore di flusso codec:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
eccezione:
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(chiama lettura() direttamente, anziché yield from
o await
it)
Ho anche provato avvolgendo flusso con io.TextIOWrapper:
stream = TextIOWrapper(r.content)
Ma che conduce al seguente:
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
P.S. Se si desidera un caso di test esemplificativo, consultare this gist. Puoi eseguirlo con python3.5 per riprodurre l'errore. Se si modifica la dimensione del blocco da 4 a 3 (o 30), funzionerà correttamente.
EDIT
La risposta accettata risolto questo come un fascino. Grazie!Se qualcun altro ha questo problema, qui è una semplice classe wrapper che ho fatto per gestire la decodifica su uno StreamReader:
import codecs
class DecodingStreamReader:
def __init__(self, stream, encoding='utf-8', errors='strict'):
self.stream = stream
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
async def read(self, n=-1):
data = await self.stream.read(n)
if isinstance(data, (bytes, bytearray)):
data = self.decoder.decode(data)
return data
def at_eof(self):
return self.stream.at_eof()
E prima che qualcuno chieda perché non carico solo l'intera risposta in memoria, considera socket web o feed keep alive (come il feed _changes di couchdb in modalità continua). Voglio analizzare tutti i dati in arrivo, mentre arrivano, senza attendere (possibilmente minuti) che la connessione HTTP si chiuda. –