2013-07-24 20 views
5

Nel mio codice devo gestire "smascherare" i pacchetti websocket, che essenzialmente significa XOR'ing di dati non allineati di lunghezza arbitraria. Grazie a SO (Websocket data unmasking/multi byte xor) ho già scoperto come accelerare (si spera) usando le estensioni SSE2/AVX2, ma a guardarlo ora, mi sembra che la mia gestione dei dati non allineati sia totalmente non ottimale. C'è un modo per ottimizzare il mio codice o renderlo più semplice con le stesse prestazioni, o il mio codice è già il migliore?ottimizzare SSE2/AVX2 non allineato XOR

Ecco la parte importante del codice (per la domanda presumo che i dati saranno sempre sufficienti per eseguire il ciclo AVX2 una volta, ma allo stesso tempo verrà eseguito per lo più poche volte al massimo) :

// circular shift left for uint32 
int cshiftl_u32(uint32_t num, uint8_t shift) { 
    return (num << shift) | (num >> (32 - shift));                  
}                              

// circular shift right for uint32 
int cshiftr_u32(uint32_t num, uint8_t shift) { 
    return (num >> shift) | (num << (32 - shift));                  
}                              

void optimized_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) { 
    if (ds == de) return; // zero data len -> nothing to do 

    uint8_t maskOffset = 0; 

// process single bytes till 4 byte alignment (<= 3) 
    for (; ds < de && ((uint64_t)ds & (uint64_t)3); ds++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

    if (ds == de) return; // done, return 

    if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions 
     mask = cshiftl_u32(mask, maskOffset); 

     maskOffset = 0; 
    } 

// process 4 byte block till 8 byte alignment (<= 1) 
    uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); 

    if (ds < de32 && ((uint64_t)de & (uint64_t)7)) { 
     *(uint32_t *)ds ^= mask; // mask is uint32_t 

     if (++ds == de) return; 
    } 

// process 8 byte block till 16 byte alignment (<= 1) 
    uint64_t mask64 = mask | (mask << 4); 
    uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63)); 

    if (ds < de64 && ((uint64_t)ds & (uint64_t)15)) { 
     *(uint64_t *)ds ^= mask64; 

     if (++ds == de) return; // done, return 
    } 


// process 16 byte block till 32 byte alignment (<= 1) (if supported) 
#ifdef CPU_SSE2 
    __m128i v128, v128_mask; 
    v128_mask = _mm_set1_epi32(mask); 

    uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127)); 

    if (ds < de128 && ((uint64_t)ds & (uint64_t)31)) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 

     if (++ds == de) return; // done, return 
    } 

#endif 
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards) 
    __m256i v256, v256_mask; 
    v256_mask = _mm256_set1_epi32(mask); 

    uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255)); 

    for (; ds < de256; ds+=32) { 
     v256 = _mm256_load_si256((__m256i *)ds); 
     v256 = _mm256_xor_si256(v256, v256_mask); 
     _mm256_store_si256((__m256i *)ds, v256); 
    } 

    if (ds == de) return; // done, return 
#endif 
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported) 
    for (; ds < de128; ds+=16) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 
    } 

    if (ds == de) return; // done, return 

#endif 
    // process remaining 8 byte blocks 
    // this should always be supported, so remaining can be assumed to be executed <= 1 times 
    for (; ds < de64; ds += 8) { 
     *(uint64_t *)ds ^= mask64; 
    } 

    if (ds == de) return; // done, return 

    // process remaining 4 byte blocks (<= 1) 
    if (ds < de32) { 
     *(uint32_t *)ds ^= mask; 

     if (++ds == de) return; // done, return 
    } 


    // process remaining bytes (<= 3) 

    for (; ds < de; ds ++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

} 

PS: Si prega di ignorare l'uso di #ifdef invece di cpuid o simili per il rilevamento cpu bandiera.

+0

Hai provato a cronometrare il codice? (Inoltre, potresti voler racchiudere il bit di bit '&' nei tuoi condizionali con parentesi) –

+1

Il tempismo non sarebbe di grande aiuto, dato che posso solo fare ipotesi sui dati che otterrò come input, ma non otterrò alcun reale input per alcuni mesi a venire. Inoltre, otterrei solo un numero assoluto con i tempi, il che non mi aiuta in quanto il mio problema non è scoprire quanto tempo impiega questo codice per eseguire con l'input xy, ma come renderlo più veloce, ad es. Non ho idea di cosa cambiare. P.S .: Avvolto per bit e per una più facile comprensione, grazie per il suggerimento! – griffin

+1

Penso che scoprirete che gli stalli delle dipendenze dei dati superano i benefici allineati/non allineati. Se riesci a srotolare i tuoi loop di 2x, dovresti vedere un miglioramento significativo. – BitBank

risposta

2

A differenza di quanto riportato nel manuale, la maggior parte dei processori Intel è in realtà abbastanza buona per gestire dati non allineati. Dal momento che si utilizzano i builtin del compilatore Intel per la gestione dei vettori, presumo che si abbia accesso a una versione ragionevolmente recente di icc.

Se non è possibile allineare i dati in modo naturale, temo che ciò che si sta facendo sia il più vicino possibile alla massima prestazione. In termini di rendere il codice più leggibile e distribuibile su Xeon Phi (registri vettoriali a 64 byte)/Futuri processori vettoriali più lunghi, suggerirei di iniziare a utilizzare Intel Cilk Plus.

Esempio:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) { 
    while (length & 0x3) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    // switch to 4 bytes per block 
    uint32_t _d = d; 
    length >>= 2; 

    // Intel Cilk Plus Array Notation 
    // Should expand automatically to the best possible SIMD instructions 
    // you are compiling for 
    _d[0:length] ^= mask; 
} 

Si prega di notare che non ho testare questo codice come non ho accesso a un compilatore Intel al momento. Se incontrerai problemi, potrò esaminarlo quando tornerò nel mio ufficio la prossima settimana.

Se invece preferite intrinseche poi corretto utilizzo delle macro del preprocessore può facilitare notevolmente la tua vita:

#if defined(__MIC__) 
// intel Xeon Phi 
#define VECTOR_BLOCKSIZE 64 
// I do not remember the correct types/instructions right now 
#error "TODO: MIC handling" 
#elif defined(CPU_AVX2) 
#define VECTOR_BLOCKSIZE 32 
typedef __m256i my_vector_t; 
#define VECTOR_LOAD_MASK _mm256_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask)) 
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16 
typedef __m128i my_vector_t; 
#define VECTOR_LOAD_MASK _mm128_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask)) 
#else 
#define VECTOR_BLOCKSIZE 8 
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask)) 
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask) 
typedef uint64_t my_vector_t; 
#fi 

void optimized_xor_32(uint32_t mask, uint8_t *d, size_t length) { 
    size_t i; 

    // there really is no point in having extra 
    // branches for different vector lengths if they are 
    // executed at most once 
    // branch prediction is your friend here 
    // so we do one byte at a time until the block size 
    // is reached 

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    my_vector_t * d_vector = (my_vector_t *)d; 
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask); 

    size_t vector_legth = length/VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift 
    length &= VECTOR_BLOCKSIZE -1; // remaining length 

    for (i = 0; i < vector_legth; i++) { 
     VECTOR_XOR(d_vector + i, vector_mask); 
    } 

    // process the tail 
    d = (uint8_t*)(d_vector + i); 
    for (i = 0; i < length; i++) { 
     d[i] ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); 
    } 

} 

In un'altra nota: Si consiglia di utilizzare i 86 ruotano istruzioni invece di bit sposta per ruotare mask:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc") 
+0

Non sto usando icc ma gcc e non ho alcun tipo speciale di accesso a icc. Non sapevo però delle istruzioni di rotazione, devo cercare cosa fa esattamente, grazie! – griffin

+0

@griffin OK, ho avuto l'impressione che '_mm_load_si128' e la famiglia sia un built-in' icc'. In tal caso dovresti prendere il mio secondo snippet di codice, solo senza la parte per il MIC. Purtroppo non ci sono intrinseche per le istruzioni di rotazione, ma so che per esempio 'htons' usa la rotazione di 2 byte. –

+0

In aumento, ma dovrò provarlo quando ne avrò il tempo, il che probabilmente non succederà così presto, ma mi accerterò di accettarlo quando l'ho testato funzionante e funzionante. Grazie per tutto il tempo! – griffin