2015-07-22 13 views
8

Ho un DataFrame in Apache Spark con una matrice di numeri interi, la sorgente è un insieme di immagini. Alla fine voglio fare PCA su di esso, ma sto avendo problemi solo creando una matrice dai miei array. Come posso creare una matrice da un RDD?Apache Spark: come creare una matrice da un DataFrame?

> imagerdd = traindf.map(lambda row: map(float, row.image)) 
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd) 
Traceback (most recent call last): 

    File "<ipython-input-21-6fdaa8cde069>", line 2, in <module> 
mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__ 
values = self._convert_to_array(values, np.float64) 

    File  "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array 
    return np.asarray(array_like, dtype=dtype) 

    File "/usr/local/python/conda/lib/python2.7/site-  packages/numpy/core/numeric.py", line 462, in asarray 
    return array(a, dtype, copy=False, order=order) 

TypeError: float() argument must be a string or a number 

che sto ottenendo lo stesso errore da ogni possibile accordo mi viene in mente:

imagerdd = traindf.map(lambda row: Vectors.dense(row.image)) 
imagerdd = traindf.map(lambda row: row.image) 
imagerdd = traindf.map(lambda row: np.array(row.image)) 

Se provo

> imagedf = traindf.select("image") 
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf) 

Traceback (chiamata più recente scorso):

File "<ipython-input-26-a8cbdad10291>", line 2, in <module> 
mat = DenseMatrix(numRows=206456, numCols=10, values=imagedf) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__ 
    values = self._convert_to_array(values, np.float64) 

    File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array 
    return np.asarray(array_like, dtype=dtype) 

    File "/usr/local/python/conda/lib/python2.7/site-packages/numpy/core/numeric.py", line 462, in asarray 
    return array(a, dtype, copy=False, order=order) 

ValueError: setting an array element with a sequence. 

risposta

7

Dato che non hai fornito un esempio di input presumo che appaia più o meno come questo dove id è un numero di riga e image contiene valori.

traindf = sqlContext.createDataFrame([ 
    (1, [1, 2, 3]), 
    (2, [4, 5, 6]), 
    (3, (7, 8, 9)) 
], ("id", "image")) 

prima cosa che dovete capire è che il DenseMatrix è una struttura dati locali. Per essere precisi è un involucro intorno a numpy.ndarray. Per ora (Spark 1.4.1) non ci sono equivalenti distribuiti in PySpark MLlib.

Dense Matrix prendere tre argomenti obbligatori numRows, numCols, values dove values è una struttura dati locale. Nel tuo caso devi raccogliere prima:

values = (traindf. 
    rdd. 
    map(lambda r: (r.id, r.image)). # Extract row id and data 
    sortByKey(). # Sort by row id 
    flatMap(lambda (id, image): image). 
    collect()) 


ncol = len(traindf.rdd.map(lambda r: r.image).first()) 
nrow = traindf.count() 

dm = DenseMatrix(nrow, ncol, values) 

Infine:

> print dm.toArray() 
[[ 1. 4. 7.] 
[ 2. 5. 8.] 
[ 3. 6. 9.]] 

Edit:

In Spark 1.5 + è possibile utilizzare mllib.linalg.distributed come segue:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix 

mat = IndexedRowMatrix(traindf.map(lambda row: IndexedRow(*row))) 
mat.numRows() 
## 4 
mat.numCols() 
## 3 

anche se per ora l'API è ancora limitata a b E utile in pratica.

+0

Sai come fare lo stesso in scala? https://stackoverflow.com/questions/47010126/calculate-cosine-similarity-spark-dataframe –