2016-01-04 28 views
36

Sto tentando di precaricare i dati di allenamento per nascondere la latenza di I/O. Vorrei scrivere codice Python personalizzato che carichi i dati dal disco e preprocessi i dati (ad esempio aggiungendo una finestra di contesto). In altre parole, un thread esegue la pre-elaborazione dei dati e l'altro esegue la formazione. È possibile in TensorFlow?Come precaricare i dati utilizzando una funzione python personalizzata in tensorflow

Aggiornamento: Ho un esempio funzionante basato sull'esempio di @ mrry.

import numpy as np 
import tensorflow as tf 
import threading 

BATCH_SIZE = 5 
TRAINING_ITERS = 4100 

feature_input = tf.placeholder(tf.float32, shape=[128]) 
label_input = tf.placeholder(tf.float32, shape=[128]) 

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) 
enqueue_op = q.enqueue([label_input, feature_input]) 

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) 
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) 

sess = tf.Session() 

def load_and_enqueue(sess, enqueue_op, coord): 
    with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file: 
    while not coord.should_stop(): 
     feature_array = np.fromfile(feature_file, np.float32, 128) 
     if feature_array.shape[0] == 0: 
     print('reach end of file, reset using seek(0,0)') 
     feature_file.seek(0,0) 
     label_file.seek(0,0) 
     continue 
     label_value = np.fromfile(label_file, np.float32, 128) 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

coord = tf.train.Coordinator() 
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) 
t.start() 

for i in range(TRAINING_ITERS): 
    sum = sess.run(c) 
    print('train_iter='+str(i)) 
    print(sum) 

coord.request_stop() 
coord.join([t]) 
+3

Ho appena creato un taccuino sulle code che spiega anche un caso d'uso simile, spero che possa essere utile anche ad altri: https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 – AkiRoss

risposta

49

Questo è un caso di uso comune, e la maggior parte delle implementazioni di utilizzare tensorflow code per disaccoppiare il codice di pre-elaborazione dal codice di formazione. C'è a tutorial on how to use queues, ma i passi principali sono i seguenti:

  1. Definire una coda, q, che il buffer dei dati preprocessati. TensorFlow supporta il semplice tf.FIFOQueue che produce elementi nell'ordine in cui sono stati accodati e il più avanzato tf.RandomShuffleQueue che produce elementi in un ordine casuale. Un elemento di coda è una tupla di uno o più tensori (che possono avere diversi tipi e forme). Tutte le code supportano operazioni a singolo elemento (enqueue, dequeue) e batch (enqueue_many, dequeue_many), ma per utilizzare le operazioni batch è necessario specificare le forme di ogni tensore in un elemento di coda durante la costruzione della coda.

  2. Creare un sottografo che accoda gli elementi preelaborati nella coda. Un modo per farlo sarebbe definire alcune operazioni tf.placeholder() per tensioni corrispondenti a un singolo esempio di input, quindi passarle a q.enqueue(). (Se la preelaborazione produce un batch in una volta, dovresti usare invece q.enqueue_many()). In questo sottotitolo potresti anche includere le operazioni TensorFlow.

  3. Costruire un sottografo che esegue l'allenamento. Questo apparirà come un normale grafico TensorFlow, ma riceverà il suo input chiamando q.dequeue_many(BATCH_SIZE).

  4. Inizia la sessione.

  5. Creare uno o più thread che eseguono la logica di preelaborazione, quindi eseguire l'operazione di enqueue, alimentando i dati preelaborati. È possibile trovare le classi di utilità tf.train.Coordinator e tf.train.QueueRunner utili per questo.

  6. Eseguire il grafico di allenamento (ottimizzatore, ecc.) Come di consueto.

EDIT: Ecco una semplice funzione di load_and_enqueue() e frammento di codice per iniziare:

# Features are length-100 vectors of floats 
feature_input = tf.placeholder(tf.float32, shape=[100]) 
# Labels are scalar integers. 
label_input = tf.placeholder(tf.int32, shape=[]) 

# Alternatively, could do: 
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) 
# label_batch_input = tf.placeholder(tf.int32, shape=[None]) 

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) 
enqueue_op = q.enqueue([feature_input, label_input]) 

# For batch input, do: 
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) 

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) 
# Build rest of model taking label_batch, feature_batch as input. 
# [...] 
train_op = ... 

sess = tf.Session() 

def load_and_enqueue(): 
    with open(...) as feature_file, open(...) as label_file: 
    while True: 
     feature_array = numpy.fromfile(feature_file, numpy.float32, 100) 
     if not feature_array: 
     return 
     label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

# Start a thread to enqueue data asynchronously, and hide I/O latency. 
t = threading.Thread(target=load_and_enqueue) 
t.start() 

for _ in range(TRAINING_EPOCHS): 
    sess.run(train_op) 
+1

Grazie per il vostro consiglio. Ho un'altra domanda. Nel mio esperimento, la funzione di formazione e l'etichetta sono memorizzate in due file binari separati. Dovrei creare due code, una per funzionalità e una per etichetta? Se vogliamo ottenere una coppia casuale (funzionalità, etichetta) dalle due code, come faccio a verificare che la funzione corrisponda all'etichetta corretta? In altre parole, come posso garantire la mappatura one-to-one? –

+0

Per mantenere il mapping one-to-one, è necessario creare una singola coda in cui ogni elemento è una tupla di un tensore di funzionalità e un tensore dell'etichetta. È possibile farlo specificando un elenco di tipi (e forme) al costruttore della coda. Ciò assicura che i componenti della stessa tupla siano sempre separati dalla coda. – mrry

+0

Le caratteristiche e le etichette sono memorizzate separatamente in due grandi file binari. Quindi ho bisogno di creare feat_queue = tf.train.string_input_producer (feat_filenames) e label_queue = tf.train.string_input_producer (label_filenames). Poi avrò anche due tf.FixedLengthRecordReader per ottenere feat_queue ed etichetta da label_queue separatamente. Infine accodamento [feat, label] in un'altra coda. Ecco il problema Quando uso FixedLengthRecordReader per ottenere feat ed etichetta, sono sempre mappati correttamente? –

6

In altre parole, un thread fa i dati di pre-elaborazione e l'altro fa la formazione. È possibile in TensorFlow?

Sì, lo è. La soluzione di mrry funziona, ma esiste più semplice.

recupero dei dati

tf.py_func avvolge una funzione pitone e lo utilizza come operatore tensorflow. Quindi possiamo caricare i dati allo sess.run() ogni volta.Il problema con questo approccio è che i dati vengono caricati durante il sess.run() tramite il thread principale.

Un esempio minimo:

def get_numpy_tensor(): 
    return np.array([[1,2],[3,4]], dtype=np.float32) 
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32) 

Un esempio più complesso:

def get_numpy_tensors(): 
    # Load data from the disk into numpy arrays. 
    input = np.array([[1,2],[3,4]], dtype=np.float32) 
    target = np.int32(1) 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target 

sess = tf.InteractiveSession() 
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) 
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2 

dati prefetching in un altro thread

fare la fila i nostri dati in un altro thread (in modo che non lo faranno sess.run() dobbiamo aspettare per i dati), possiamo usare tf.train.batch() sui nostri operatori da tf.py_func().

Un esempio minimo:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) 
# Run `tf.train.start_queue_runners()` once session is created. 

Possiamo omettere l'argomento shapes se tensorflow_tensor ha la forma specificata:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensor.set_shape(tensor_shape) 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) 
# Run `tf.train.start_queue_runners()` once session is created. 

Un esempio più complesso:

input_shape, target_shape = (2, 2),() 
def get_numpy_tensors(): 
    input = np.random.rand(*input_shape).astype(np.float32) 
    target = np.random.randint(10, dtype=np.int32) 
    print('f', end='') 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 
batch_size = 2 
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) 
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. 

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets 

sess = tf.InteractiveSession() 
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. 
for _ in range(10): 
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) 
    assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) 
    print('r', end='') 

# Prints `fffffrrffrfrffrffrffrffrffrffrf`. 

In caso get_numpy_tensor() restituisce una serie di tensori, il n tf.train.batch(..., enqueue_many=True) aiuterà.