2013-05-21 22 views
17

Sto implementando uno schema produttore-consumatore in python usando multiprocessing.Pool e multiprocessing.Queue. I consumatori sono processi pre-fork che utilizza gevent per generare più attività.Qual è il modo più pulito per fermare un worker di multiprocessing python collegato a una coda in un ciclo infinito?

Questo vuole essere un assettato giù versione del Codice:

import gevent 
from Queue import Empty as QueueEmpty 
from multiprocessing import Process, Queue, Pool 
import signal 
import time 

# Task queue 
queue = Queue() 

def init_worker(): 
    # Ignore signals in worker 
    signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 
    signal.signal(signal.SIGQUIT, signal.SIG_IGN) 

# One of the worker task 
def worker_task1(): 
    while True: 
     try: 
      m = queue.get(timeout = 2) 

      # Break out if producer says quit 
      if m == 'QUIT': 
       print 'TIME TO QUIT' 
       break 

     except QueueEmpty: 
      pass 

# Worker 
def work(): 
    gevent.joinall([ 
     gevent.spawn(worker_task1), 
    ]) 

pool = Pool(2, init_worker) 
for i in xrange(2): 
    pool.apply_async(work) 

try: 
    while True: 
     queue.put('Some Task') 
     time.sleep(2) 

except KeyboardInterrupt as e: 
    print 'STOPPING' 

    # Signal all workers to quit 
    for i in xrange(2): 
     queue.put('QUIT') 

    pool.join() 

Ora in cui provo a smettere, ottengo seguente stato:

  1. processo padre è in attesa di uno dei bambini a unirsi .
  2. Uno dei bambini è in stato di defunto. Così finito, ma il genitore sta aspettando che l'altro bambino finisca.
  3. Altro bambino mostra: futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ....

Quindi qual è il modo corretto per terminare un tale processo in modo pulito?

risposta