risposta

6
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,SingularValueDecomposition,DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.RowMatrix 

def computeInverse(X: RowMatrix): DenseMatrix = { 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"RowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x,-1)))) 

    // U cannot be a RowMatrix 
    val U = new DenseMatrix(svd.U.numRows().toInt,svd.U.numCols().toInt,svd.U.rows.collect.flatMap(x => x.toArray)) 

    // If you could make V distributed, then this may be better. However its alreadly local...so maybe this is fine. 
    val V = svd.V 
    // inv(X) = V*inv(S)*transpose(U) --- the U is already transposed. 
    (V.multiply(invS)).multiply(U) 
    } 
3

ho avuto problemi di utilizzare questa funzione con l'opzione

conf.set("spark.sql.shuffle.partitions", "12") 

Le righe matriceRiga GOT mescolate.

Ecco un aggiornamento che ha funzionato per me

import org.apache.spark.mllib.linalg.{DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix 

def computeInverse(X: IndexedRowMatrix) 
: DenseMatrix = 
{ 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"IndexedRowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x, -1)))) 

    // U cannot be a RowMatrix 
    val U = svd.U.toBlockMatrix().toLocalMatrix().multiply(DenseMatrix.eye(svd.U.numRows().toInt)).transpose 

    val V = svd.V 
    (V.multiply(invS)).multiply(U) 
} 
0

Matrix U restituito dal X.computeSVD ha dimensioni MXK dove m è il numero di righe dell'originale (distribuito) matriceRiga X. Ci si aspetterebbe m deve essere grande (possibilmente più grande di k), quindi non è consigliabile raccoglierlo nel driver se vogliamo che il nostro codice riduca a valori molto grandi di m.

Direi che entrambe le soluzioni sottostanti soffrono di questo difetto. La risposta data da @Alexander Kharlamov chiama val U = svd.U.toBlockMatrix().toLocalMatrix() che raccoglie la matrice nel driver. Lo stesso accade con la risposta data da @Climbs_lika_Spyder (btw your nick rocks !!), che chiama svd.U.rows.collect.flatMap(x => x.toArray). Vorrei piuttosto suggerire di fare affidamento su una moltiplicazione di matrici distribuite come il codice Scala pubblicato here.

+0

Non vedo calcoli inversi al collegamento che hai aggiunto. –

+0

@Climbs_lika_Spyder Il collegamento riguarda la moltiplicazione della matrice distribuita per sostituire la moltiplicazione della matrice locale '(V.multiply (invS)). Moltiplica (U)' nell'ultima riga della soluzione, in modo da non dover raccogliere 'U' nel driver. Penso che 'V' e' invS' non siano abbastanza grandi da causare problemi. – Pablo