Let's Flow within Kubeflow

Ala Raddaoui

In this blog post, we will go through how to train MNIST using distributed TensorFlow* and Kubeflow* from scratch.


Machine learning (ML) and deep learning (DL) have been around for more than half a century, yet only recently have these ideas begun to flourish, thanks to advances in compute capabilities and the deluge of data. ML/DL algorithms need vast amounts of data to reach the desired level of accuracy, and processing that volume of data requires substantial compute power before it yields the expected intelligence and knowledge. With the emergence of the cloud and other distributed frameworks, we started to treat servers as "cattle, not pets" in an attempt to pool their collective resources for storage and computation.

Kubernetes* is a cloud platform and a container orchestration tool which is changing the industry standard on how to deploy and manage applications at scale. Another project—Kubeflow—has recently emerged to utilize what Kubernetes offers, and makes it easy to deploy distributed machine learning workloads with just a few commands.

What is Kubeflow?

As stated in the official repo, Kubeflow is a new open source project hosted on GitHub, dedicated to making the use of ML stacks on Kubernetes easy, fast, and extensible.

The vision of the project is to simplify the creation of production-ready ML systems, ensure the mobility of ML stacks between clusters, and easily scale training to any cluster size. Since Kubernetes runs on top of any cloud (AWS, GCP, Azure, etc.), or even bare metal, Kubeflow stacks will be able to run wherever you have a Kubernetes cluster running, which makes them cloud agnostic and easy to move. Currently, Kubeflow only supports the TensorFlow library for training and serving your models, but support for other ML libraries like PyTorch* and MXNet is on the way.

How does it work?


In order to follow along with this tutorial, you will need to get the following items:

  • Access to a Kubernetes cluster >= 1.8
  • ksonnet version > 0.9.0
  • Docker client


STEP 1: Install Kubeflow on your Kubernetes Cluster

Let’s start by creating a sandbox namespace on our Kubernetes cluster for this tutorial, which will make it easy to clean up the environment later.

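# name of the sandbox namespace (the sample output later in this post uses mykubeflow)
NAMESPACE=mykubeflow
kubectl create ns ${NAMESPACE}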
# set kube context to point to the new namespace
kubectl config set-context $(kubectl config current-context) --namespace=${NAMESPACE}

Now, we will install Kubeflow with ksonnet. ksonnet is a framework for writing, sharing, and deploying Kubernetes manifests. It treats Kubernetes manifests as components of an application that you want to deploy. For more information, see the official ksonnet website.

Let’s start by creating a ksonnet application that will initially contain the Kubeflow components and to which we will add our ML stack later on.

# initialize the ksonnet app
ks init mykubeflow-app
cd mykubeflow-app
ks env set default --namespace ${NAMESPACE}
# Which version of Kubeflow to use (VERSION must be set before running the commands below)
# add kubeflow registry and install the packages
ks registry add kubeflow
ks pkg install kubeflow/core@${VERSION}
ks pkg install kubeflow/tf-serving@${VERSION}
ks pkg install kubeflow/tf-job@${VERSION}
# generate kube manifests for kubeflow components
ks generate kubeflow-core kubeflow-core
# apply the kubeflow components to the default environment
ks apply default -c kubeflow-core

Applying a ksonnet component to the default environment essentially applies the Kubernetes manifests generated by ksonnet to the cluster and namespace that our kubectl context points to. This will install a tf-job operator, JupyterHub, and TF Serving, even though in this tutorial we will only be using the tf-job operator.

STEP 3: Run the Distributed TensorFlow Example

Step 2 or 3?

We intentionally kept Step 2 for later to keep the explanation simple. This won’t affect the flow of the tutorial.

For the sake of this example, we will train and validate an MNIST model with model replicas, using the mnist_replica.py script taken from the official TensorFlow GitHub repo. You can replace this training script later with your own.

When we installed the tf-job operator, a tf-job custom resource definition was added to our Kubernetes cluster which we will use to train our model.

Let’s go ahead and create a tf-job resource and set it up with all the pieces of our TensorFlow training job. Then the tf-job operator will take care of all the complexities of linking and creating the TF cluster that will train our model.

To do that, we will use the tf-job prototype that became available when we installed the ksonnet packages.

# generate a tf-job component with name dist-mnist
JOB_NAME=dist-mnist
ks generate tf-job ${JOB_NAME} --name=${JOB_NAME} --namespace=${NAMESPACE}

Now, take a moment to examine all the parameters that we can tweak within our newly created dist-mnist component.

ks param list
COMPONENT   PARAM        VALUE
=========   =====        =====
dist-mnist  args         "null"
dist-mnist  image        "null"
dist-mnist  image_gpu    "null"
dist-mnist  name         "dist-mnist"
dist-mnist  namespace    "mykubeflow"
dist-mnist  num_gpus     0
dist-mnist  num_masters  1
dist-mnist  num_ps       0
dist-mnist  num_workers  0

As you can see, there are parameters to specify how many workers, parameter servers, and masters we want to use for this training, and the Docker image we want to use when we spin up those instances. This image should contain the training logic for the workers, parameter servers, and master, if any of those are used. There is also a parameter for the number of GPUs to attach to the workers if GPU training is wanted.

So let’s get started!

We will configure our tf-job to perform the distributed MNIST example.

At its core, Kubernetes is a container orchestration system, so anything we run in Kubernetes should be packaged in a Docker container. To run the distributed MNIST example, we will set the image parameter to point to our Docker image, which contains the training script and runs it once the container starts. We will also set the number of workers to 3 and the number of parameter servers to 2 for the sake of this example, but you are free to experiment with other values.

We won’t use a master in our case, since mnist_replica.py designates the first worker as the chief, which takes on the role a master would otherwise play in managing and coordinating training across our workers.

ks param set ${JOB_NAME} image ${IMAGE}
ks param set ${JOB_NAME} num_ps 2
ks param set ${JOB_NAME} num_workers 3
ks param set ${JOB_NAME} num_masters 0
ks param set ${JOB_NAME} args -- python,/opt/mnist_replica.py

The last line shows arguments that we want to pass to our docker container when it's created. You can see that we are passing python /opt/mnist_replica.py to basically run the script that we have baked into our docker container. You can pass more arguments like the number of hidden units or training steps depending on your training example.
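
As a rough illustration of how extra arguments could be appended, the Python sketch below builds the comma-separated value passed after `args --`. The helper name `build_args_value` is hypothetical, and the `hidden_units` and `train_steps` flags are assumptions about what your copy of mnist_replica.py accepts, so check the script before relying on them.

```python
def build_args_value(command, **flags):
    """Join a base command and optional --key=value flags with commas.

    Hypothetical helper: it only formats a string and does not call ksonnet.
    """
    parts = list(command)
    # Sort the flags so the generated value is deterministic.
    for name, value in sorted(flags.items()):
        parts.append("--{}={}".format(name, value))
    return ",".join(parts)


if __name__ == "__main__":
    base = ["python", "/opt/mnist_replica.py"]
    # Value used in this tutorial.
    print(build_args_value(base))
    # Same command with two assumed training flags appended.
    print(build_args_value(base, hidden_units=100, train_steps=200))
```

The printed string can then be pasted after `ks param set ${JOB_NAME} args --`.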

If you are curious to see what our ksonnet manifest looks like before we apply it to Kubernetes, execute this command:

ks show default -c ${JOB_NAME}

Finally, apply the component to your environment to start the training.

ks apply default -c ${JOB_NAME}

Good, so now what? How can I see the results?

STEP 4: Visualize Progress and Final Results of Training

To see final results, let's get the pods that were created by TFJob to perform the training:

kubectl get pods
dist-mnist-ps-fyvd-0-vnmvh       1/1 Running 0 1m
dist-mnist-ps-fyvd-1-8bzjt       1/1 Running 0 1m
dist-mnist-worker-fyvd-0-spc47   1/1 Running 0 1m
dist-mnist-worker-fyvd-1-nb52c   1/1 Running 0 1m
dist-mnist-worker-fyvd-2-xhcgk   1/1 Running 0 1m

As we can see there are 5 pods created (3 workers and 2 PS).
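
If you prefer to check the replica counts programmatically, a small Python sketch like the one below can tally pods by role from the text printed by `kubectl get pods`. The naming pattern (job, role, run id, index, suffix) is inferred from the listing above and is an assumption; other Kubeflow versions may name pods differently.

```python
import re
from collections import Counter

# Assumed pod name layout, inferred from the listing above:
# <job>-<role>-<run id>-<index>-<suffix>
POD_NAME = re.compile(
    r"^(?P<job>.+)-(?P<role>ps|worker|master)-[a-z0-9]+-(?P<index>\d+)-[a-z0-9]+$"
)


def count_replicas(kubectl_output):
    """Count pods per role in the text output of `kubectl get pods`."""
    counts = Counter()
    for line in kubectl_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        # The pod name is the first column of each row.
        match = POD_NAME.match(fields[0])
        if match:
            counts[match.group("role")] += 1
    return dict(counts)


if __name__ == "__main__":
    sample = """\
dist-mnist-ps-fyvd-0-vnmvh       1/1 Running 0 1m
dist-mnist-ps-fyvd-1-8bzjt       1/1 Running 0 1m
dist-mnist-worker-fyvd-0-spc47   1/1 Running 0 1m
dist-mnist-worker-fyvd-1-nb52c   1/1 Running 0 1m
dist-mnist-worker-fyvd-2-xhcgk   1/1 Running 0 1m
"""
    print(count_replicas(sample))
```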

If we look at the log of each worker, we can see the python output of the mnist_replica.py script each one is running.

Let’s look at a sample log from the chief worker (task 0), worker-XYZ-0.

kubectl logs dist-mnist-worker-fyvd-0-spc47
INFO|2018-03-15T18:40:34|/opt/launcher.py|63| Launcher started.
INFO|2018-03-15T18:40:34|/opt/launcher.py|78| Command to run: python /opt/mnist_replica.py --job_name=worker --ps_hosts=dist-mnist-ps-fyvd-0:2222,dist-mnist-ps-fyvd-1:2222 --worker_hosts=dist-mnist-worker-fyvd-0:2222,dist-mnist-worker-fyvd-1:2222,dist-mnist-worker-fyvd-2:2222 --task_index=0
INFO|2018-03-15T18:40:34|/opt/launcher.py|30| Running python /opt/mnist_replica.py --job_name=worker --ps_hosts=dist-mnist-ps-fyvd-0:2222,dist-mnist-ps-fyvd-1:2222 --worker_hosts=dist-mnist-worker-fyvd-0:2222,dist-mnist-worker-fyvd-1:2222,dist-mnist-worker-fyvd-2:2222 --task_index=0
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Extracting /tmp/mnist-data/train-images-idx3-ubyte.gz
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Extracting /tmp/mnist-data/train-labels-idx1-ubyte.gz
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Extracting /tmp/mnist-data/t10k-images-idx3-ubyte.gz
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Extracting /tmp/mnist-data/t10k-labels-idx1-ubyte.gz
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| job name = worker
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| task index = 0
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Worker 0: Initializing session...
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Worker 0: Session initialization complete.
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| Training begins @ 1521139252.877434
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| 1521139253.190525: Worker 0: training step 1 done (global step: 2)
INFO|2018-03-15T18:40:53|/opt/launcher.py|42| 1521139253.201047: Worker 0: training step 2 done (global step: 5)
INFO|2018-03-15T18:40:54|/opt/launcher.py|42| 1521139254.140446: Worker 0: training step 100 done (global step: 199)
INFO|2018-03-15T18:40:54|/opt/launcher.py|42| 1521139254.148050: Worker 0: training step 101 done (global step: 200)
INFO|2018-03-15T18:40:54|/opt/launcher.py|42| Training ends @ 1521139254.148146
INFO|2018-03-15T18:40:54|/opt/launcher.py|42| Training elapsed time: 1.270712 s
INFO|2018-03-15T18:40:54|/opt/launcher.py|42| After 200 training step(s), validation cross entropy = 899.37
INFO|2018-03-15T18:40:54|/opt/launcher.py|85| Finished: python /opt/mnist_replica.py --job_name=worker --ps_hosts=dist-mnist-ps-fyvd-0:2222,dist-mnist-ps-fyvd-1:2222 --worker_hosts=dist-mnist-worker-fyvd-0:2222,dist-mnist-worker-fyvd-1:2222,dist-mnist-worker-fyvd-2:2222 --task_index=0

Other workers should also get you a similar log output.
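
To compare workers without reading each log by hand, the following Python sketch extracts the last reported training step from a log. The regular expression is based only on the message format visible in the sample log above, so treat it as an assumption that may not hold for other versions of the script.

```python
import re

# Matches messages such as:
# "Worker 0: training step 101 done (global step: 200)"
STEP_LINE = re.compile(
    r"Worker (\d+): training step (\d+) done \(global step: (\d+)\)"
)


def last_step(log_text):
    """Return (worker, local_step, global_step) from the last step line, or None."""
    result = None
    for line in log_text.splitlines():
        match = STEP_LINE.search(line)
        if match:
            result = tuple(int(group) for group in match.groups())
    return result


if __name__ == "__main__":
    sample = (
        "INFO|2018-03-15T18:40:54|/opt/launcher.py|42| 1521139254.140446: "
        "Worker 0: training step 100 done (global step: 199)\n"
        "INFO|2018-03-15T18:40:54|/opt/launcher.py|42| 1521139254.148050: "
        "Worker 0: training step 101 done (global step: 200)\n"
    )
    print(last_step(sample))
```

In the sample log, worker 0 finished 101 local steps while the global step reached 200, which is consistent with the global step counter being shared by all three workers.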

To make sure the TFJob completed successfully, inspect the YAML output of the TFJob; you should see phase: Done and state: Succeeded in the fields.

kubectl get tfjob/dist-mnist -o yaml
  phase: Done
  reason: ""
  - ReplicasStates:
      Succeeded: 3
    state: Succeeded
    tf_replica_type: WORKER
  - ReplicasStates:
      Running: 2
    state: Running
    tf_replica_type: PS
  state: Succeeded
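
The same check could be scripted. The Python sketch below assumes you fetch the resource as JSON instead (`kubectl get tfjob/dist-mnist -o json`) and that the `phase` and `state` fields shown above sit under a top-level `status` key; that nesting is an assumption, since the excerpt does not show the parent key. The function name `tfjob_succeeded` is hypothetical.

```python
import json


def tfjob_succeeded(tfjob_json):
    """Return True when the job reports phase Done and state Succeeded.

    Assumes the fields live under a top-level "status" object.
    """
    status = json.loads(tfjob_json).get("status", {})
    return status.get("phase") == "Done" and status.get("state") == "Succeeded"


if __name__ == "__main__":
    # Hand-written sample mirroring only the two fields used by the check.
    sample = json.dumps({"status": {"phase": "Done", "state": "Succeeded"}})
    print(tfjob_succeeded(sample))
```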

Okay, this all looks nice, but how do I build that Docker image? I want to build my own!

STEP 2: Build the Docker Image to be Used for Workers and Parameter Servers

To explain that, let’s first see how we would run the mnist_replica.py script the standard way: calling the Python script in separate terminals, all on your local machine for simplicity.

If we opt for the same number of workers and parameter servers: 3 workers and 2 PSs, we will need to manually specify the cluster specifications, the job type, and the task index each time we run the script.

In terminal 1:

python mnist_replica.py --job_name=worker --task_index=0 \
  --ps_hosts=localhost:2221,localhost:2222 \
  --worker_hosts=localhost:2223,localhost:2224,localhost:2225

In terminal 2:

python mnist_replica.py --job_name=worker --task_index=1 \
  --ps_hosts=localhost:2221,localhost:2222 \
  --worker_hosts=localhost:2223,localhost:2224,localhost:2225

In terminal 3:

python mnist_replica.py --job_name=worker --task_index=2 \
  --ps_hosts=localhost:2221,localhost:2222 \
  --worker_hosts=localhost:2223,localhost:2224,localhost:2225

In terminal 4:

python mnist_replica.py --job_name=ps --task_index=0 \
  --ps_hosts=localhost:2221,localhost:2222 \
  --worker_hosts=localhost:2223,localhost:2224,localhost:2225

In terminal 5:

python mnist_replica.py --job_name=ps --task_index=1 \
  --ps_hosts=localhost:2221,localhost:2222 \
  --worker_hosts=localhost:2223,localhost:2224,localhost:2225

As you can see, this is tedious, especially when we deploy across different instances or servers. Manually gathering the IP addresses and ports of all nodes involved in our TensorFlow cluster is definitely not a job any engineer or data scientist wants to do, especially as the size of the cluster grows. This is where Kubeflow, with its TFJob operator, steps in to simplify the operation.
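
To make the repetition concrete, here is a minimal Python sketch that generates the five command lines above from a single cluster description. The function name `replica_commands` is hypothetical; it only builds strings and does not start any process.

```python
def replica_commands(ps_hosts, worker_hosts, script="mnist_replica.py"):
    """Return one command line per replica, workers first, then parameter servers."""
    # Every replica receives the same full cluster description.
    cluster_flags = "--ps_hosts={} --worker_hosts={}".format(
        ",".join(ps_hosts), ",".join(worker_hosts)
    )
    commands = []
    for job_name, hosts in (("worker", worker_hosts), ("ps", ps_hosts)):
        for task_index in range(len(hosts)):
            commands.append(
                "python {} --job_name={} --task_index={} {}".format(
                    script, job_name, task_index, cluster_flags
                )
            )
    return commands


if __name__ == "__main__":
    ps = ["localhost:2221", "localhost:2222"]
    workers = ["localhost:2223", "localhost:2224", "localhost:2225"]
    for command in replica_commands(ps, workers):
        print(command)
```

Only the job name and task index differ between replicas; everything else must be kept identical by hand, which is the bookkeeping that becomes error-prone as the cluster grows.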

The TFJob operator will mainly create a Docker container for each worker, ps, or master in different nodes of the cluster. Each container will run the image that we specified earlier with ksonnet.

The operator will also pass an environment variable called TF_CONFIG to each docker container which contains a dictionary of the cluster spec, the task that the container will play in the cluster (worker, ps, or a master) and its assigned task-id.

To pass on this information into our distributed MNIST script without having to manually change it, we use a launcher.py script in our container to extract all the information from the TF_CONFIG environment variable and then call our distributed MNIST script with the right arguments exactly as we would call it if we ran it from a terminal.
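
The idea can be sketched in a few lines of Python. This is not the actual launcher.py used in the image, only an illustration: it assumes TF_CONFIG is a JSON document with a `cluster` object (lists of `ps` and `worker` addresses) and a `task` object (`type` and `index`), and it emits the flags in the same order that appears in the worker log shown earlier. The function name `build_command` is hypothetical.

```python
import json
import os
import sys


def build_command(base_command, tf_config_json):
    """Append cluster flags derived from a TF_CONFIG JSON string to a command."""
    tf_config = json.loads(tf_config_json or "{}")
    cluster = tf_config.get("cluster", {})
    task = tf_config.get("task", {})
    command = list(base_command)
    command.append("--job_name=" + task.get("type", ""))
    command.append("--ps_hosts=" + ",".join(cluster.get("ps", [])))
    command.append("--worker_hosts=" + ",".join(cluster.get("worker", [])))
    command.append("--task_index=" + str(task.get("index", 0)))
    return command


if __name__ == "__main__":
    # Container arguments (for example: python /opt/mnist_replica.py) arrive in argv.
    cmd = build_command(sys.argv[1:], os.environ.get("TF_CONFIG", "{}"))
    # A real launcher would execute the command; this sketch only prints it.
    print("Command to run: " + " ".join(cmd))
```

Keeping the translation in a separate launcher means the training script itself stays unchanged and can still be run by hand with explicit flags.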

To create the Docker image, we need a Dockerfile that defines the steps required to build the image. For this example, we are using the following Dockerfile, which you can edit to use your own training script instead of the MNIST one.

FROM gcr.io/tensorflow/tensorflow:1.7.0-rc0
RUN apt-get update
RUN mkdir -p /opt
COPY mnist_replica.py /opt
COPY launcher.py /opt
RUN chmod u+x /opt/*
ENTRYPOINT ["/opt/launcher.py"]

From the last line, we can see that any args we pass to the container will be passed to launcher.py and for our case, we passed earlier python /opt/mnist_replica.py as our argument.

Finally, to build the image, these are the steps to follow:

docker_registry=docker.io/raddaoui # change value to your docker registry URL
docker build . -f Dockerfile.tfjob -t $docker_registry/tfjob_mnist_image:2.0
docker push $docker_registry/tfjob_mnist_image:2.0


In this blog, we went through how to install Kubeflow, create a TFJob to train a distributed MNIST model, visualize the training output, and create the Docker image for your workers. For convenience, we have put all materials here. You now have everything you need to get up and running with TFJobs and Kubeflow. Other components will be covered in future blogs!
