
How to run a distributed TensorFlow MNIST example

I'm new to distributed TensorFlow. I found this distributed MNIST test here: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/tools/dist_test/python/mnist_replica.py

But I don't know how to make it run. I used the following script:

python distributed_mnist.py --num_workers=3 --num_parameter_servers=1 --worker_index=0 --worker_grpc_url="grpc://tf-worker0:2222" &
python distributed_mnist.py --num_workers=3 --num_parameter_servers=1 --worker_index=1 --worker_grpc_url="grpc://tf-worker1:2222" &
python distributed_mnist.py --num_workers=3 --num_parameter_servers=1 --worker_index=2 --worker_grpc_url="grpc://tf-worker2:2222"

I just found that these parameters were missing, so I passed them to the program. Here is what happened:

I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcublas.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcudnn.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcufft.so locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcuda.so.1 locally 
I tensorflow/stream_executor/dso_loader.cc:105] successfully opened CUDA library libcurand.so locally 
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz 
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz 
Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz 
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz 
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz 
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz 
Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz 
Worker GRPC URL: grpc://tf-worker0:2222 
Worker index = 0 
Number of workers = 3 
Worker GRPC URL: grpc://tf-worker2:2222 
Worker index = 2 
Number of workers = 3 
Worker GRPC URL: grpc://tf-worker1:2222 
Worker index = 1 
Number of workers = 3 
Worker 0: Initializing session... 
Worker 2: Waiting for session to be initialized... 
Worker 1: Waiting for session to be initialized... 
E0608 20:37:13.514249023 7501 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:13.514287961 7501 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 
E0608 20:37:13.548052986 7502 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:13.548091527 7502 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 
E0608 20:37:13.555449386 7503 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:13.555473898 7503 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 
^CE0608 20:37:28.517451603 7504 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:28.517491102 7504 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 
E0608 20:37:28.551002331 7505 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:28.551029795 7505 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 
E0608 20:37:28.556681378 7506 resolve_address_posix.c:126] getaddrinfo: Name or service not known 
D0608 20:37:28.556709728 7506 dns_resolver.c:189]   dns resolution failed: retrying in 15 seconds 

Does anyone know how to run this correctly? Thanks a lot!

Answer


The --worker_grpc_url flag values on your command line refer to addresses that do not exist.

This script is designed to run in a particular Kubernetes environment, not standalone. In particular, tf-worker0:2222, tf-worker1:2222, and tf-worker2:2222 refer to the names of Kubernetes containers created by an automated version of this test. It would take considerable modification to run it as a standalone test.
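
You can confirm that those names simply don't resolve on your machine, which is exactly what the getaddrinfo errors in your log mean. A quick diagnostic sketch (assuming a Linux system where getent is available):

# Ask the system resolver for each worker hostname; an empty result
# corresponds to the "getaddrinfo: Name or service not known" error above.
getent hosts tf-worker0 || echo "tf-worker0 does not resolve"
getent hosts tf-worker1 || echo "tf-worker1 does not resolve"
getent hosts tf-worker2 || echo "tf-worker2 does not resolve"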

The documentation for distributed TensorFlow includes code for an example trainer program. The easiest way to try out MNIST on distributed TensorFlow is to paste the model into that template. For example, something like the following should work:

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
      y_ = tf.placeholder(tf.float32, [None, 10])

      hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.summary.merge_all()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
        train_feed = {x: batch_xs, y_: batch_ys}

        _, step = sess.run([train_op, global_step], feed_dict=train_feed)
        if step % 100 == 0:
          print("Done step %d" % step)

    # Ask for all the services to stop.
    sv.stop()


if __name__ == "__main__":
  tf.app.run()
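
To try it out, start one process per task, passing the same cluster definition to each and varying only --job_name and --task_index. A minimal sketch, assuming you saved the script as trainer.py and the hostnames below are placeholders for your own machines (one parameter server, two workers):

# On the parameter server (ps0.example.com):
python trainer.py \
    --ps_hosts=ps0.example.com:2222 \
    --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    --job_name=ps --task_index=0

# On the first worker (worker0.example.com):
python trainer.py \
    --ps_hosts=ps0.example.com:2222 \
    --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    --job_name=worker --task_index=0

# On the second worker (worker1.example.com):
python trainer.py \
    --ps_hosts=ps0.example.com:2222 \
    --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    --job_name=worker --task_index=1

Each process creates a tf.train.Server listening on its own port; the chief worker (task_index 0) initializes the variables and the other workers wait for it, which matches the "Waiting for session to be initialized..." messages in your log.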

Thanks a lot for the explanation! I'll give it a try. – xyd


Thanks a lot! Just contributing my 2 cents to your great post: tf.merge_all_summaries() appears to be deprecated and raises an error on the latest version; use tf.summary.merge_all() or tf.contrib.deprecated.merge_all_summaries() instead. – sunilmanikani


@sunilmanikani Thanks for pointing that out... I've updated the code to use 'tf.summary.merge_all()'. – mrry
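
For anyone hitting the same error, the change is one line (the first call is the pre-1.0 API; the second is what the code above now uses):

summary_op = tf.merge_all_summaries()  # deprecated pre-1.0 API, later removed
summary_op = tf.summary.merge_all()    # current replacement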