2013-05-29 2 views
8

Ho una grande matrice sparsa X nel formato scipy.sparse.csr_matrix e vorrei moltiplicarla per un array numpy W che utilizza il parallelismo. Dopo alcune ricerche ho scoperto che è necessario utilizzare l'array in multiprocessing per evitare di copiare X e W tra processi (ad esempio da qui: How to combine Pool.map with Array (shared memory) in Python multiprocessing? e Is shared readonly data copied to different processes for Python multiprocessing?). Qui è il mio ultimo tentativoCome parallelizzare la moltiplicazione della matrice sparsa di scipy

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp): 
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray 
    global WShape 

    WArray = Warr  
    WShape = Wshp 

def dot2(args): 
    rowInds, i = args  

    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float) 
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32) 
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32) 
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape) 

    global WArray 
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape) 

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W) 

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count() 
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int) 

    #Store the data in X as RawArray objects so we can share it amoung processes 
    XData = multiprocessing.RawArray("d", X.data) 
    XIndices = multiprocessing.RawArray("i", X.indices) 
    XIndptr = multiprocessing.RawArray("i", X.indptr) 

    def matmat(W): 
     WArray = multiprocessing.RawArray("d", W.flatten()) 
     pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
     params = [] 

     for i in range(numJobs): 
      params.append((rowInds, i)) 

     iterator = pool.map(dot2, params) 
     P = numpy.zeros((X.shape[0], W.shape[1])) 

     for i in range(numJobs): 
      P[rowInds[i]:rowInds[i+1], :] = iterator[i] 

     return P 

    return matmat 

if __name__ == '__main__': 
    #Create a random sparse matrix X and a random dense one W  
    X = scipy.sparse.rand(10000, 8000, 0.1) 
    X = X.tocsr() 
    W = numpy.random.rand(8000, 20) 

    startTime = time.time() 
    A = getMatmat(X)(W) 
    parallelTime = time.time()-startTime 

    startTime = time.time() 
    B = X.dot(W) 
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime) 

Tuttavia l'uscita è qualcosa di simile: (4.431, 0.165) che indica la versione parallela è molto più lento di moltiplicazione non parallele.

Credo che il rallentamento possa essere causato in situazioni simili quando si copiano dati di grandi dimensioni sui processi, ma questo non è il caso in cui utilizzo Array per memorizzare le variabili condivise (a meno che non avvenga in numpy.frombuffer o quando creando una csr_matrix, ma poi non sono riuscito a trovare un modo per condividere direttamente una csr_matrix). Un'altra possibile causa della bassa velocità sta restituendo un grande risultato di ogni moltiplicazione di matrice per ogni processo, ma non sono sicuro di come aggirarlo.

Qualcuno può vedere dove sto andando male? Grazie per l'aiuto!

Aggiornamento: Non posso essere sicuro, ma credo che condividere grandi quantità di dati tra processi non sia così efficiente, e idealmente dovrei usare il multithreading (anche se il Global Interpreter Lock (GIL) lo rende molto difficile). Un modo per aggirare questo è rilasciare il GIL usando Cython per esempio (vedi http://docs.cython.org/src/userguide/parallelism.html), anche se molte delle funzioni di numpy devono passare attraverso GIL.

+0

Hai numpy/scipy collegato a un build ATLAS ottimizzato con multithreading?Se lo fai, dovresti ottenere la moltiplicazione della matrice parallela gratuitamente quando usi np.dot. –

+1

Sto usando una libreria BLAS multithread (OpenBLAS) collegata a numpy/scipy ma ho provato X.dot (W) e numpy.dot (X, W) (quest'ultimo non funziona per X sparse) e questo non è parallelised. – Charanpal

risposta

1

La soluzione migliore è passare a C con Cython. In questo modo puoi battere GIL e usare OpenMP. Non mi sorprende che il multiprocessing sia più lento - ci sono molti overhead lì.

Ecco un involucro ingenuo wrapper OpenMP della matrice sparsa di CSparse - codice di prodotto vettoriale in python.

Sul mio portatile, questo funziona un po 'più veloce di scipy. Ma non ho tanti core. Il codice, incluso lo script setup.py e i file header C e roba è in questo elenco: https://gist.github.com/rmcgibbo/6019670

Sospetto che se si vuole veramente che il codice parallelo sia veloce (sul mio portatile, è solo circa il 20% più veloce rispetto a scipy a thread singolo, anche quando si utilizzano 4 thread), è necessario riflettere più attentamente su dove si verifica il parallelismo rispetto a quello che ho fatto, prestando attenzione alla localizzazione della cache.

# psparse.pyx 

#----------------------------------------------------------------------------- 
# Imports 
#----------------------------------------------------------------------------- 
cimport cython 
cimport numpy as np 
import numpy as np 
import scipy.sparse 
from libc.stddef cimport ptrdiff_t 
from cython.parallel import parallel, prange 

#----------------------------------------------------------------------------- 
# Headers 
#----------------------------------------------------------------------------- 

ctypedef int csi 

ctypedef struct cs: 
    # matrix in compressed-column or triplet form 
    csi nzmax  # maximum number of entries 
    csi m   # number of rows 
    csi n   # number of columns 
    csi *p   # column pointers (size n+1) or col indices (size nzmax) 
    csi *i   # row indices, size nzmax 
    double *x  # numerical values, size nzmax 
    csi nz   # # of entries in triplet matrix, -1 for compressed-col 

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil 
cdef extern csi cs_print (cs *A, csi brief) nogil 

assert sizeof(csi) == 4 

#----------------------------------------------------------------------------- 
# Functions 
#----------------------------------------------------------------------------- 

@cython.boundscheck(False) 
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None): 
    """Multiply a sparse CSC matrix by a dense matrix 

    Parameters 
    ---------- 
    X : scipy.sparse.csc_matrix 
     A sparse matrix, of size N x M 
    W : np.ndarray[dtype=float564, ndim=2, mode='fortran'] 
     A dense matrix, of size M x P. Note, W must be contiguous and in 
     fortran (column-major) order. You can ensure this using 
     numpy's `asfortranarray` function. 

    Returns 
    ------- 
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size N x P, the result of multiplying X by W. 

    Notes 
    ----- 
    This function is parallelized over the columns of W using OpenMP. You 
    can control the number of threads at runtime using the OMP_NUM_THREADS 
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis. 
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the 
    GNU LGPL v2.1+. 

    References 
    ---------- 
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems 
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136 
    """ 
    if X.shape[1] != W.shape[0]: 
     raise ValueError('matrices are not aligned') 

    cdef int i 
    cdef cs csX 
    cdef np.ndarray[double, ndim=2, mode='fortran'] result 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices 
    cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data 

    # Pack the scipy data into the CSparse struct. This is just copying some 
    # pointers. 
    csX.nzmax = X.data.shape[0] 
    csX.m = X.shape[0] 
    csX.n = X.shape[1] 
    csX.p = &indptr[0] 
    csX.i = &indices[0] 
    csX.x = &data[0] 
    csX.nz = -1 # to indicate CSC format 

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double) 
    for i in prange(W.shape[1], nogil=True): 
     # X is in fortran format, so we can get quick access to each of its 
     # columns 
     cs_gaxpy(&csX, &W[0, i], &result[0, i]) 

    return result 

Chiama qualche C da CSparse.

// src/cs_gaxpy.c 

#include "cs.h" 
/* y = A*x+y */ 
csi cs_gaxpy (const cs *A, const double *x, double *y) 
{ 
    csi p, j, n, *Ap, *Ai ; 
    double *Ax ; 
    if (!CS_CSC (A) || !x || !y) return (0) ;  /* check inputs */ 
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ; 
    for (j = 0 ; j < n ; j++) 
    { 
     for (p = Ap [j] ; p < Ap [j+1] ; p++) 
     { 
     y [Ai [p]] += Ax [p] * x [j] ; 
     } 
    } 
    return (1) ; 
} 
+0

Grazie per questa risposta! Avevo pensieri simili e avevo scritto un prodotto punto Cython/OpenMP basato su Eigen (vedi pdot2d di https://github.com/charanpald/sppy/blob/master/sppy/csarray_base.pyx). Qui, ho diviso le righe di X in blocchi cpu_count e viene eseguito circa 2 volte più velocemente sulla mia macchina a 8 core (sono sicuro che può essere migliorato però). Mi confronterò con la tua soluzione non appena risolvo alcuni problemi con la compilazione. – Charanpal

1

Forse un po 'in ritardo con la risposta. Potrebbe essere possibile ottenere accelerazioni parallele affidabili usando il pacchetto pyTrilinos che fornisce wrapper python a molte funzioni in Trilinos. Ecco il tuo esempio convertite ad usare pyTrilinos:

from PyTrilinos import Epetra 
from scipy.sparse import rand 
import numpy as np 

n_rows = 10000 
n_cols = 8000 
n_vecs = 20 
fill_factor = 0.1 

comm = Epetra.PyComm() 
my_id = comm.MyPID() 

row_map = Epetra.Map(n_rows, 0, comm) 
out_vec_map = row_map 
in_vec_map = Epetra.Map(n_cols, 0, comm) 
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm) 

n_local_rows = row_map.NumMyElements() 

# Create local block matrix in scipy and convert to Epetra 
X = rand(n_local_rows, n_cols, fill_factor).tocoo() 

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True) 
A.InsertMyValues(X.row, X.col, X.data) 
A.FillComplete() 

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs) 
V = Epetra.MultiVector(in_vec_map, n_vecs) 

V[:] = W.T # order of indices is opposite 

B = Epetra.MultiVector(out_vec_map, n_vecs) 

# Multiply 
A.Multiply(False, V, B) 

È quindi possibile eseguire questo codice usando MPI

mpiexec -n 2 python scipy_to_trilinos.py 

Altri esempi di PyTrilinos possono essere trovati sul repository github here. Naturalmente se si dovesse usare pyTrilinos, questo modo di inizializzare la matrice usando scipy potrebbe non essere il più ottimale.