2010-06-23 6 views
5

Immaginate un albero binario invertito con i nodi A, B, C, D, E, F a livello 0. nodi G, H, I a livello 1, nodo J a livello 2, e il nodo K al livello 3.Implementazione di un tipo speciale di coda di multiprocessing in Python

livello 1: G = func (a, B), H = func (C, D), I = func (e, F)

livello 2: J = func (G, H)

Livello 3: K = func (J, I).

Ogni coppia di nodi sul Livello 0 deve essere elaborata nell'ordine, Ogni coppia di nodi sul Livello 1 può essere elaborata in qualsiasi ordine ma il risultato deve essere elaborato al livello successivo come mostrato, e così via fino alla fine con il risultato finale, K.

Il problema reale è un problema di geometria computazionale in cui una sequenza di solidi si fondono insieme. A è adiacente a B che è adiacente a C, e così via. Il fusibile risultante di A e B (G) è adiacente al fusibile di C e D (H). La miccia risultante di J e I (K) è il risultato finale. Quindi non puoi fondere G e I poiché non sono adiacenti. Se il numero di nodi su un livello non è una potenza di 2, si finisce con un'entità pendente che deve essere elaborata un livello ulteriore.

Poiché il processo di fusione è computazionalmente costoso e richiede molta memoria ma è molto parallelo, vorrei utilizzare il pacchetto multiprocessing Python e qualche forma di coda. Dopo aver calcolato G = func (A, B), vorrei inserire il risultato G nella coda per il calcolo successivo di J = func (G, H). Quando la coda è vuota, l'ultimo risultato è il risultato finale. Tieni presente che il file mp.queue non produrrà necessariamente risultati FIFO, poiché I = func (E, F) potrebbe terminare prima di H = func (C, D)

Ho trovato alcune (cattive) soluzioni ma sono sicuro che c'è una soluzione elegante appena oltre la mia portata. Suggerimenti?

+0

Perché il livello = 0 deve essere elaborato in ordine, ma il livello = 1 può essere elaborato in qualsiasi ordine? Non è sufficiente scegliere due foglie conosciute e fonderle in un singolo nodo? – Stephen

+0

Non sono corretto nel dire che i nodi devono essere elaborati in ordine. Devono essere trattati in termini di adiacenza. A è adiacente a B è adiacente a C e così via per il livello 0. Puoi fare func (A, B) o func (B, C) ma non func (A, C). Allo stesso modo al livello 1, G è adiacente a H è adiacente a I. Puoi fare func (G, H) o func (H, I) ma non func (G, I). – user90855

risposta

0

Non sono riuscito a creare un modello intelligente per una coda, ma è possibile sostituire facilmente la coda con un altro processo, che nel mio esempio ho chiamato WorkerManager. Questo processo raccoglie i risultati di tutti i processi Worker e avvia nuovi lavoratori solo se sono presenti due pacchetti di dati adiacenti che attendono di essere elaborati. In questo modo, non proverai mai a unire risultati non adiacenti, quindi puoi ignorare i "livelli" e sparare il calcolo della coppia successiva non appena è pronto.

from multiprocessing import Process, Queue 

class Result(object): 
    '''Result from start to end.''' 
    def __init__(self, start, end, data): 
     self.start = start 
     self.end = end 
     self.data = data 


class Worker(Process): 
    '''Joins two results into one result.''' 
    def __init__(self, result_queue, pair): 
     self.result_queue = result_queue 
     self.pair = pair 
     super(Worker, self).__init__() 

    def run(self): 
     left, right = self.pair 
     result = Result(left.start, right.end, 
         '(%s, %s)' % (left.data, right.data)) 
     self.result_queue.put(result) 


class WorkerManager(Process): 
    ''' 
    Takes results from result_queue, pairs them 
    and assigns workers to process them. 
    Returns final result into final_queue. 
    ''' 
    def __init__(self, result_queue, final_queue, start, end): 
     self._result_queue = result_queue 
     self._final_queue = final_queue 
     self._start = start 
     self._end = end 
     self._results = [] 
     super(WorkerManager, self).__init__() 

    def run(self): 
     while True: 
      result = self._result_queue.get() 
      self._add_result(result) 
      if self._has_final_result(): 
       self._final_queue.put(self._get_final_result()) 
       return 
      pair = self._find_adjacent_pair() 
      if pair: 
       self._start_worker(pair) 

    def _add_result(self, result): 
     self._results.append(result) 
     self._results.sort(key=lambda result: result.start) 

    def _has_final_result(self): 
     return (len(self._results) == 1 
       and self._results[0].start == self._start 
       and self._results[0].end == self._end) 

    def _get_final_result(self): 
     return self._results[0] 

    def _find_adjacent_pair(self): 
     for i in xrange(len(self._results) - 1): 
      left, right = self._results[i], self._results[i + 1] 
      if left.end == right.start: 
       self._results = self._results[:i] + self._results[i + 2:] 
       return left, right 

    def _start_worker(self, pair): 
     worker = Worker(self._result_queue, pair) 
     worker.start() 

if __name__ == '__main__': 
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)] 
    result_queue = Queue() 
    final_queue = Queue() 
    start = 0 
    end = len(DATA) 
    man = WorkerManager(result_queue, final_queue, start, end) 
    man.start() 
    for res in DATA: 
     result_queue.put(res) 
    final = final_queue.get() 
    print final.start 
    # 0 
    print final.end 
    # 6 
    print final.data 
    # For example: 
    # (((0, 1), (2, 3)), (4, 5)) 

Per il mio esempio, ho usato un semplice Worker che restituisce dati forniti tra parentesi, separati da una virgola, ma si potrebbe mettere qualsiasi calcolo in là. Nel mio caso, il risultato finale è stato (((0, 1), (2, 3)), (4, 5)) che significa che l'algoritmo ha calcolato (0, 1) e (2, 3) prima di calcolare ((0, 1), (2, 3)) e quindi ha unito il risultato con (4, 5). Spero che questo sia quello che stavi cercando.

+0

mi si avvicinò con una soluzione che assomiglia: def fusore (forme): shape1_id, Shape1 = forme [0] shape2_id, shape2 = forme [1] fuso = OCC.BRepAlgoAPI.BRepAlgoAPI_Fuse (Shape1, shape2).Shape() return ((shape1_id, shape2_id), fuso) results = [(i, a) for i, a enumerate (slices)] while len (risultati)> 1: P = processing.Pool (7 risultati = P.map (fusore, [(a, b) per a, b in zip (risultati [:: 2], risultati [1 :: 2])]) results.sort (chiave = lambda risultato: risultato [0]) – user90855