https://github.com/housleyjk/aiopyramid funziona per me. Vedere la documentazione per websocket http://aiopyramid.readthedocs.io/features.html#websockets
UPD: server di
WebSocket con l'ambiente piramide.
import aiohttp
import asyncio
from aiohttp import web
from webtest import TestApp
from pyramid.config import Configurator
from pyramid.response import Response
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
while not ws.closed:
msg = await ws.receive()
if msg.tp == aiohttp.MsgType.text:
if msg.data == 'close':
await ws.close()
else:
hello = TestApp(request.app.pyramid).get('/')
ws.send_str(hello.text)
elif msg.tp == aiohttp.MsgType.close:
print('websocket connection closed')
elif msg.tp == aiohttp.MsgType.error:
print('ws connection closed with exception %s' %
ws.exception())
else:
ws.send_str('Hi')
return ws
def hello(request):
return Response('Hello world!')
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/{name}', websocket_handler)
config = Configurator()
config.add_route('hello_world', '/')
config.add_view(hello, route_name='hello_world')
app.pyramid = config.make_wsgi_app()
srv = await loop.create_server(app.make_handler(),
'127.0.0.1', 8080)
print("Server started at http://127.0.0.1:8080")
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
cliente WebSocket:
import asyncio
import aiohttp
session = aiohttp.ClientSession()
async def client():
ws = await session.ws_connect('http://0.0.0.0:8080/foo')
while True:
ws.send_str('Hi')
await asyncio.sleep(2)
msg = await ws.receive()
if msg.tp == aiohttp.MsgType.text:
print('MSG: ', msg)
if msg.data == 'close':
await ws.close()
break
else:
ws.send_str(msg.data + '/client')
elif msg.tp == aiohttp.MsgType.closed:
break
elif msg.tp == aiohttp.MsgType.error:
break
loop = asyncio.get_event_loop()
loop.run_until_complete(client())
loop.close()
Non sei sicuro di websocket in Python 3 (gevent-socketio si basa su gevent, che io non sono sicuro che è supportato in Python 3). Ma hai considerato eventi inviati dal server? Esempio: https://github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py –