2012-11-21 9 views
10

Ho una matrice 3D che ho bisogno di interpolare su un asse (l'ultima dimensione). Diciamo y.shape = (nx, ny, nz), voglio interpolare in nz per ogni (nx, ny). Tuttavia, voglio interpolare per un valore diverso in ogni [i, j].Interpolazione veloce su array 3D

Ecco un codice per esemplificare. Se volessi interpolare a un singolo valore, diciamo new_z, userei scipy.interpolate.interp1d come questo

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a number 
f = scipy.interpolate.interp1d(x, y, axis=-1, kind='linear') 
result = f(new_z) 

Tuttavia, per questo problema quello che realmente voglio è di interpolare a una diversa per ogni new_zy[i, j]. Così faccio questo:

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a 2D array 
result = numpy.empty(y.shape[:-1]) 
for i in range(nx): 
    for j in range(ny): 
     f = scipy.interpolate.interp1d(x, y[i, j], axis=-1, kind='linear') 
     result[i, j] = f(new_z[i, j]) 

Sfortunatamente, con loop multipli questo diventa inefficiente e lento. C'è un modo migliore per fare questo tipo di interpolazione? L'interpolazione lineare è sufficiente. Una possibilità è di implementare questo in Cython, ma stavo cercando di evitarlo perché voglio avere la flessibilità di passare all'interpolazione cubica e non voglio farlo a mano in Cython.

risposta

6

per accelerare ordine elevato interpolare, è possibile chiamare interp1d() solo una volta, e quindi utilizzare l'attributo _spline e la funzione di basso livello _bspleval() nel modulo _fitpack. Ecco il codice:

from scipy.interpolate import interp1d 
import numpy as np 

nx, ny, nz = 30, 40, 50 
x = np.arange(0, nz, 1.0) 
y = np.random.randn(nx, ny, nz) 
new_x = np.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0 

def original_interpolation(x, y, new_x): 
    result = np.empty(y.shape[:-1]) 
    for i in xrange(nx): 
     for j in xrange(ny): 
      f = interp1d(x, y[i, j], axis=-1, kind=3) 
      result[i, j] = f(new_x[i, j]) 
    return result 

def fast_interpolation(x, y, new_x): 
    from scipy.interpolate._fitpack import _bspleval 
    f = interp1d(x, y, axis=-1, kind=3) 
    xj,cvals,k = f._spline 
    result = np.empty_like(new_x) 
    for (i, j), value in np.ndenumerate(new_x): 
     result[i, j] = _bspleval(value, x, cvals[:, i, j], k, 0) 
    return result 

r1 = original_interpolation(x, y, new_x) 
r2 = fast_interpolation(x, y, new_x) 

>>> np.allclose(r1, r2) 
True 

%timeit original_interpolation(x, y, new_x) 
%timeit fast_interpolation(x, y, new_x) 
1 loops, best of 3: 3.78 s per loop 
100 loops, best of 3: 15.4 ms per loop 
+0

Grazie. La tua soluzione è anche molto interessante. Sono rimasto sorpreso da così tante buone risposte. Sfortunatamente, posso accettarne solo uno. Anche se la tua soluzione non ha la velocità di Cython o la soluzione di @ pv, è quella più adatta per l'inquadratura della domanda. E il più flessibile in termini di tipo di interpolazione. Quindi lo accetto. – tiago

+0

Sto cercando di eseguire questo codice ma sto ricevendo questo errore L'oggetto 'BSpline' non è iterabile – Delosari

3

Non penso che lo interp1d abbia un metodo per farlo velocemente, quindi non è possibile evitare il loop qui.

Cython probabilmente si può ancora evitare codificando l'interpolazione lineare utilizzando np.searchsorted, qualcosa di simile (non testato):

def interp3d(x, y, new_x): 
    assert x.ndim == 1 and y.ndim == 3 and new_x.ndim == 2 
    assert y.shape[:2] == new_x.shape and x.shape == y.shape[2:] 

    nx, ny = y.shape[:2] 
    new_x = new_x.ravel() 
    j = np.arange(len(new_x)) 
    k = np.searchsorted(x, new_x).clip(1, len(x) - 1) 
    y = y.reshape(-1, x.shape[0]) 
    p = (new_x - x[k-1])/(x[k] - x[k-1]) 
    result = (1 - p) * y[j,k-1] + p * y[j,k] 
    return result.reshape(nx, ny) 

non aiuta con interpolazione cubica, però.

MODIFICA: ha reso una funzione e risolto errori di off-one. Alcuni tempi vs. Cython (griglia 500x500x500):

In [58]: %timeit interp3d(x, y, new_x) 
10 loops, best of 3: 82.7 ms per loop 

In [59]: %timeit cyfile.interp3d(x, y, new_x) 
10 loops, best of 3: 86.3 ms per loop 

In [60]: abs(interp3d(x, y, new_x) - cyfile.interp3d(x, y, new_x)).max() 
Out[60]: 2.2204460492503131e-16 

Anche se, si può sostenere che il codice Cython è più facile da leggere.

+0

Grazie, è sicuramente un modo elegante di farlo con Numpy. Ho finito con una soluzione rapida di Cython (vedi la mia risposta). È stato più veloce scrivere Cython piuttosto che aspettare che la versione python finisse di funzionare. – tiago

+0

@pv. puoi evitare il loop eseguendo [l'interpolazione come un'operazione vettoriale] (http://stackoverflow.com/a/13495570/709852) –

+0

@HenryGomersall: sì (ho provato a farlo nel codice sopra), ma intendevo che non è possibile se vuoi restare con interp1d. –

3

Poiché il suggerimento di Numpy stava richiedendo troppo tempo, potrei aspettare così ecco la versione cython per riferimento futuro. Da alcuni benchmark sparsi è circa 3000 volte più veloce (garantito, è solo l'interpolazione lineare e non tanto quanto interp1d ma va bene per questo scopo).

import numpy as N 
cimport numpy as N 
cimport cython 

DTYPEf = N.float64 
ctypedef N.float64_t DTYPEf_t 

@cython.boundscheck(False) # turn of bounds-checking for entire function 
@cython.wraparound(False) # turn of bounds-checking for entire function 
cpdef interp3d(N.ndarray[DTYPEf_t, ndim=1] x, N.ndarray[DTYPEf_t, ndim=3] y, 
       N.ndarray[DTYPEf_t, ndim=2] new_x): 
    """ 
    interp3d(x, y, new_x) 

    Performs linear interpolation over the last dimension of a 3D array, 
    according to new values from a 2D array new_x. Thus, interpolate 
    y[i, j, :] for new_x[i, j]. 

    Parameters 
    ---------- 
    x : 1-D ndarray (double type) 
     Array containg the x (abcissa) values. Must be monotonically 
     increasing. 
    y : 3-D ndarray (double type) 
     Array containing the y values to interpolate. 
    x_new: 2-D ndarray (double type) 
     Array with new abcissas to interpolate. 

    Returns 
    ------- 
    new_y : 3-D ndarray 
     Interpolated values. 
    """ 
    cdef int nx = y.shape[0] 
    cdef int ny = y.shape[1] 
    cdef int nz = y.shape[2] 
    cdef int i, j, k 
    cdef N.ndarray[DTYPEf_t, ndim=2] new_y = N.zeros((nx, ny), dtype=DTYPEf) 

    for i in range(nx): 
     for j in range(ny): 
      for k in range(1, nz): 
       if x[k] > new_x[i, j]: 
        new_y[i, j] = (y[i, j, k] - y[i, j, k - 1]) * \ 
        (new_x[i, j] - x[k-1])/(x[k] - x[k - 1]) + y[i, j, k - 1] 
        break 
    return new_y 
1

Si potrebbe utilizzare map_coordinates per questo:

from numpy import random, meshgrid, arange 
from scipy.ndimage import map_coordinates 

(nx, ny, nz) = (4, 5, 6) 
# some random array 
A = random.rand(nx, ny, nz) 

# random floating-point indices in [0, nz-1] 
Z = random.rand(nx, ny)*(nz-1) 

# regular integer indices of shape (nx,ny) 
X, Y = meshgrid(arange(nx), arange(ny), indexing='ij') 

coords = (X, Y, Z) # X, Y, and Z are of shape (nx, ny) 

print map_coordinates(A, coords, order=1, cval=-999.) 
+0

Avevo pensato a 'coordinate_carta', grazie per il tuo suggerimento. Nel mio caso 'nx, ny, nz' sono più vicini a 500 ciascuno, quindi penso che' map_coordinates' potrebbe essere un po 'avido nella RAM. Farò alcuni benchmark e riferirò. – tiago

2

Basandosi su @pv.'s answer, e vectorising il ciclo interno, il seguente fornisce un aumento di velocità sostanziale (EDIT: cambiato il costoso numpy.tile di utilizzare numpy.lib.stride_tricks.as_strided):

import numpy 
from scipy import interpolate 

nx = 30 
ny = 40 
nz = 50 

y = numpy.random.randn(nx, ny, nz) 
x = numpy.float64(numpy.arange(0, nz)) 

# We select some locations in the range [0.1, nz-0.1] 
new_z = numpy.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0 

# y is a 3D ndarray 
# x is a 1D ndarray with the abcissa values 
# new_z is a 2D array 

def original_interpolation(): 
    result = numpy.empty(y.shape[:-1]) 
    for i in range(nx): 
     for j in range(ny): 
      f = interpolate.interp1d(x, y[i, j], axis=-1, kind='linear') 
      result[i, j] = f(new_z[i, j]) 

    return result 

grid_x, grid_y = numpy.mgrid[0:nx, 0:ny] 
def faster_interpolation(): 
    flat_new_z = new_z.ravel() 
    k = numpy.searchsorted(x, flat_new_z) 
    k = k.reshape(nx, ny) 

    lower_index = [grid_x, grid_y, k-1] 
    upper_index = [grid_x, grid_y, k] 

    tiled_x = numpy.lib.stride_tricks.as_strided(x, shape=(nx, ny, nz), 
     strides=(0, 0, x.itemsize)) 

    z_upper = tiled_x[upper_index] 
    z_lower = tiled_x[lower_index] 

    z_step = z_upper - z_lower 
    z_delta = new_z - z_lower 

    y_lower = y[lower_index] 
    result = y_lower + z_delta * (y[upper_index] - y_lower)/z_step 

    return result 

# both should be the same (giving a small difference) 
print numpy.max(
     numpy.abs(original_interpolation() - faster_interpolation())) 

Questo dà i seguenti orari sulla mia macchina:

In [8]: timeit foo.original_interpolation() 
10 loops, best of 3: 102 ms per loop 

In [9]: timeit foo.faster_interpolation() 
1000 loops, best of 3: 564 us per loop 

Andando a nx = 300, ny = 300 e nz = 500, dà un aumento di velocità 130x:

In [2]: timeit original_interpolation() 
1 loops, best of 3: 8.27 s per loop 

In [3]: timeit faster_interpolation() 
10 loops, best of 3: 60.1 ms per loop 

avresti bisogno di un scrivere il proprio algoritmo per interpolazione cubica, ma non dovrebbe essere così difficile.

2

Anche se ci sono diverse risposte belle, stanno ancora facendo 250k interpolazioni in un 500-lunga serie fisso:

j250k = np.searchsorted(X500, X250k) # indices in [0, 500) 

Questo può essere accelerato con un LUT , LookUp Tavolo, con dire 5k slot:

lut = np.interp(np.arange(5000), X500, np.arange(500)).round().astype(int) 
xscale = (X - X.min()) * (5000 - 1) \ 
     /(X.max() - X.min()) 
j = lut.take(xscale.astype(int), mode="clip") # take(floats) in numpy 1.7 ? 

#--------------------------------------------------------------------------- 
# X  | |  | |    | 
# j  0 1  2 3    4 ... 
# LUT |....|.......|.|.............|.... -> int j (+ offset in [0, 1)) 
#--------------------------------------------------------------------------- 

searchsorted è abbastanza veloce, il tempo ~ LN2 500, quindi questo non è probabilmente molto più veloce.
Ma le LUT sono molto veloci in C, un semplice compromesso di velocità/memoria.