6

Sto imparando il multiprocessing di Python e sto tentando di utilizzare questa funzione per compilare un elenco con tutti i file presenti in un sistema operativo. Tuttavia, il codice che ho scritto è in esecuzione solo sequenzialmente.Concorrenza di multiprocessing di Python utilizzando Manager, Pool e un elenco condiviso non funzionante

#!/usr/bin/python 
import os 
import multiprocessing 
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] #Gets a top level directory names inside "/" 
manager = multiprocessing.Manager() 
files = manager.list() 


def get_files(x): 
    for root, dir, file in os.walk(x): 
     for name in file: 
      files.append(os.path.join(root, name)) 

mp = [multiprocessing.Process(target=get_files, args=(tld[x],)) 
     for x in range(len(tld))] 

for i in mp: 
    i.start() 
    i.join() 
print len(files) 

Quando ho controllato l'albero del processo, posso vedere solo un singolo processo del Cile generato. (Uomo pstree dice {} denota il processo figlio generato dal genitore.)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752) 
           `-python(12750)` 

Quello che cercavo era, a generare un processo per ogni directory tld, compilare l'elenco condiviso files, e che sarebbe intorno 10-15 processi a seconda del numero di directory. Che cosa sto facendo di sbagliato?

EDIT ::

ho usato multiprocessing.Pool per creare thread di lavoro, e questa volta i processi sono ha generato, ma sta dando errori quando provo ad usare multiprocessing.Pool.map(). Mi riferivo al seguente codice nei documenti pitone che mostra

from multiprocessing import Pool 
def f(x): 
return x*x 

if __name__ == '__main__': 
    p = Pool(5) 
    print(p.map(f, [1, 2, 3])) 

Seguendo questo esempio, ho riscritto il codice come

import os 
import multiprocessing 
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] 
manager = multiprocessing.Manager() 
pool = multiprocessing.Pool(processes=len(tld)) 
print pool 
files = manager.list() 
def get_files(x): 
    for root, dir, file in os.walk(x): 
     for name in file: 
      files.append(os.path.join(root, name)) 
pool.map(get_files, [x for x in tld]) 
pool.close() 
pool.join() 
print len(files) 

e sta biforcano più processi.

---bash(10949)---python(12890)-+-python(12967) 
           |-python(12968) 
           |-python(12970) 
           |-python(12971) 
           |-python(12972) 
           ---snip--- 

Ma il codice è erroring dicendo

Process PoolWorker-2: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get return recv() AttributeError: 'module' object has no attribute 'get_files' self._target(*self._args, **self._kwargs) self.run() task = get() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get AttributeError: 'module' object has no attribute 'get_files' self.run()

Che cosa sto facendo male qui, e perché le get_files) errori funzionali (fuori?

risposta

3

È semplicemente perché si istanzia la vostra piscina prima di definire la funzione get_files:

import os 
import multiprocessing 

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]] 
manager = multiprocessing.Manager() 

files = manager.list() 
def get_files(x): 
    for root, dir, file in os.walk(x): 
     for name in file: 
      files.append(os.path.join(root, name)) 

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here 

pool.map(get_files, [x for x in tld]) 
pool.close() 
pool.join() 
print len(files) 

L'idea complessiva di un processo è che nel momento che si avvia, si sborsare la memoria del processo principale. Quindi qualsiasi definizione eseguita nel processo principale dopo la forcella non sarà nel sottoprocesso.

Se si desidera una memoria condivisa, è possibile utilizzare la libreria threading, ma si avrà alcuni problemi con esso (cf: The global interpreter lock)

+0

Grazie, che ha funzionato. Tuttavia, mi stavo chiedendo, dal momento che tld era già stato definito, perché definire il pool prima che la funzione fosse importante? Non c'era alcun riferimento alla funzione durante la definizione del pool. – nohup

+0

Ce n'è uno :) nel tuo 'pool.map'. Usando 'pool.map' chiedi al tuo processo di usare la funzione' get_files'. – FunkySayu

+0

concordato. Ma pool.map è stato definito dopo la funzione, prima che il pool di evento fosse stato definito in precedenza, perché durante la definizione del pool, sarebbe solo il numero di processi di lavoro da generare, secondo python doc. 'pool = Pool (processes = 4) # avvia 4 processi worker 'Per favore correggimi dove sto interpretando male. Ancora una volta, grazie mille @FunkySayu – nohup