IEI* TANK AIoT Developer Kit and Microsoft Azure*: Running Machine Learning on the Edge

ID 675500
Updated 9/20/2018
Version Latest
Public

Deploying a machine learning model remotely to the edge with Microsoft Azure* IoT Edge can help scale an IoT application. Microsoft Azure IoT Edge works by containerizing the solution into a module that is pushed down to machines at the edge. This paper converts a Python*-based motor defect detector solution into a deployable Azure module. The motor defect detector uses a k-means clustering algorithm to perform predictive maintenance on motor bearings by determining whether they are likely to fail. The module analyzes simulated data from the motors and then sends messages back to the IoT hub about the status of each bearing.


Figure 1. Flow of module from Development Machine to Edge Device

Hardware

Setting Up Development Environment and Microsoft Azure*

Follow the Python module tutorial here to make sure all prerequisites are set up. It walks through setting up Microsoft Visual Studio* Code and other resources on the development machine, as well as creating all the necessary dependencies in Azure. The main things to have in Azure are a standard tier IoT Hub, the edge device registered to the hub, and a registry to store the container images.

Tips

The Cookiecutter install location needs to be added to the Path environment variable on the development machine. On the development machine used here, it was installed at the location below.

C:\Users\\AppData\Roaming\Python\Python36\Scripts\

The development machine should use Python 3 and also needs the following things installed so Visual Studio Code doesn’t show errors on the code in the tutorial:

pip install iothub_client pandas numpy sklearn scipy

Pylint might also need to be installed for Visual Studio Code if it is not already present.

pip install pylint

Restart Visual Studio Code so it can find the installations.
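
As a quick way to confirm the installations before opening the tutorial code, the check can be scripted. The following is a minimal sketch rather than part of the original project: find_missing and REQUIRED_MODULES are hypothetical names, and the module list simply mirrors the import names used by the code in this article. It only tests whether each top-level module can be located, not whether its version is suitable.

```python
import importlib.util

# Import names used by utils.py and main.py in this article.
REQUIRED_MODULES = ["iothub_client", "pandas", "numpy", "sklearn", "scipy"]


def find_missing(modules):
    """Return the names in `modules` that cannot be located for import."""
    missing = []
    for name in modules:
        # find_spec returns None when a top-level module is not installed.
        if importlib.util.find_spec(name) is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = find_missing(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules were found.")
```

Run it with the same Python 3 interpreter that Visual Studio Code is configured to use, since a different interpreter may have a different set of installed packages.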

Creating the Module

On the development machine we will use Visual Studio Code to create the module to be deployed to the Tank edge device.

  1. Create a New IoT Edge Solution by right clicking on modules.
  2. Call it MotorDefectDetectorSolution.
  3. Select Python Module and call the module KmeansModule.
  4. Enter in the registry address: .azurecr.io/KmeansModule
  5. The new Edge Solution will open.
  6. Copy kmeanModel.npy from the motor defect detector GitHub* repository into the KmeansModule folder. This is the model file.
  7. Create and copy in utils.py from below. utils.py handles most of the mathematical calculations. It has been edited from the GitHub utils.py by changing it to use the first test set by default and removing the unused plotting functions.
    #importing the libraries
    import numpy as np
    import pandas as pd
    #import matplotlib.pyplot as plt
    
    from scipy.fftpack import fft
    from scipy.spatial.distance import cdist
    #from sklearn import cluster
    
    # cal_Labels takes the number of files as input and generates labels based on a 70-30 split.
    # files for testset1 = 2148, testset2 = 984, testset3 = 6324
    def cal_Labels(files):
        range_low = files*0.7
        range_high = files*1.0
        label = []
        for i in range(0,files):
            if(i < range_low):
                label.append(0)
            elif(i >= range_low and i <= range_high):
                label.append(1)
            else:
                label.append(2)
        return label
    
    # cal_amplitude takes the FFT data and n (the number of peaks to keep) as input
    # and returns the n frequencies with the highest amplitude, along with those amplitudes
    def cal_amplitude(fftData,n):
        ifa = []
        ia = []
        amp = abs(fftData[0:int(len(fftData)/2)])
        freq = np.linspace(0,10000,num = int(len(fftData)/2))
        ida = np.array(amp).argsort()[-n:][::-1]
        ia.append([amp[i] for i in ida])
        ifa.append([freq[i] for i in ida])
        return(ifa,ia)
    
    # this function calculates the top 5 frequencies with the highest amplitude for each file
    # and returns one list per peak rank
    def cal_max_freq(files,path):
        freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = ([] for _ in range(5))
        for f in files:
            temp = pd.read_csv(path+f,  sep = "\t",header = None)
            temp_freq_max1,temp_freq_max2,temp_freq_max3,temp_freq_max4,temp_freq_max5 = ([] for _ in range(5))
            rhigh = 8
            for i in range(0,rhigh):
                t = fft(temp[i])
                ff,aa = cal_amplitude(t,5)
                temp_freq_max1.append(np.array(ff)[:,0])
                temp_freq_max2.append(np.array(ff)[:,1])
                temp_freq_max3.append(np.array(ff)[:,2])
                temp_freq_max4.append(np.array(ff)[:,3])
                temp_freq_max5.append(np.array(ff)[:,4])
            freq_max1.append(temp_freq_max1)
            freq_max2.append(temp_freq_max2)
            freq_max3.append(temp_freq_max3)
            freq_max4.append(temp_freq_max4)
            freq_max5.append(temp_freq_max5)
        return(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5)
    
    
    def create_dataframe(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5,bearing):
        result = pd.DataFrame()
        result['fmax1'] = list((np.array(freq_max1))[:,bearing])
        result['fmax2'] = list((np.array(freq_max2))[:,bearing])
        result['fmax3'] = list((np.array(freq_max3))[:,bearing])
        result['fmax4'] = list((np.array(freq_max4))[:,bearing])
        result['fmax5'] = list((np.array(freq_max5))[:,bearing])
        x = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]]
        return x

    Code 1. utils.py

  8. Copy the code from main.py below into the default main.py. main.py is where the program runs and sends messages about the status of the bearings. To simulate data being generated, the script downloads the NASA* data sets used in the original GitHub project and extracts the first test set. It then moves the first test set files one by one into the folder /tmp/test. The program pulls its data from this folder, which simulates the motor running and data being gathered over time.

    import random
    import time
    import sys
    import iothub_client
    # pylint: disable=E0611
    from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
    from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
    
    import pandas as pd
    import numpy as np
    from utils import cal_max_freq
    import os
    import urllib.request  # urlretrieve lives in the urllib.request submodule
    import shutil
    
    def checkBearings(hubManager):
        datadir= '/tmp/1st_test/'
        filedir = '/tmp/test/'
        try:
            if not os.path.exists(datadir): 
                os.system("mkdir /tmp/test")
                print("data not found, downloading")
                urllib.request.urlretrieve("https://ti.arc.nasa.gov/c/3/", "/tmp/IMS.7z")
                print("downloaded, now unzipping")
                os.system("7za x /tmp/IMS.7z -o/tmp/")
                os.system("unrar x /tmp/1st_test.rar /tmp/")
                print("unzipped")
                files = [x for x in os.listdir(datadir)]
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
        except IOError as e:
            print(e)
            print("error end")
    
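        # load the model (it is stored as a pickled object, which newer NumPy
        # releases only load when allow_pickle=True is passed)
        filename = "kmeanModel.npy"
        model = np.load(filename, allow_pickle=True).item()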
    
        # iteration for 1st_test data
        rhigh = 8
    
        moredata= True
    
        while moredata:
            try:
                # load the files
                all_files = os.listdir(filedir)
                freq_max1,freq_max2,freq_max3,freq_max4,freq_max5  =  cal_max_freq(all_files,filedir)
            except IOError as e:
                print("you have entered either the wrong data directory path or filepath ")
                print(e)
                print("error end")
    
    
            #testlabels = []
            for i in range(0,rhigh):
                print("checking for the bearing",i+1)
                result = pd.DataFrame()
                result['freq_max1'] = list((np.array(freq_max1))[:,i])
                result['freq_max2'] = list((np.array(freq_max2))[:,i])
                result['freq_max3'] = list((np.array(freq_max3))[:,i])
                result['freq_max4'] = list((np.array(freq_max4))[:,i])
                result['freq_max5'] = list((np.array(freq_max5))[:,i])
    
                X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]]
    
                label = model.predict(X)
                labelfive = list(label[-100:]).count(5)
                labelsix = list(label[-100:]).count(6)
                labelseven = list(label[-100:]).count(7)
                totalfailur = labelfive+labelsix+labelseven#+labelfour
                ratio = (totalfailur/100)*100
                if(ratio >= 25):
                    print("bearing"+ str(i+1) + " is suspected to fail")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is suspected to fail", 0)
                else:
                    print("bearing"+ str(i+1) + " is working in normal condition")
                    hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is working in normal condition", 0)
    
            files = [x for x in os.listdir(datadir)]
            if len(files):
                oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
                os.rename(datadir + oldest, filedir  + oldest)
            else:
                moredata = False
                print("done")
    		
    
    # messageTimeout - the maximum time in milliseconds until a message times out.
    # The timeout period starts at IoTHubModuleClient.send_event_async.
    # By default, messages do not expire.
    MESSAGE_TIMEOUT = 10000
    
    # global counters
    RECEIVE_CALLBACKS = 0
    SEND_CALLBACKS = 0
    
    # Choose HTTP, AMQP or MQTT as transport protocol.  Currently only MQTT is supported.
    PROTOCOL = IoTHubTransportProvider.MQTT
    
    # Callback received when the message that we're forwarding is processed.
    def send_confirmation_callback(message, result, user_context):
        global SEND_CALLBACKS
        print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        print ( "    Properties: %s" % key_value_pair )
        SEND_CALLBACKS += 1
        print ( "    Total calls confirmed: %d" % SEND_CALLBACKS )
    
    
    # receive_message_callback is invoked when an incoming message arrives on the specified 
    # input queue (in the case of this sample, "input1").  Because this is a filter module, 
    # we will forward this message onto the "output1" queue.
    def receive_message_callback(message, hubManager):
        global RECEIVE_CALLBACKS
        message_buffer = message.get_bytearray()
        size = len(message_buffer)
        #print ( "    Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
        map_properties = message.properties()
        key_value_pair = map_properties.get_internals()
        #print ( "    Properties: %s" % key_value_pair )
        RECEIVE_CALLBACKS += 1
        #print ( "    Total calls received: %d" % RECEIVE_CALLBACKS )
        #hubManager.forward_event_to_output("output1", message, 0)
        return IoTHubMessageDispositionResult.ACCEPTED
    
    
    class HubManager(object):
    
        def __init__(
                self,
                protocol=IoTHubTransportProvider.MQTT):
            self.client_protocol = protocol
            self.client = IoTHubModuleClient()
            self.client.create_from_environment(protocol)
    
            # set the time until a message times out
            self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)
            
            # sets the callback when a message arrives on "input1" queue.  Messages sent to 
            # other inputs or to the default will be silently discarded.
            self.client.set_message_callback("input1", receive_message_callback, self)
    
        # Forwards the message received onto the next stage in the process.
        def forward_event_to_output(self, outputQueueName, event, send_context):
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
        # Send the message 
        def send_event_to_output(self, outputQueueName, messsage, send_context):
            event=IoTHubMessage(messsage)
            self.client.send_event_async(
                outputQueueName, event, send_confirmation_callback, send_context)
    
    def main(protocol):
        try:
            print ( "\nPython %s\n" % sys.version )
            print ( "IoT Hub Client for Python3" )
    
            hub_manager = HubManager(protocol)
    
            print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
            print ( "The sample is now waiting for messages and will wait indefinitely.  Press Ctrl-C to exit. ")
    
            checkBearings(hub_manager)
    
            while True:
                time.sleep(1)
    
        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubModuleClient sample stopped" )
    
    if __name__ == '__main__':
        main(PROTOCOL)
    

    Code 2. main.py

  9. Update requirements.txt. This installs the dependencies of the motor defect detector.
     

    azure-iothub-device-client==1.4.0
    numpy>=1.11.2
    scipy>=1.1.0
    pandas>=0.23.4
    scikit-learn>=0.19.1
    sklearn>=0.0

    Code 3. requirements.txt

  10. Update Dockerfile.amd64.

    Note that the container only comes with Python 2.7 by default, so Python 3 needs to be installed and the Python path updated.

    FROM ubuntu:xenial
    
    WORKDIR /app
    
    RUN apt-get update && \
        apt-get install -y --no-install-recommends libcurl4-openssl-dev libboost-python-dev p7zip-full unrar python3-pip python3-dev python3-setuptools && \
        cd /usr/local/bin && \
        ln -s /usr/bin/python3 python && \
        pip3 install --upgrade pip && \
        rm -rf /var/lib/apt/lists/* 
    
    COPY requirements.txt ./
    RUN pip3 install -r requirements.txt
    
    COPY . .
    
    RUN useradd -ms /bin/bash moduleuser
    USER moduleuser
    
    CMD [ "python3", "-u", "./main.py" ]

    Code 4. Dockerfile.amd64

    With the added files, the module structure should look as below in Visual Studio Code.


    Figure 2. IoT Edge Solution in Visual Studio Code
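
The two computational pieces of the module, the frequency features computed in utils.py and the status decision made in main.py, can be tried in isolation before building the container. The following is a minimal sketch under stated assumptions, not the project's code: top_frequencies and failure_ratio are hypothetical names, NumPy's np.fft.rfft and np.fft.rfftfreq stand in for scipy.fftpack.fft and the fixed np.linspace(0, 10000, ...) axis, the 20 kHz sample rate is what that 0 to 10000 Hz axis implies, and the cluster labels are made up because the real ones come from the k-means model.

```python
import numpy as np


def top_frequencies(signal, sample_rate_hz, n=5):
    """Return the n frequencies (Hz) with the largest FFT amplitude, strongest first."""
    amplitudes = np.abs(np.fft.rfft(signal))
    frequencies = np.fft.rfftfreq(len(signal), d=1.0 / sample_rate_hz)
    # argsort is ascending, so take the last n indices and reverse them.
    strongest = np.argsort(amplitudes)[-n:][::-1]
    return frequencies[strongest]


def failure_ratio(labels, failure_labels=(5, 6, 7), window=100):
    """Percentage of the last `window` cluster labels that are failure labels.

    Like main.py, this divides by the window size even when fewer labels exist.
    """
    recent = list(labels)[-window:]
    hits = sum(1 for label in recent if label in failure_labels)
    return 100.0 * hits / window


# Synthetic signal: a 1000 Hz tone plus a weaker 3000 Hz tone (assumed values).
sample_rate_hz = 20000
t = np.arange(2000) / sample_rate_hz
signal = np.sin(2 * np.pi * 1000 * t) + 0.5 * np.sin(2 * np.pi * 3000 * t)
peaks = top_frequencies(signal, sample_rate_hz, n=2)

# Made-up cluster labels: 30 of the last 100 fall in the failure clusters.
labels = [0] * 70 + [5] * 20 + [6] * 10
ratio = failure_ratio(labels)
status = "suspected to fail" if ratio >= 25 else "working in normal condition"
print(peaks, ratio, status)
```

In the module itself, the five strongest frequencies per bearing form the feature vector passed to model.predict, and the 25 percent threshold is applied to the predicted labels for each of the eight data columns.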

Deploying the Module

  1. In Visual Studio Code, right click on deployment.template.json and select Build and Push IoT Edge Solution.


    Figure 3. Build and Push IoT Edge Solution location

    It will take some time to build the container with all the requirements. 

  2. Then right click on the Azure IoT Hub Device you want to deploy to and select Create Deployment for Single Device.


    Figure 4. Deploy the module

  3. Log into the IoT Hub edge device, in this case, the Tank.
    Use the command below to monitor the progress:
    sudo iotedge logs KmeansModule -f
    The module first downloads and extracts the data, which takes some time, and then starts feeding it file by file into the /tmp/test folder. It then sends the messages back to the IoT hub.
    The messages can be seen on the development machine in Visual Studio Code by using the menu next to Azure IOT HUB Devices and selecting Start Monitoring D2C Message.


    Figure 5. Monitoring the Device to Cloud messages


    Figure 6. IoT Hub Messages
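
The data feed that the module runs on the device, moving the recorded files into the working folder one at a time, oldest first, can be tried locally without the NASA download. The following is a minimal sketch, assuming temporary folders and three placeholder files in place of /tmp/1st_test and /tmp/test. move_oldest is a hypothetical helper, and it orders files by modification time rather than the os.path.getctime call used in main.py so that the example can set the timestamps itself.

```python
import os
import shutil
import tempfile


def move_oldest(source_dir, target_dir):
    """Move the oldest file in source_dir to target_dir; return its name or None."""
    names = os.listdir(source_dir)
    if not names:
        return None
    oldest = min(names, key=lambda n: os.path.getmtime(os.path.join(source_dir, n)))
    shutil.move(os.path.join(source_dir, oldest), os.path.join(target_dir, oldest))
    return oldest


with tempfile.TemporaryDirectory() as root:
    source = os.path.join(root, "1st_test")
    target = os.path.join(root, "test")
    os.makedirs(source)
    os.makedirs(target)

    # Create placeholder files with increasing, explicitly set timestamps.
    for index, name in enumerate(["c.txt", "a.txt", "b.txt"]):
        path = os.path.join(source, name)
        with open(path, "w") as handle:
            handle.write("sample\n")
        os.utime(path, (1000 + index, 1000 + index))

    # Feed the files one by one, as the module does between analysis passes.
    moved = []
    while True:
        name = move_oldest(source, target)
        if name is None:
            break
        moved.append(name)

print(moved)
```

Each pass of the loop in main.py analyzes everything currently in the working folder, so the amount of history available to the model grows by one file per iteration.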

Useful Module Commands on the Tank

List the installed modules:

iotedge list

Remove the container:

sudo docker rm -f KmeansModule

Look at the logs of the container:

sudo iotedge logs KmeansModule -f

Conclusion 

The motor defect detector Python project has now been converted into a module on the Azure IoT Hub and deployed to an edge device. As a next step, Azure can turn those messages into actionable events with routing.
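
Any consumer that acts on these messages first has to recover the bearing number and status from the message text. The following is a minimal sketch, assuming the exact strings sent by main.py in this article. parse_status and STATUS_PATTERN are hypothetical names and are not part of any Azure SDK.

```python
import re

# Matches the two message texts that main.py sends to "output2".
STATUS_PATTERN = re.compile(
    r"^bearing(\d+) is (suspected to fail|working in normal condition)$"
)


def parse_status(message):
    """Return (bearing_number, needs_attention), or None if the text does not match."""
    match = STATUS_PATTERN.match(message.strip())
    if match is None:
        return None
    return int(match.group(1)), match.group(2) == "suspected to fail"


for text in ["bearing3 is suspected to fail", "bearing1 is working in normal condition"]:
    print(parse_status(text))
```

Sending a structured payload from the module instead of free text would remove the need for this parsing step, at the cost of changing main.py.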

Learn more at IEI* TANK AIoT Developer Kit

About the Author

Whitney Foster is a software engineer at Intel in the Core and Visual Computing Group working on scale enabling projects for Internet of Things and Computer Vision.