Ho una grande matrice sparsa X nel formato scipy.sparse.csr_matrix e vorrei moltiplicarla per un array numpy W che utilizza il parallelismo. Dopo alcune ricerche ho scoperto che è necessario utilizzare l'array in multiprocessing per evitare di copiare X e W tra processi (ad esempio da qui: How to combine Pool.map with Array (shared memory) in Python multiprocessing? e Is shared readonly data copied to different processes for Python multiprocessing?). Qui è il mio ultimo tentativoCome parallelizzare la moltiplicazione della matrice sparsa di scipy
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
Tuttavia l'uscita è qualcosa di simile: (4.431, 0.165) che indica la versione parallela è molto più lento di moltiplicazione non parallele.
Credo che il rallentamento possa essere causato in situazioni simili quando si copiano dati di grandi dimensioni sui processi, ma questo non è il caso in cui utilizzo Array per memorizzare le variabili condivise (a meno che non avvenga in numpy.frombuffer o quando creando una csr_matrix, ma poi non sono riuscito a trovare un modo per condividere direttamente una csr_matrix). Un'altra possibile causa della bassa velocità sta restituendo un grande risultato di ogni moltiplicazione di matrice per ogni processo, ma non sono sicuro di come aggirarlo.
Qualcuno può vedere dove sto andando male? Grazie per l'aiuto!
Aggiornamento: Non posso essere sicuro, ma credo che condividere grandi quantità di dati tra processi non sia così efficiente, e idealmente dovrei usare il multithreading (anche se il Global Interpreter Lock (GIL) lo rende molto difficile). Un modo per aggirare questo è rilasciare il GIL usando Cython per esempio (vedi http://docs.cython.org/src/userguide/parallelism.html), anche se molte delle funzioni di numpy devono passare attraverso GIL.
Hai numpy/scipy collegato a un build ATLAS ottimizzato con multithreading?Se lo fai, dovresti ottenere la moltiplicazione della matrice parallela gratuitamente quando usi np.dot. –
Sto usando una libreria BLAS multithread (OpenBLAS) collegata a numpy/scipy ma ho provato X.dot (W) e numpy.dot (X, W) (quest'ultimo non funziona per X sparse) e questo non è parallelised. – Charanpal