2015-03-05 14 views
5

Ho difficoltà a inizializzare ndarrays thread-locale con cython.parallel:cython.parallel: come inizializzare il buffer ndarray locale del thread?

pseudo-codice:

cdef: 
    ndarray buffer 

with nogil, parallel(): 
    buffer = np.empty(...) 

    for i in prange(n): 
     with gil: 
      print "Thread %d: data address: 0x%x" % (threadid(), <uintptr_t>buffer.data) 

     some_func(buffer.data) # use thread-local buffer 

cdef void some_func(char * buffer_ptr) nogil: 
    (... works on buffer contents...) 

Il mio problema è che in tutte le discussioni buffer.data punti allo stesso indirizzo. Vale a dire l'indirizzo del thread che ha assegnato l'ultima volta buffer.

Nonostante buffer essendo assegnato all'interno del blocco parallel() (o in alternativa prange), Cython non fa buffer una variabile private o thread locale ma mantiene come variabile shared.

Come risultato, buffer.data punta alla stessa area di memoria che provoca il caos sul mio algoritmo.

Questo non è un problema esclusivamente con oggetti ndarray ma apparentemente con tutti gli oggetti definiti cdef class.

Come posso risolvere questo problema?

+1

può chiamare 'np.empty' senza la gil? –

+1

forse [questa risposta] (http://stackoverflow.com/a/20520295/832621) porta quello che vuoi ... –

+1

@BiRico E 'una domanda retorica :)? No, non è possibile istanziare una matrice numpy (o una memoryview) all'interno di un blocco 'nogil' (altrimenti la matrice non verrebbe allocata nella memoria gestita da Python, e non potrebbe essere raccolta dei dati inutili, ecc.) –

risposta

2

Penso di aver finalmente trovato una soluzione a questo problema che mi piace. La versione breve è che si crea una matrice che ha forma:

(number_of_threads, ...<whatever shape you need in the thread>...) Poi, chiamare openmp.omp_get_thread_num e l'uso che per indicizzare la matrice per ottenere un sub-array "filo-locale". Questo evita di avere una matrice separata per ogni indice di loop (che potrebbe essere enorme) ma impedisce anche che i thread si sovrascrivano l'un l'altro.

Ecco una versione grezza di quello che ho fatto:

import numpy as np 
import multiprocessing 

from cython.parallel cimport parallel 
from cython.parallel import prange 
cimport openmp 

cdef extern from "stdlib.h": 
    void free(void* ptr) 
    void* malloc(size_t size) 
    void* realloc(void* ptr, size_t size) 

... 

cdef int num_items = ... 
num_threads = multiprocessing.cpu_count() 
result_array = np.zeros((num_threads, num_items), dtype=DTYPE) # Make sure each thread uses separate memory 
cdef c_numpy.ndarray result_cn 
cdef CDTYPE ** result_pointer_arr 
result_pointer_arr = <CDTYPE **> malloc(num_threads * sizeof(CDTYPE *)) 
for i in range(num_threads): 
    result_cn = result_array[i] 
    result_pointer_arr[i] = <CDTYPE*> result_cn.data 

cdef int thread_number 
for i in prange(num_items, nogil=True, chunksize=1, num_threads=num_threads, schedule='static'): 
    thread_number = openmp.omp_get_thread_num() 
    some_function(result_pointer_arr[thread_number])