Ho creato il codice per farlo in modo efficiente. Vengo quasi a una velocità di 4x (il meglio che puoi aspettarti con il doppio su AVX) usando AVX per un singolo thread. Ecco i risultati con oltre 2000, 32000 e 4000000 vettori a sei componenti su una matrice 6x6. Questi corrispondono approssimativamente alle dimensioni della cache L2, L3 e >> L3 sul mio sistema (ogni vettore utilizza 48 byte).
Modifica: ho aggiunto testo (e codice) alla fine per farlo con float. L'accelerazione è quasi 8x con AVX su un singolo thread.
i5-3317U (2 core ivy bridge)
compiled with: g++ m6.cpp -o m6 -O3 -mavx -fopenmp
nvec 2000, repeat 10000, 1 thread : time scalar/SIMD 3.95
nvec 32000, repeat 1000, 1 thread : time scalar/SIMD 3.53
nvec 4000000, repeat 10, 1 thread : time scalar/SIMD 3.28
1 thread for both the SIMD and non-SIMD functions
nvec 2000, repeat 10000, 2 threads: time scalar/SIMD 5.96
nvec 32000, repeat 1000, 2 threads: time scalar/SIMD 5.88
nvec 4000000, repeat 10, 2 threads: time scalar/SIMD 4.52
2 threads on the SIMD function vs. 1 thread on the non-SIMD function
compiled with g++ m6.cpp -o m6 -O3 -msse4.2 -fopenmp
nvec 2000, repeat 10000, 1 thread: time scalar/SIMD 2.15
nvec 32000, repeat 1000, 1 thread: time scalar/SIMD 2.06
nvec 4000000, repeat 10, 1 thread: time scalar/SIMD 2.00
L'algoritmo di base fa SIMD sulle xey vettori, NON sulla matrice 6x6. Un punto chiave è che i vettori xey devono essere matrici di blocchi di struttura di matrici. Questo è anche chiamato un array di struct of array (AoSoA) o struttura ibrida di array. Puoi leggere maggiori informazioni qui "Estensione di un linguaggio simile a C per la programmazione SIMD portatile" http://www.sci.utah.edu/~wald/Publications/2012///ppopp/download//vecimp.pdf
Di seguito è riportato il codice. La funzione aos2aosoa
converte le matrici di sei vettori componenti in matrici di SoA. L'applicazione dovrebbe generare i vettori in questo modulo (non eseguire la conversione) se si desidera ottenere il massimo vantaggio da SIMD. Questo codice utilizza la classe vettoriale Agner Fog http://www.agner.org/optimize/#vectorclass. Sono solo alcuni file di intestazione. Questo codice funzionerà anche con SSE (ma l'incremento è solo di circa 2x come previsto).
Un piccolo avvertimento, le matrici di vettori xey e risultati devono essere multipli di 4. Tuttavia, il numero di vettori non lo fa.
Compile come questo
g++ m6.cpp -o m6 -O3 -mavx -fopenmp //system with AVX
g++ m6.cpp -o m6 -O3 -msse4.2 -fopenmp //system with SSE
in Visual Studio put/arch: AVX nelle opzioni di linea compilatore commmand
Il codice:
#include <stdio.h>
#include <omp.h>
#include "vectorclass.h"
#include <stdlib.h>
double prod_scalar(double *x, double *M, double *y) {
double sum = 0.0f;
for(int i=0; i<6; i++) {
for(int j=0; j<6; j++) {
sum += x[i]*M[i*6 + j]*y[j];
}
}
return sum;
}
void prod_block4(double *x, double *M, double *y, double *result) {
Vec4d sum4 = 0.0f;
for(int i=0; i<6; i++) {
Vec4d x4 = Vec4d().load(&x[4*i]);
for(int j=0; j<6; j++) {
Vec4d y4 = Vec4d().load(&y[4*j]);
sum4 += x4*M[i*6 + j]*y4;
}
}
sum4.store(result);
}
void prod_block4_unroll2(double *x, double *M, double *y, double *result) {
Vec4d sum4_1 = 0.0f;
Vec4d sum4_2 = 0.0f;
Vec4d yrow[6];
for(int i=0; i<6; i++) {
yrow[i] = Vec4d().load(&y[4*i]);
}
for(int i=0; i<6; i++) {
Vec4d x4 = Vec4d().load(&x[4*i]);
for(int j=0; j<6; j+=2) {
sum4_1 += x4*M[i*6 + j]*yrow[j];
sum4_2 += x4*M[i*6 + j+1]*yrow[j+1];
}
}
sum4_1 += sum4_2;
sum4_1.store(result);
}
void loop_scalar(double *x, double *M, double *y, double *result, const int nvec) {
// #pragma omp parallel for
for(int i=0; i<nvec; i++) {
result[i] = prod_scalar(&x[6*i], M, &y[6*i]);
}
}
void loop_block4(double *x, double *M, double *y, double *result, const int nvec) {
// #pragma omp parallel for
for(int i=0; i<(nvec/4); i++) {
// prod_block4(&x[i*24], M, &y[i*24], &result[4*i]);
prod_block4_unroll2(&x[i*24], M, &y[i*24], &result[4*i]);
}
}
void aos2soa(double *in, double *out) {
int cnt = 0;
for(int i=0; i<6; i++) {
for(int j=0; j<4; j++) {
out[cnt++] = in[6*j + i];
}
}
}
void aos2aosoa(double *in, double *out, const int nvec) {
for(int i=0; i<(nvec/4); i++) {
aos2soa(&in[i*24], &out[i*24]);
}
}
double compare_results(double *r1, double * r2, const int nvec) {
double out = 0.0f;
for(int i=0; i<nvec; i++) {
out += r1[i] -r2[i];
//if(out!=0) printf("%f %f\n",r1[i], r2[i]);
}
return out;
}
void loop(const int nvec, const int repeat) {
double *M = (double*)_mm_malloc(6*6*sizeof(double),32);
double *x_aos = (double*)_mm_malloc(nvec*6*sizeof(double),32);
double *y_aos = (double*)_mm_malloc(nvec*6*sizeof(double),32);
double *x_aosoa = (double*)_mm_malloc(nvec*6*sizeof(double),32);
double *y_aosoa = (double*)_mm_malloc(nvec*6*sizeof(double),32);
double *results_scalar = (double*)_mm_malloc(nvec*sizeof(double),32);
double *results_vector = (double*)_mm_malloc(nvec*sizeof(double),32);
for(int i=0; i<6; i++) {
for(int j=0; j<6; j++) {
M[j*6 + i] = i*6 + j;
}
}
for(int i=0; i<(6*nvec); i++) {
double r1 = (double)rand()/RAND_MAX;
double r2 = (double)rand()/RAND_MAX;
//x_aos[i] = i;
x_aos[i] = r1;
//y_aos[i] = i;
y_aos[i] = r2;
}
aos2aosoa(x_aos, x_aosoa, nvec);
aos2aosoa(y_aos, y_aosoa, nvec);
double dtime;
dtime = omp_get_wtime();
for(int i=0; i<repeat; i++) {
loop_scalar(x_aos, M, y_aos, results_scalar, nvec);
}
dtime = omp_get_wtime() - dtime;
printf("time scalar %f\n", dtime);
double dtime_old = dtime;
dtime = omp_get_wtime();
for(int i=0; i<repeat; i++) {
loop_block4(x_aosoa, M, y_aosoa, results_vector, nvec);
}
dtime = omp_get_wtime() - dtime;
printf("time vector %f\n", dtime);
printf("time scalar/vector %f\n", dtime_old/dtime);
printf("difference %f\n", compare_results(results_scalar, results_vector, nvec));
_mm_free(M);
_mm_free(x_aos);
_mm_free(y_aos);
_mm_free(x_aosoa);
_mm_free(y_aosoa);
_mm_free(results_scalar);
_mm_free(results_vector);
}
int main() {
int nveca[3];
nveca[0] = 2000; // 2000*2*6*8 = 192kb //L2
nveca[1] = 32000; // 32000*2*6*8 = 3Mb //L3
nveca[2] = 4*1000000; //366Mb
int nrepeat[3];
nrepeat[0] = 10000;
nrepeat[1] = 1000;
nrepeat[2] = 10;
for(int i=0; i<3; i++) {
printf("nvec %d, repeat %d\n", nveca[i], nrepeat[i]);
loop(nveca[i], nrepeat[i]);
printf("\n");
}
}
Ecco i risultati per float.La spinta è di circa 8x
nvec 2000, repeat 10000, 1 thread: time scalar/SIMD 7.86
nvec 32000, repeat 1000, 1 thread: time scalar/SIMD 7.63
nvec 5000000, repeat 100, 1 thread: time scalar/SIMD 6.63
Ecco il codice per float anziché doppio
#include <stdio.h>
#include <omp.h>
#include "vectorclass.h"
#include <stdlib.h>
#define ROUND_DOWN(x, s) ((x) & ~((s)-1))
float prod_scalar(float *x, float *M, float *y) {
float sum = 0.0f;
for(int i=0; i<6; i++) {
for(int j=0; j<6; j++) {
sum += x[i]*M[i*6 + j]*y[j];
}
}
return sum;
}
float prod_scalar_unroll2(float *x, float *M, float *y) {
float sum_1 = 0.0f;
float sum_2 = 0.0f;
for(int i=0; i<6; i++) {
for(int j=0; j<6; j+=2) {
sum_1 += x[i]*M[i*6 + j]*y[j];
sum_2 += x[i]*M[i*6 + j+1]*y[j+1];
}
}
return sum_1 + sum_2;
}
void prod_block4(float *x, float *M, float *y, float *result) {
Vec8f sum4 = 0.0f;
for(int i=0; i<6; i++) {
Vec8f x4 = Vec8f().load(&x[8*i]);
for(int j=0; j<6; j++) {
Vec8f y4 = Vec8f().load(&y[8*j]);
sum4 += x4*M[i*6 + j]*y4;
}
}
sum4.store(result);
}
void prod_block4_unroll2(float *x, float *M, float *y, float *result) {
Vec8f sum4_1 = 0.0f;
Vec8f sum4_2 = 0.0f;
Vec8f yrow[6];
for(int i=0; i<6; i++) {
yrow[i] = Vec8f().load(&y[8*i]);
}
for(int i=0; i<6; i++) {
Vec8f x4 = Vec8f().load(&x[8*i]);
for(int j=0; j<6; j+=2) {
sum4_1 += x4*M[i*6 + j]*yrow[j];
sum4_2 += x4*M[i*6 + j+1]*yrow[j+1];
}
}
sum4_1 += sum4_2;
sum4_1.store(result);
}
void loop_scalar(float *x, float *M, float *y, float *result, const int nvec) {
// #pragma omp parallel for
for(int i=0; i<nvec; i++) {
result[i] = prod_scalar(&x[6*i], M, &y[6*i]);
//result[i] = prod_scalar_unroll2(&x[6*i], M, &y[6*i]);
}
}
void loop_SIMD(float *x, float *M, float *y, float *result, const int nvec) {
//#pragma omp parallel for schedule(static,256)
//#pragma omp parallel for schedule(static)
const int N = nvec/8;
//printf("chuck %d\n", N/2);
//omp_set_num_threads(2);
//#pragma omp parallel
{
//int nthreads = omp_get_num_threads();
//int ithread = omp_get_thread_num();
//int start = (ithread*N)/nthreads;
//int end = ((ithread+1)*N)/nthreads;
//printf("ithread, start %d, end %d, chunk %d\n",start, end, end-start);
//#pragma omp for
for(int i=0; i<(nvec/8); i++) {
//for(int i=start; i<end; i++) {
// prod_block4(&x[i*24], M, &y[i*24], &result[4*i]);
prod_block4_unroll2(&x[i*48], M, &y[i*48], &result[8*i]);
}
}
}
void aos2soa(float *in, float *out) {
int cnt = 0;
for(int i=0; i<6; i++) {
for(int j=0; j<8; j++) {
out[cnt++] = in[6*j + i];
}
}
}
void aos2aosoa(float *in, float *out, const int nvec) {
for(int i=0; i<(nvec/8); i++) {
aos2soa(&in[i*48], &out[i*48]);
}
}
float compare_results(float *r1, float * r2, const int nvec) {
float out = 0.0f;
for(int i=0; i<nvec; i++) {
out += r1[i] -r2[i];
//if(out!=0) printf("%f %f\n",r1[i], r2[i]);
}
return out;
}
void loop(const int nvec, const int repeat) {
float *M = (float*)_mm_malloc(6*6*sizeof(float),64);
float *x_aos = (float*)_mm_malloc(nvec*6*sizeof(float),64);
float *y_aos = (float*)_mm_malloc(nvec*6*sizeof(float),64);
float *x_aosoa = (float*)_mm_malloc(nvec*6*sizeof(float),64);
float *y_aosoa = (float*)_mm_malloc(nvec*6*sizeof(float),64);
float *results_scalar = (float*)_mm_malloc(nvec*sizeof(float),64);
float *results_SIMD = (float*)_mm_malloc(nvec*sizeof(float),64);
for(int i=0; i<6; i++) {
for(int j=0; j<6; j++) {
M[j*6 + i] = i*6 + j;
}
}
for(int i=0; i<(6*nvec); i++) {
float r1 = (float)rand()/RAND_MAX;
float r2 = (float)rand()/RAND_MAX;
//x_aos[i] = i;
x_aos[i] = r1;
//y_aos[i] = i;
y_aos[i] = r2;
}
aos2aosoa(x_aos, x_aosoa, nvec);
aos2aosoa(y_aos, y_aosoa, nvec);
float dtime;
dtime = omp_get_wtime();
for(int i=0; i<repeat; i++) {
loop_scalar(x_aos, M, y_aos, results_scalar, nvec);
}
dtime = omp_get_wtime() - dtime;
printf("time scalar %f\n", dtime);
float dtime_old = dtime;
dtime = omp_get_wtime();
for(int i=0; i<repeat; i++) {
loop_SIMD(x_aosoa, M, y_aosoa, results_SIMD, nvec);
}
dtime = omp_get_wtime() - dtime;
printf("time SIMD %f\n", dtime);
printf("time scalar/SIMD %f\n", dtime_old/dtime);
printf("difference %f\n", compare_results(results_scalar, results_SIMD, nvec));
_mm_free(M);
_mm_free(x_aos);
_mm_free(y_aos);
_mm_free(x_aosoa);
_mm_free(y_aosoa);
_mm_free(results_scalar);
_mm_free(results_SIMD);
}
int main() {
int nveca[3];
nveca[0] = 2000; // 2000*2*6*8 = 192kb //L2
nveca[1] = 32000; // 32000*2*6*8 = 3Mb //L3
nveca[2] = 5*1000000; //366Mb
int nrepeat[3];
nrepeat[0] = 10000;
nrepeat[1] = 1000;
nrepeat[2] = 100;
for(int i=0; i<3; i++) {
printf("nvec %d, repeat %d\n", nveca[i], nrepeat[i]);
loop(nveca[i], nrepeat[i]);
printf("\n");
}
}
Per Windows 7, è possibile utilizzare 8 dei registri YMM ---> 6 per elementi della matrice 1 di vettore a 1 per il vettore b. Per Linux è gratuito registri di 16 mm. –
Oops, dovrei aver specificato, questi sono doppia precisione. Modificherò la domanda. – Justin
@huseyintugrulbuyukisik ottieni registri di 8 mm su x86, 16 registri su x64. – dsharlet