2011-12-18 7 views
5

Questo codice traspone una matrice di quattro modi. Il primo fa scritture sequenziali, letture non sequenziali. Il secondo è il contrario. I prossimi due sono gli stessi, ma con le annotazioni di salto della cache. Ciò che sembra accadere è che le scritture sequenziali sono più veloci e saltare la cache è più veloce. Quello che non capisco è, se la cache viene saltata, perché le scritture sequenziali sono ancora più veloci?utilizzo della cache in matrice di recepire in c

QueryPerformanceCounter(&before); 
for (i = 0; i < N; ++i) 
    for (j = 0; j < N; ++j) 
     tmp[i][j] = mul2[j][i]; 
QueryPerformanceCounter(&after); 
printf("Transpose 1:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (j = 0; j < N; ++j) 
    for (i = 0; i < N; ++i) 
    tmp[i][j] = mul2[j][i]; 
QueryPerformanceCounter(&after); 
printf("Transpose 2:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (i = 0; i < N; ++i) 
    for (j = 0; j < N; ++j) 
     _mm_stream_si32(&tmp[i][j], mul2[j][i]); 
QueryPerformanceCounter(&after); 
printf("Transpose 3:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (j = 0; j < N; ++j) 
    for (i = 0; i < N; ++i) 
     _mm_stream_si32(&tmp[i][j], mul2[j][i]); 
QueryPerformanceCounter(&after); 
printf("Transpose 4:\t%ld\n", after.QuadPart - before.QuadPart); 

EDIT: L'uscita è

Transpose 1: 47603 
Transpose 2: 92449 
Transpose 3: 38340 
Transpose 4: 69597 
+0

Puoi mostrarci alcuni numeri e le dimensioni che stai testando? – Mysticial

+0

La cache deve essere aggiornato sulle scritture, se le linee della cache associate vengono caricati (che molto probabilmente sono in un semplice caso di test). –

+0

I risultati di questi test dipendono dal contenuto corrente della cache e gli errori di cache/hit possono influenzare i risultati. Forse vale la pena di ripetere i test, ma ogni volta si avvia da uno stato di cache noto. forse invalidando la cache prima che ogni test possa aiutare. –

risposta

4

CPU ha una combinazione scrittura buffer per combinare scrive su una linea di cache per accadere in uno scoppio. In questo caso (cache essendo saltato in scrittura sequenziale), questa combinazione scrittura tampone atti come cache una linea che rende i risultati essere molto simile alla cache non essere saltato.

Per essere precisi, in caso di ignoranza della cache, le scritture continuano a verificarsi a raffica in memoria.

Vedere il comportamento write-combining logic qui.

0

È possibile provare layout di memoria non lineare per la matrice per migliorare l'utilizzo della cache. Con le tessere float a 32 bit a 32 bit è possibile eseguire la trasposizione con un solo accesso a ciascuna linea cache. Inoltre come trasposizione di tessere bonus potrebbe essere fatto facilmente con _MM_TRANSPOSE4_PS.

Trasposizione un grande matrice è ancora funzionamento intensivo memoria molto. Rimarrà comunque molto limitata la larghezza di banda, ma almeno il carico di parole nella cache è quasi ottimale. Non so se le prestazioni potrebbero essere ancora ottimizzate. I miei test dimostrano che un laptop di pochi anni riesce a trasporre 16k * 16k (memoria 1G) in circa 300ms.

Ho provato a utilizzare anche _mm_stream_sd ma in realtà peggiora le prestazioni per qualche motivo. Non capisco abbastanza bene le scritture di memoria non temporali per avere qualche ipotesi pratica sul perché le prestazioni scendano con _mm_stream_ps. La ragione possibile è ovviamente che la cache line è già nella cache L1 pronta per l'operazione di scrittura.

Ma in realtà una parte importante con matrice non lineare potrebbe evitare la trasposizione completamente e semplice eseguire la moltiplicazione nell'ordine delle tessere. Ma ho solo un codice transpose che sto usando per migliorare le mie conoscenze sulla gestione della cache negli algoritmi.

Non ho ancora provato a verificare se il prefetching migliorerebbe l'utilizzo della larghezza di banda della memoria. codice corrente funziona a circa 0,5 istruzioni per ciclo (buona cache di codice amichevole gestisce circa 2 ins per ciclo su questa CPU) che lascia un sacco di cicli gratuiti per le istruzioni di prefetch che consente il calcolo anche abbastanza complesse per ottimizzare i tempi prefetching in fase di esecuzione.

codice di esempio dal mio test benchmark trasposizione segue.

#define MATSIZE 16384 
#define align(val, a) (val + (a - val % a)) 
#define tilewidth 4 
typedef int matrix[align(MATSIZE,tilewidth)*MATSIZE] __attribute__((aligned(64))); 


float &index(matrix m, unsigned i, unsigned j) 
{ 
    /* tiled address calculation */ 
    /* single cache line is used for 4x4 sub matrices (64 bytes = 4*4*sizeof(int) */ 
    /* tiles are arranged linearly from top to bottom */ 
    /* 
    * eg: 16x16 matrix tile positions: 
    * t1 t5 t9 t13 
    * t2 t6 t10 t14 
    * t3 t7 t11 t15 
    * t4 t8 t12 t16 
    */ 
    const unsigned tilestride = tilewidth * MATSIZE; 
    const unsigned comp0 = i % tilewidth; /* i inside tile is least significant part */ 
    const unsigned comp1 = j * tilewidth; /* next part is j multiplied by tile width */ 
    const unsigned comp2 = i/tilewidth * tilestride; 
    const unsigned add = comp0 + comp1 + comp2; 
    return m[add]; 
} 

/* Get start of tile reference */ 
float &tile(matrix m, unsigned i, unsigned j) 
{ 
    const unsigned tilestride = tilewidth * MATSIZE; 
    const unsigned comp1 = j * tilewidth; /* next part is j multiplied by tile width */ 
    const unsigned comp2 = i/tilewidth * tilestride; 
    return m[comp1 + comp2]; 

} 
template<bool diagonal> 
static void doswap(matrix mat, unsigned i, unsigned j) 
{ 
     /* special path to swap whole tile at once */ 
     union { 
      float *fs; 
      __m128 *mm; 
     } src, dst; 
     src.fs = &tile(mat, i, j); 
     dst.fs = &tile(mat, j, i); 
     if (!diagonal) { 
      __m128 srcrow0 = src.mm[0]; 
      __m128 srcrow1 = src.mm[1]; 
      __m128 srcrow2 = src.mm[2]; 
      __m128 srcrow3 = src.mm[3]; 
      __m128 dstrow0 = dst.mm[0]; 
      __m128 dstrow1 = dst.mm[1]; 
      __m128 dstrow2 = dst.mm[2]; 
      __m128 dstrow3 = dst.mm[3]; 

      _MM_TRANSPOSE4_PS(srcrow0, srcrow1, srcrow2, srcrow3); 
      _MM_TRANSPOSE4_PS(dstrow0, dstrow1, dstrow2, dstrow3); 

#if STREAMWRITE == 1 
      _mm_stream_ps(src.fs + 0, dstrow0); 
      _mm_stream_ps(src.fs + 4, dstrow1); 
      _mm_stream_ps(src.fs + 8, dstrow2); 
      _mm_stream_ps(src.fs + 12, dstrow3); 
      _mm_stream_ps(dst.fs + 0, srcrow0); 
      _mm_stream_ps(dst.fs + 4, srcrow1); 
      _mm_stream_ps(dst.fs + 8, srcrow2); 
      _mm_stream_ps(dst.fs + 12, srcrow3); 
#else 
      src.mm[0] = dstrow0; 
      src.mm[1] = dstrow1; 
      src.mm[2] = dstrow2; 
      src.mm[3] = dstrow3; 
      dst.mm[0] = srcrow0; 
      dst.mm[1] = srcrow1; 
      dst.mm[2] = srcrow2; 
      dst.mm[3] = srcrow3; 
#endif 
     } else { 
      __m128 srcrow0 = src.mm[0]; 
      __m128 srcrow1 = src.mm[1]; 
      __m128 srcrow2 = src.mm[2]; 
      __m128 srcrow3 = src.mm[3]; 

      _MM_TRANSPOSE4_PS(srcrow0, srcrow1, srcrow2, srcrow3); 

#if STREAMWRITE == 1 
      _mm_stream_ps(src.fs + 0, srcrow0); 
      _mm_stream_ps(src.fs + 4, srcrow1); 
      _mm_stream_ps(src.fs + 8, srcrow2); 
      _mm_stream_ps(src.fs + 12, srcrow3); 
#else 
      src.mm[0] = srcrow0; 
      src.mm[1] = srcrow1; 
      src.mm[2] = srcrow2; 
      src.mm[3] = srcrow3; 
#endif 
     } 
    } 
} 

static void transpose(matrix mat) 
{ 
    const unsigned xstep = 256; 
    const unsigned ystep = 256; 
    const unsigned istep = 4; 
    const unsigned jstep = 4; 
    unsigned x1, y1, i, j; 
    /* need to increment x check for y limit to allow unrolled inner loop 
    * access entries close to diagonal axis 
    */ 
    for (x1 = 0; x1 < MATSIZE - xstep + 1 && MATSIZE > xstep && xstep; x1 += xstep) 
     for (y1 = 0; y1 < std::min(MATSIZE - ystep + 1, x1 + 1); y1 += ystep) 
      for (i = x1 ; i < x1 + xstep; i += istep) { 
       for (j = y1 ; j < std::min(y1 + ystep, i); j+= jstep) 
       { 
        doswap<false>(mat, i, j); 
       } 
       if (i == j && j < (y1 + ystep)) 
        doswap<true>(mat, i, j); 
      } 

    for (i = 0 ; i < x1; i += istep) { 
     for (j = y1 ; j < std::min(MATSIZE - jstep + 1, i); j+= jstep) 
     { 
      doswap<false>(mat, i, j); 
     } 
     if (i == j) 
      doswap<true>(mat, i, j); 
    } 
    for (i = x1 ; i < MATSIZE - istep + 1; i += istep) { 
     for (j = y1 ; j < std::min(MATSIZE - jstep + 1, i); j+= jstep) 
     { 
      doswap<false>(mat, i, j); 
     } 
     if (i == j) 
      doswap<true>(mat, i, j); 
    } 
    x1 = MATSIZE - MATSIZE % istep; 
    y1 = MATSIZE - MATSIZE % jstep; 

    for (i = x1 ; i < MATSIZE; i++) 
     for (j = 0 ; j < std::min((unsigned)MATSIZE, i); j++) 
        std::swap(index(mat, i, j+0), index(mat, j+0, i)); 

    for (i = 0; i < x1; i++) 
     for (j = y1 ; j < std::min((unsigned)MATSIZE, i) ; j++) 
        std::swap(index(mat, i, j+0), index(mat, j+0, i)); 
}