2011-12-03 9 views
5

Quindi quello che sto facendo è scrivere un servizio di streaming WSGI che utilizza una coda racchiusa in un iteratore per implementare un push multicast. Quello che segue è un modello semplificato del servizio:Implementazione di un servizio di streaming WSGI: (come rilevare i disconnessi del client)

# this is managed by another thread 
def processor_runner(): 
    generator = SerialMessageGenerator() 
    for message in generator: 
     for client in Processor.connections: 
      client.put(message) 

# this is managed by twisted's wsgi implementation 
def main(environ, start_response): 
    queue = Queue() 
    Processor.connections.append(queue) 
    status = '200 OK' 
    response_headers = [ 
     ('Content-Type', 'application/json'), 
     ('Transfer-Encoding', 'chunked') 
    ] 
    start_response(status, response_headers) 
    return iter(queue.get, None) 

E questo sta lavorando molto con contorta come server WSGI (Per inciso, il generatore di seriale è un processo separato collegato al processore da una coda di processo inter) . La mia domanda è: come posso rilevare quando un client si disconnette e quindi rimuoverlo dalla coda? Il mio però sta aggiungendo la coda come una tupla con il client socket (socket, coda) e poi controllando se il socket è ancora connesso prima di eseguire il put. Tuttavia, non so esattamente cosa prelevare da environ. Qualcuno ha qualche esperienza con questo prima di fare qualcosa insieme?

Aggiornato

Ecco la soluzione ho finalmente andato con:

class IterableQueue(Queue): 

def __init__(self): 
    Queue.__init__(self) # Queue is an old style class 
    ShellProcessor.connections.append(self) 

def __iter__(self): 
    return iter(self.get, None) 

def close(self): 
    self.put(None) 
    self.task_done() 
    ShellProcessor.connections.remove(self) 
+0

Inoltre, sentitevi liberi di commentare l'architettura o la prestazione di un servizio di streaming WSGI se avete qualche esperienza. – Bashwork

risposta

1

chiamate intrecciate .close() sull'iteratore se presente quando la richiesta è terminata o interrotta. Si potrebbe fare qualcosa di simile:

# ... 
start_response(status, response_headers) 
return ResponseIterator(iter(queue.get, None), 
    on_finish=lambda: Processor.connections.remove(queue)) 

dove ResponseIterator potrebbe essere:

class ResponseIterator: 

    def __init__(self, iterator, on_finish=None): 
     self.iterator = iterator 
     self.on_finish = on_finish 

    def __iter__(self): 
     return self 

    def next(self): 
     return next(self.iterator) 

    def close(self): 
     if self.on_finish is not None: 
     self.on_finish() 
+0

Bello, non lo sapevo chiamare vicino all'iteratore. Questo è perfetto! – Bashwork