2013-08-16 28 views
7

Supponiamo che rappresenti un vettore di caratteristiche usando un dizionario (perché? Perché so che le caratteristiche sono sparse, ma, ne parleremo più avanti).Come calcolare in modo efficiente il prodotto interno di due dizionari

Come devo implementare il prodotto interno di due di tali dizionari (denotata, A, B)

ho provato l'approccio naive:

for k in A: 
    if k in B: 
    sum += A[k] * B[k] 

ma risulta essere lenta.

Alcuni dettagli:

  • sto usando un dizionario per rappresentare le caratteristiche perché

    1. I tasti funzione sono stringhe
    2. Ci sono ~ 20K possibili chiavi
    3. Ogni vettore è scarso (diciamo circa 1000 elementi diversi da zero).
  • Sono davvero interessato a calcolare il prodotto interno a coppie di N = 2000 dizionari diversi (ovvero il loro kernel lineare).

+0

Si potrebbe provare 'per k, v in A.iteritems(): sum + = v * B.get (k, 0)' – val

+0

Questo potrebbe essere ovvio, ma io Ho pensato di aggiungerlo comunque: stai facendo in modo che "A" abbia il più piccolo 'len()' dei due dizionari? A seconda delle circostanze, ciò potrebbe avere un enorme impatto sulla velocità. (E poiché l'operazione è commutativa non avrebbe alcun impatto sulla risposta.) – Noah

risposta

1

Ecco la mia risposta (A seguito su un suggerimento da @ Valentin-Clement):

Per prima cosa ho avvolgere un dok_matrix scipy.sparse. L'idea è di assegnare a ciascuna delle caratteristiche possibili un indice.

import scipy.sparse as sps 
import numpy as np 

class MSK: 
    # DD is a dict of dict, whose values are of type float. 
    # features - the set of possible features keys 
    def __init__(self, DD, features): 
     self.features = {k: j for (j, k) in enumerate(features)} 
     self.strings = DD.keys() 
     n = len(self.strings) 
     d = len(self.features) 
     self.M = sps.dok_matrix((n, d), dtype=np.float64) 
     for (i, s) in enumerate(self.strings): 
      v = DD[s] 
      for k in v: 
       j = self.features[k] 
       self.M[i, j] = v[k] 

E test utilizzando il codice seguente, dove il numero di elementi è 800, la dimensionalità è anche 800, ma la scarsità è 200 (esattamente 200 elementi sono non-zero).

np.random.seed(1) 
N = 800 
DD = dict() 
R = range(N) 
for i in xrange(N): 
    DD[i] = dict() 
    S = np.random.permutation(R) 
    S = S[:N/4] 
    for j in S: 
     DD[i][j] = np.random.randn(1)[0] 

K = MSK(DD, R) 
import cProfile 
cProfile.runctx("A = K.M * K.M.T", globals(), locals()) 
print A.todense() 

L'output è:

2080520 function calls (2080519 primitive calls) in 2.884 seconds 

permette di dire 3 secondi. La mia ingenua implementazione (che usa l'if-statement di @ Joowani) richiede circa 19 secondi.

(MSK sta per MatrixSparseKeys)

+0

FYI, sostituendo dok_matrix() con lil_matrix() possiamo ottenere prestazioni ancora migliori. –

6
Non

sicuro di più veloce, ma qui è un altro approccio:

keys = A.viewkeys() & B.viewkeys() 
the_sum = sum(a[k] * b[k] for k in keys) 
+2

Puoi usare 'A.keys() & B.keys()' su Python 3 e 'A.viewkeys() & B. viewkeys() 'su Python 2.7. –

+0

In realtà 'set (A) .intersection (B)' appare più veloce in alcuni benchmark che ho appena fatto. –

1

Si dovrebbe cercare di utilizzare al posto di namedtuples dict.

from collections import namedtuple 
A = dict 
B = dict 
_A = namedtuple('_A', A.keys()) 
_B = namedtuple('_B', B.keys()) 
DictA = _A(**A) 
DictB = _B(**B) 

e quindi usarli come dettato. maggiori informazioni namedtuples qui: What are "named tuples" in Python?

+0

Come accelera il calcolo? –

+0

in realtà namedtuples sono più veloci ed efficienti di dict (http://stackoverflow.com/questions/1336791/dictionary-vs-object-which-is-more-efficient-and-why), utilizzando che invece di dettare può migliorare il velocità di calcolo –

+0

Se si cercano i campi per nome, è ancora una ricerca del dizionario sotto il cofano. –

7

Hmm, sembra che il tuo approccio è in realtà il migliore per i vettori densi:

>>> # Eric's answer 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.4360210521285808 

>>> # My comment 
>>> timeit.timeit('for k,v in A.iteritems(): sum += v*B.get(k,0)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.4082838999682963 

# My comment, more compact 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.38053266868496394 

>>> #Your approach 
>>> timeit.timeit('for k in A: sum += A[k]*B[k] if k in B else 0.', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100));sum=0', number=10000) 
0.35574231962510794 

>>> # Your approach, more compact 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='A=dict((i,i) for i in xrange(100));B=dict((i,i) for i in xrange(100))', number=10000) 
0.3400850549682559 

Per i più rade, la risposta di Eric esegue meglio, ma la tua è ancora il più veloce:

# Mine 
>>> timeit.timeit('sum(v*B.get(k,0) for k,v in A.iteritems())', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.1390782696843189 

# Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in set(A.keys()) & set(B.keys())])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.11702822992151596 

# Yours 
>>> timeit.timeit('sum(A[k]*B[k] for k in A if k in B)', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=10000) 
0.07878250570843193 

EDIT

Dopo fare in giro f o un po ', sembra che sum([x for x ...]) sia significativamente più veloce di sum(x for x in ...). Rebenchmarking con questo e l'osservazione di Janne per le chiavi di risposta di Eric, il tuo è ancora in cima (con Joowani di dare un leggero miglioramento):

>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.items()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
1.1604375791416714 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.9234189571552633 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5411289579401455 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(100) if random.random() < 0.3);B=dict((i,i) for i in xrange(100) if random.random() < 0.2)', number=100000) 
0.5198972138696263 

Scaling di dimensioni molto grandi, si vede esattamente lo stesso modello:

>>> #Mine 
>>> timeit.timeit('sum([v*B.get(k,0) for k,v in A.iteritems()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
45.328807250833506 

>>> #Eric's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
28.042937058640973 

>>> #Yours 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
16.55080344861699 

>>> #Joowani's 
>>> timeit.timeit('sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A])', setup='import random;A=dict((i,i) for i in xrange(10000) if random.random() < 0.1);B=dict((i,i) for i in xrange(10000) if random.random() < 0.2)', number=100000) 
15.485236119691308 

Penso che il trucco di Joowani non lo migliori significativamente qui perché i vettori hanno approssimativamente le stesse dimensioni, ma a seconda del problema (se alcuni vettori sono ridicolmente più piccoli di altri) questo potrebbe essere più significativo ...

modificare nuovamente

Ops, sembra che io avrei dovuto prendere un altro caffè prima di pubblicare ... Come ha sottolineato Eric (anche se ho completamente perso che ...), la definizione del campo di setup mantiene la lo stesso per tutte le prove, che non è davvero il modo migliore per fare un punto di riferimento. Con vettori casuali CORRETTE in fase di test, i risultati non sono significativamente diversi, ma per ragioni di completezza:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.294158102577967 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
6.068052507449011 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.745110704570834 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(100) if random.random() < 0.3),dict((i,i) for i in xrange(100) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=100000) 
5.737499445367575 

Per scalare:

>>> timeit.timeit('mine(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
5.0510995368395015 
>>> timeit.timeit('erics(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.350612399185138 
>>> timeit.timeit('yours(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.15619379016789 
>>> timeit.timeit('joowanis(dict((i,i) for i in xrange(10000) if random.random() < 0.1),dict((i,i) for i in xrange(10000) if random.random() < 0.2))', setup='import random;joowanis=lambda A,B:sum([A[k]*B[k] for k in A if k in B]) if len(A)<len(B) else sum([A[k]*B[k] for k in B if k in A]);mine=lambda A,B:sum([v*B.get(k,0) for k,v in A.iteritems()]);erics=lambda A,B:sum([A[k]*B[k] for k in A.viewkeys() & B.viewkeys()]);yours=lambda A,B:sum([A[k]*B[k] for k in A if k in B])', number=1000) 
4.185129374341159 

Penso che la linea di fondo è che non si può pretendere significativo aumento di velocità riordinando abilmente le tue espressioni per questo genere di cose ... Forse potresti provare a fare la parte numerica in C/Cython o usare il pacchetto Scipy's Sparse?

+0

Ho appena aggiornato il mio da 'set (d.keys())' a 'd.viewkeys()', che può dare un aumento di velocità – Eric

+0

Inoltre, non sono sicuro che questo test sia significativo se il dizionario iniziale è diverso per ogni snippet di codice – Eric

+0

Vero, ma penso che questo possa ancora darci un'idea approssimativa della velocità di ciascuno in media. – val

2

Nel caso in cui A sia molto più lungo di B, questo potrebbe essere d'aiuto?

if len(A) > len(B): 
    A, B = B, A 

for k in A: 
    if k in B: 
     the_sum += A[k] * B[k] 
+0

Wow è stato un veloce edit Eric, grazie! – Joohwan