Un'opzione ready-made che potrebbe aiutare con questo è twisted.internet.defer.DeferredSemaphore
. Questa è la versione asincrona del semaforo normale (conteggio) che potreste già sapere se avete fatto molta programmazione in thread.
Un semaforo (di conteggio) è molto simile a un mutex (un blocco). Ma dove un mutex può essere acquisito solo una volta fino a una versione corrispondente, un semaforo (di conteggio) può essere configurato per consentire un numero arbitrario (ma specificato) di acquisizioni per avere successo prima che siano richieste eventuali versioni corrispondenti.
Ecco un esempio di utilizzo DeferredSemaphore
per eseguire dieci operazioni asincrone, ma per eseguire più di tre in una volta:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
dispone anche espliciti acquire
e release
metodi, ma il metodo run
è così conveniente è quasi sempre quello che vuoi. Chiama il metodo acquire
, che restituisce un valore Deferred
. A quel primo Deferred
, aggiunge un callback che chiama la funzione che hai passato (insieme a qualsiasi argomento posizionale o parola chiave). Se tale funzione restituisce un valore Deferred
, viene aggiunto un callback che chiama il metodo .
Il caso sincrono viene gestito anche chiamando immediatamente release
. Anche gli errori vengono gestiti, consentendo loro di propagarsi ma assicurandosi che sia necessario il release
necessario per lasciare lo DeferredSemaphore
in uno stato coerente. Il risultato della funzione passata a (o il risultato dello Deferred
restituito) diventa il risultato dello Deferred
restituito da run
.
Un altro approccio possibile potrebbe essere basato su DeferredQueue
e cooperate
. DeferredQueue
è come una normale coda, ma il suo metodo get
restituisce un valore Deferred
. Se non ci sono elementi in coda al momento della chiamata, lo Deferred
non si attiva fino a quando non viene aggiunto un articolo.
Ecco un esempio:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Nota che la funzione operaio async
è lo stesso di quello del primo esempio. Tuttavia, questa volta, è disponibile anche una funzione worker
che estrae in modo esplicito i lavori da DeferredQueue
e li elabora con async
(aggiungendo async
come richiamata allo Deferred
restituito da get
). Il generatore worker
è gestito da cooperate
, che lo itera una volta dopo ogni Deferred
produce incendi.Il ciclo principale, quindi, avvia tre di questi generatori di lavoratori in modo che tre lavori siano in corso in qualsiasi momento.
Questo approccio coinvolge un po 'più di codice rispetto all'approccio DeferredSemaphore
, ma ha alcuni vantaggi che possono essere interessanti. Innanzitutto, cooperate
restituisce un'istanza CooperativeTask
che ha metodi utili come pause
, resume
e un altro paio. Inoltre, tutti i lavori assegnati allo stesso collaboratore saranno cooperano l'uno con l'altro nella pianificazione, in modo da non sovraccaricare il ciclo degli eventi (e questo è ciò che dà il nome dell'API). Sul lato DeferredQueue
, è anche possibile impostare dei limiti sul numero di articoli in attesa di elaborazione, in modo da evitare di sovraccaricare completamente il server (ad esempio, se i processori di immagine rimangono bloccati e smettono di completare le attività). Se il codice che chiama put
gestisce l'eccezione di overflow della coda, è possibile utilizzarla come pressione per cercare di interrompere l'accettazione di nuovi lavori (magari la loro deviazione su un altro server o la segnalazione di un amministratore). Fare cose simili con DeferredSemaphore
è un po 'più complicato, dal momento che non c'è modo di limitare il numero di lavori in attesa di essere in grado di acquisire il semaforo.
Cool, ho davvero apprezzato queste idee. In risposta all'idea di utilizzare un DeferredSemaphore. Ciò sarebbe molto utile se esistessero lotti distinti di lavori che dovevano essere completati. Se un batch ha troppi lavori da fare, esegue solo alcuni lavori contemporaneamente e quindi quando tutti i lavori sono completi, il batch viene raccolto. Questo ha il rovescio della medaglia che nessun risultato viene restituito fino a quando l'intero lotto non termina correttamente? E penso che questo svantaggio sia risolto usando un DeferredQueue ... – agartland
L'approccio con un DeferredQueue e cooperare è intelligente. In realtà mi darà più controllo in futuro per quanto riguarda il ridimensionamento del processore. Non penso nemmeno che sia necessariamente più complicato. Grazie. – agartland