Qual è il modo migliore di attendere (senza girare) finché qualcosa non è disponibile in uno dei due (multiprocessing) Queues, dove entrambi risiedono sullo stesso sistema?"select" su più code Python multiprocessing?
risposta
Non sembra che ci sia ancora un modo ufficiale per gestirlo. O almeno, non si basa su questo:
si potrebbe provare qualcosa di simile a ciò che questo post sta facendo - l'accesso ai filehandles tubi sottostanti:
e quindi selezionare.
È possibile utilizzare qualcosa come il modello Observer, in cui i destinatari della coda vengono informati delle modifiche di stato.
In questo caso, si potrebbe avere il thread di lavoro designato come un ascoltatore su ogni coda, e ogni volta che riceve un segnale di pronto, si può lavorare sul nuovo elemento, altrimenti dormire.
Bene, il 'get' è distruttivo, quindi non si può davvero fare osservazione sulla coda stessa come descritto da GoF. Il thread di dequeueing dovrebbe essere il "osservato" - speravo in un overhead minore rispetto a due thread aggiuntivi. – cdleary
Inoltre, se volessi un singolo punto di accesso per il processo chiamante (come in 'select') avrei bisogno di una coda thread-safe in cima a quei due thread. – cdleary
Sembra che l'utilizzo di thread che inoltrano gli elementi in entrata a una singola coda in attesa su di essi sia una scelta pratica quando si utilizza il multiprocessing in modo indipendente dalla piattaforma.
L'evitamento dei thread richiede la gestione di pipe/FD di basso livello, entrambe specifiche della piattaforma e non facili da gestire in modo coerente con l'API di livello superiore.
Oppure avresti bisogno di Code con la possibilità di impostare i callback che ritengo siano l'interfaccia di livello superiore corretta per cui andare. Cioè si potrebbe scrivere qualcosa di simile:
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
Forse il pacchetto di multiprocessing potrebbe crescere questa API, ma è ancora arrivati. Il concetto funziona bene con py.execnet che utilizza il termine "canale" invece di "code", vedi qui http://tinyurl.com/nmtr4w
Questa sarebbe un'interfaccia molto bella! (Anche se chiaramente c'è il vantaggio di mantenere le interfacce stdlib strette, come Jesse menziona nel rapporto bug di @ars.) – cdleary
true ma l'attuale API pubblica Queue non gestisce il tuo caso d'uso che penso sia comune. – hpk42
Se è "comune" - presenta un bug report + patch (con test per l'amore di pete) su bugs.python.org e posso valutarlo per 2.7/3.x – jnoller
In realtà è possibile utilizzare multiprocessing.Queue objects in select.select. cioè
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
selezionerebbe que solamente se è pronto per essere letto da.
Nessuna documentazione a riguardo. Stavo leggendo il codice sorgente della libreria multiprocessing.queue (su Linux normalmente è come /usr/lib/python2.6/multiprocessing/queue.py) per scoprirlo.
Con Queue.Queue non ho trovato alcun modo intelligente per farlo (e mi piacerebbe molto).
sembra non funzionare sotto Windows. – fluke
Funziona alla grande su Unix, ma su Windows l'implementazione 'select.select' può gestire solo socket, non descrittori di file e quindi fallisce. –
Qual è la differenza principale tra 'Queue.Queue' e' multiprocessing.Queue', e '' multiprocessing.Queue' può essere usato per il multithreading e non solo per il multiprocessing? – CMCDragonkai
Non sono sicuro se la selezione su una coda di elaborazione multipla funzioni su Windows. Come selezionare su windows ascolta socket e non handle di file, ho il sospetto che potrebbero esserci dei problemi.
La mia risposta è di creare un thread per ascoltare ogni coda in un blocco, e di mettere tutti i risultati in una singola coda ascoltata dal thread principale, essenzialmente multiplexando le singole code in una singola.
Il mio codice per fare questo è:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
La seguente routine di test mostra come usarlo:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
Spero che questo aiuti.
Tony Wallace
nuova versione del codice di cui sopra ...
Non so quanto bene il select in una coda multiprocessing funziona su Windows. Come selezionare su windows ascolta socket e non handle di file, ho il sospetto che potrebbero esserci dei problemi.
La mia risposta è di creare un thread per ascoltare ogni coda in un blocco, e di mettere tutti i risultati in una singola coda ascoltata dal thread principale, essenzialmente multiplexando le singole code in una singola.
Il mio codice per fare questo è:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Il codice di seguito è la mia routine di test per mostrare come funziona:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
Non pratico.
Inserire un'intestazione nei messaggi e inviarli a una coda comune. Questo semplifica il codice e sarà più pulito nel complesso.
+1 Wow, bella scoperta! Il mio Google-fu sembra essere debole ... – cdleary
sfortunatamente, il secondo URL non funziona più –