2013-04-03 17 views
5

Abbiamo una coda di lavori e i lavoratori elaborano questi lavori uno alla volta. Ogni lavoro ci richiede di formattare alcuni dati ed emettere una richiesta HTTP POST, con i dati come carico utile della richiesta.Come si inviano richieste HTTP asincrone in python uno alla volta?

In che modo è possibile che ciascun worker emetta queste richieste POST HTTP in modo asincrono in modo a thread singolo e senza blocco? Non ci interessa la risposta della richiesta: tutto ciò che vogliamo è che la richiesta venga eseguita il prima possibile e quindi che il lavoratore passi immediatamente al lavoro successivo.

Abbiamo esplorato utilizzando gevent e la libreria grequests (vedere Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?). Il nostro codice operaio sembra qualcosa di simile:

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.post, url, params=params) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 

La prima istruzione print esegue, ma le dichiarazioni secondo e il terzo di stampa non vengono mai stampate e l'URL viene mai colpito.

Come possiamo ottenere queste richieste asincrone da eseguire?

+0

C'è una lib standard chiamata [asyncore] (http://docs.python.org/2/library/asyncore.html) ma è forse troppo di basso livello per il tuo caso d'uso. – lucasg

+0

Dovrei essere d'accordo con @georgesl su questo, asyncore sarebbe un ottimo posto per migrare perché ti darà una migliore flessibilità sull'applicazione per uno sviluppo successivo. Inoltre, 'http: // stackoverflow.com/questions/15753901/python-asyncore-client-socket-can-not-determaine-connection-status/15754244 # 15754244' ecco un buon inizio e un esempio di come può essere utilizzato (vedi la risposta alla mia domanda). In caso contrario, dovresti effettivamente farlo in più processi, anche le librerie "sub" di python molto probabilmente lo collegheranno per te se puoi inviare richieste paralell, questo è il problema del multi-processo – Torxed

+0

Il tuo codice gevent sembra okay (e un test rapido mi dice che funziona bene, io uso gevent 1.0b3). Immagino che dipenda dal contesto in cui viene chiamato execute_task'. – robertklep

risposta

1

1) fanno un oggetto Queue.Queue

2) fare il maggior numero di "lavoratore" fili che vuoi che loop e letti dal Queue.Queue

3), passare i posti di lavoro nella coda. coda

il lavoratore discussioni saranno leggere la Queue.Queue nell'ordine in cui sono posti su di esso

esempio che legge le linee da un file e li mette in un Queue.Queue

import sys 
import urllib2 
import urllib 
from Queue import Queue 
import threading 
import re 

THEEND = "TERMINATION-NOW-THE-END" 


#read from file into Queue.Queue asynchronously 
class QueueFile(threading.Thread): 
    def run(self): 
     if not(isinstance(self.myq, Queue)): 
      print "Queue not set to a Queue" 
      sys.exit(1) 
     h = open(self.f, 'r') 
     for l in h: 
      self.myq.put(l.strip()) # this will block if the queue is full 
     self.myq.put(THEEND) 

    def set_queue(self, q): 
     self.myq = q 

    def set_file(self, f): 
     self.f = f 

un'idea di ciò che un thread di lavoro potrebbe essere come (solo esempio)

class myWorker(threading.Thread): 
    def run(self): 
     while(running):   
      try: 
       data = self.q.get() # read from fifo 

       req = urllib2.Request("http://192.168.1.10/url/path") 
       req.add_data(urllib.urlencode(data)) 
       h1 = urllib2.urlopen(req, timeout=10) 
       res = h1.read() 
       assert(len(res) > 80) 

      except urllib2.HTTPError, e: 
       print e 

      except urllib2.URLError, e: 
       print "done %d reqs " % n 
       print e 
       sys.exit() 

Per rendere gli oggetti in base threading.Thread andare, creare l'oggetto quindi chiamare "start" per l'istanza

1

Dovresti eseguirlo in thread diversi o utilizzare la libreria asyncore incorporata. Molte librerie useranno il threading senza che tu lo sappia, o si baserà su asyncore, che è una parte standard di Python.

Ecco una combinazione di Threading e asyncore:

#!/usr/bin/python 
# -*- coding: iso-8859-15 -*- 
import asyncore, socket 
from threading import * 
from time import sleep 
from os import _exit 
from logger import * # <- Non-standard library containing a log function 
from config import * # <- Non-standard library containing settings such as "server" 

class logDispatcher(Thread, asyncore.dispatcher): 
    def __init__(self, config=None): 
     self.inbuffer = '' 
     self.buffer = '' 
     self.lockedbuffer = False 
     self.is_writable = False 

     self.is_connected = False 

     self.exit = False 
     self.initated = False 

     asyncore.dispatcher.__init__(self) 
     Thread.__init__(self) 

     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      self.connect((server, server_port)) 
     except: 
      log('Could not connect to ' + server, 'LOG_SOCK') 
      return None 

     self.start() 

    def handle_connect_event(self): 
     self.is_connected = True 

    def handle_connect(self): 
     self.is_connected = True 
     log('Connected to ' + str(server), 'LOG_SOCK') 

    def handle_close(self): 
     self.is_connected = False 
     self.close() 

    def handle_read(self): 
     data = self.recv(8192) 
     while self.lockedbuffer: 
      sleep(0.01) 

     self.inbuffer += data 


    def handle_write(self): 
     while self.is_writable: 
      sent = self.send(self.buffer) 
      sleep(1) 

      self.buffer = self.buffer[sent:] 
      if len(self.buffer) <= 0: 
       self.is_writable = False 
      sleep(0.01) 

    def _send(self, what): 
     self.buffer += what + '\r\n' 
     self.is_writable = True 

    def run(self): 
     self._send('GET/HTTP/1.1\r\n') 

while 1: 
    logDispatcher() # <- Initate one for each request. 
    asyncore.loop(0.1) 
    log('All threads are done, next loop in 10', 'CORE') 
    sleep(10) 

o si può semplicemente fare un filo che fa il lavoro e poi muore.

from threading import * 
class worker(Thread): 
    def __init__(self, host, postdata) 
     Thread.__init__(self) 
     self.host = host 
     self.postdata = postdata 
     self.start() 
    def run(self): 
     sock.send(self.postdata) #Pseudo, create the socket! 

for data in postDataObjects: 
    worker('example.com', data) 

Se è necessario limitare il numero di thread (se si sta inviando messaggi oltre 5k o almeno così si potrebbe ottenere tassazione sul sistema) basta fare un while len(enumerate()) > 1000: sleep(0.1) e lasciare che l'oggetto del crochet attendere qualche Discussioni morire fuori

0

avvolgere l'url ei parametri in un elenco, quindi inserire una coppia una volta ogni volta nel pool di attività (il pool di attività qui ha o un'attività o è vuoto), creare thread, leggere l'attività dal pool di attività, quando un thread ottiene l'operazione e invia la richiesta, quindi ne espande un altro dall'elenco (ovvero questo è effettivamente un elenco di code)

1

È possibile utilizzare il metodo join anziché sleep e quindi controllare lo stato.Se vuoi eseguire uno alla volta che risolverà il problema. Modificare leggermente il codice per testarlo sembra funzionare correttamente.

import gevent 
import requests 

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.get, 'http://example.com', params={}) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 
    print greenlet.get() 

execute_task(None, None) 

dà i risultati:

About to spawn request 
Request spawned, about to call sleep 
Greenlet status: True 
<Response [200]> 

c'è più in corso in questo processo Python che potrebbe essere bloccando Gevent esecuzione questo greenlet?