2013-10-31 8 views
6

sulla base del modulo multiprocessing Python, ho bisogno di fare quanto segue:Python Multiprocessing Modulo: chiamando un metodo di istanza all'interno di un processo

-Creare un processo sempre in esecuzione, che può essere interrotto da un evento specifico.

-In questo processo, ricevere un messaggio da un client e passare questo messaggio a un metodo gestore dell'istanza dell'oggetto.

Il codice di base è riportato di seguito (alcuni dettagli omessi). Il problema è che provo a chiamare il metodo di istanza (self.enroll (messaggio), ma non c'è alcun effetto, come previsto. Conosco il motivo: i processi usano la propria memoria ed ecc. E ho già implementato la soluzione presentata in Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map() per risolvere la questione dei metodi limitati di pickling, oltre a provare diversi approcci utilizzando Manager, Queue, Pool ... poiché nessuno ha funzionato, ho deciso di inserire il codice come "grezzo" il più possibile, in modo da poter comprendere le mie intenzioni. qualsiasi aiuto è benvenuto.

class DistManager: 
    def __init__(self, name, network_address, password): 
     self.name = name 
     self.network_address = network_address 
     self.password = password 
     self.distribution_clients = {} 

    def _run_distribution_process(self): 
     import select 
     while not self.should_stop_distribution_service.is_set(): 
      (sread, swrite, sexc) = select.select([self.distribution_listener], [], [], 0) 
      if (sread): 
       connection = self.distribution_listener.accept() 
       serialized_message = connection.recv() # currently only receiving 
       connection.close() 
       message = pickle.loads(serialized_message) 
       self.enroll(message) # THE PROBLEM IS HERE 

    def start_distribution_service(self, distribution_port): 
     self.distribution_port = distribution_port 
     # patch for making Listener work with select.select during run 
     Listener.fileno = lambda self: self._listener._socket.fileno() 
     self.distribution_listener = Listener(address=(self.network_address, self.distribution_port), 
               authkey=self.password) 
     self.should_stop_distribution_service = Event() 
     self.distribution_process = Process(name='Distribution Runner', target=self._run_distribution_process) 
     self.distribution_process.daemon = True 
     self.distribution_process.start() 

    def stop_distribution_service(self): 
     from time import sleep 
     self.should_stop_distribution_service.set() 
     sleep(1) 
     self.distribution_listener.close() 
     self.distribution_process.terminate() 
     return self.distribution_process.exitcode 

    def _enroll_distribution_client(self, identifier, network_address, phone_number): 
     self.distribution_clients[identifier] = (network_address, phone_number) 

    def enroll(self, message): 
     if type(message.content) is tuple: 
      self._enroll_distribution_client(message.generator_identifier, message.content[0], message.content[1]) 
     else: 
      raise TypeError("Tuple expected") 
     return message.code 
+0

Un'osservazione: una soluzione è creare un'istanza dell'oggetto all'interno del processo, ma questo non è il caso , per altre ragioni, ho bisogno di chiamare un metodo di un'istanza creata al di fuori del processo - questo è il punto centrale. –

risposta

1

è comunque possibile utilizzare multiprocessing.pool per questo senza errori decapaggio.

Aggiungere la seguente riga a un codice che non multiprocessing con la classe e si ca n ancora passare il metodo attraverso il pool. i codici dovrebbero andare al di sopra della classe

import copy_reg 
    import types 

    def _reduce_method(meth): 
     return (getattr,(meth.__self__,meth.__func__.__name__)) 
    copy_reg.pickle(types.MethodType,_reduce_method) 

per una maggiore comprensione di come fare la serializzazione di un metodo vedi sotto http://docs.python.org/2/library/copy_reg.html