Skip to content
PDF

Receiving data via MQTT for Analytics and Visualization

In this example, data from a sensor providing temperature and humidity as an MQTT Publisher is visualized and processed via the Data Services and the analytics element. Data will be displayed before and after processing.

The instructions below cover the following steps:

  • Provisioning an MQTT broker as a Docker workload
  • Provisioning an MQTT Publisher simulation as a Docker workload
  • Deploying the provisioned Docker workloads to the target node
  • Creating and provisioning an analytics app with the Nerve Data Services SDK
  • Deploying the analytics app as a Docker workload
  • Local Data Visualization of temperature data before and after processing

Provisioning and deploying the sensor simulation and the MQTT broker

In the instructions below two Docker workloads will be provisioned and deployed:

An MQTT broker must to be deployed to the node first in order for the sensor simulation to function. The EMQX MQTT broker is used in this example that can be downloaded from the Docker Hub registry.

After that the temperature and humidity sensors simulation MQTT publisher is deployed. Download the Data Services MQTT demo sensor found under Example Applications from the Nerve Software Center. This is the Docker image that is required for provisioning the demo sensor as a Docker workload.

  1. Log in to the Management System. Make sure that the user has the permissions to access the Data Services.
  2. Provision a Docker workload for the EMQX MQTT broker by following Provisioning a Docker workload. This example uses emqx-4.1.0 as the workload name. Use the following workload version settings:

    Setting Value
    Name Enter any name for the workload version.
    Release name Enter any release name.
    DOCKER IMAGE Select From registry and enter emqx/emqx:v4.1.0.
    Container name emqx
    Network name host
  3. Provision a Docker workload for the sensor simulation by following Provisioning a Docker workload. Use the following workload version settings:

    Setting Value
    Name Enter any name for the workload version.
    Release name Enter any release name.
    DOCKER IMAGE Select Upload to add the Docker image of the sensor simulation that has been downloaded from the Nerve Software Center.
    New environment variable Select the + icon and enter the following information:
    • Env. variable
      MQTT_PUB_TOPIC
    • Variable value
      demo-sensor-topic
    Container name ttt-mqtt-demo-sensor-1.0
    Network name host
  4. Deploy both provisioned Docker workloads above by following Deploying a workload.

Preparing the Nerve Data Services SDK

The Nerve Data Services SDK is required for working with analytics apps. They are created, built and provisioned with it. Download the Nerve Data Services SDK found under Nerve Tools from the Nerve Software Center. Refer to Data analytics for more information.

Creating and provisioning an analytics app

Before working with the SDK, make sure that the Conda environment is active. If the Conda environment is active it will be displayed in parentheses in front. The default name of the Conda environment is nerve-ds-analytics. Activate the Conda environment by entering the following command:

source miniconda/bin/activate <environmentname>

The Conda environment automatically deactivates after a restart so it needs to be activated whenever it is used.

  1. Enter the following command to create an analytics app. demo_sensor_analytics_app is the name used for this example:

    nerve-analytics create demo_sensor_analytics_app .
    
  2. Enter cd demo_sensor_analytics_app to navigate to the newly created folder.

  3. Edit the demo_sensor_analytics_app.py file and insert the following code:

    import signal
    import sys
    
    from nerve_dp_analytics.stream.inputs.input_zeromq import Stream_Input_Zeromq
    from nerve_dp_analytics.batch.outputs.output_timescaledb import Batch_Output_Timescaledb
    
    running = True
    
    def sig_hdlr(signal, frame):
        global running
        running = False
    
        if siz:
            try:
                siz.clear()
            except Exception as e:
                print(e)
    
        if bot:
            try:
                bot.clear()
            except Exception as e:
                print(e)
    
        print('Exiting...')
        sys.exit(0)
    
    # catch CTRL+C
    signal.signal(signal.SIGINT, sig_hdlr)
    
    def celsius_to_kelvin(value):
        return value + 273.15
    
    def normalize_humidity(value):
        return value / 100
    
    try:
        siz = Stream_Input_Zeromq('demo-sensor-analytics-app-siz',
                                  host='172.20.10.1',
                                  port=5555,
                                  topic='demo-sensor-topic')
    
        bot = Batch_Output_Timescaledb('demo-sensor-analytics-app-bot',
                                       table_name='demo_sensor_analyzed_data',
                                       vars={'temperature': 'real',
                                             'humidity': 'real'})
    
        while(running):
            try:
                data = siz.receive(Stream_Input_Zeromq.DTYPE_LIST)
                new_data = list()
    
                for d in data:
                    nd = dict()
    
                    nd['timestamp'] = d['timestamp']
                    nd['temperature'] = celsius_to_kelvin(d['temperature'])
                    nd['humidity'] = normalize_humidity(d['humidity'])
    
                    new_data.append(nd)
    
                for nd in new_data:
                    bot.send(nd)
            except Exception as e:
                print(e)
    except Exception as e:
        print(e)
    

    This analytics app receives data from the Gateway through the ZeroMQ Stream Input, which is a default way of transferring data between the Gateway and analytics. Processed data is stored in a TimescaleDB via the TimescaleDB Batch Output. When only table_name is provided for this output, the analytics write data into the default database of the node that has the node serial number as a name.

    In this example, basic processing is done on the data provided by the demo sensor. Temperature data is converted from Celsius to Kelvin while humidity data is normalized to a range between 0 and 1.

    Note

    The ZeroMQ Publisher output of the Gateway must publish messages on 172.20.10.1 if analytics are running on the node in the nerve-ds Docker network. Consequently, the ZeroMQ Stream Input of the analytics must listen on the same IP address.

  4. Edit the Dockerfile and insert the following:

    FROM python:3.8.3-slim-buster
    
    WORKDIR /nerve
    
    COPY nerve_dp_analytics_api-1.0-py3-none-any.whl .
    
    RUN pip install wheel nerve_dp_analytics_api-1.0-py3-none-any.whl
    
    WORKDIR /
    
    COPY demo_sensor_analytics_app.py .
    
    CMD [ "python", "-u", "demo_sensor_analytics_app.py" ]
    
  5. Enter the following command to build the Docker image containing the analytics app. nerve-ds-2.1.1 is used as the name in this example:

    nerve-analytics build -t nerve-ds-2.1.1
    
  6. Enter the following command to provision the analytics app as a Docker workload in the Management System:

    nerve-analytics provision -u https://<MS-URL> -n "Data Services Analytics - demoSensor App" -vn "nerve-ds-2.1.1" -rn "nerve-ds-2.1.1" -desc "Docker container running a Nerve Data Services analytics app that processes temperature and humidity data." -d analytics-demo-sensor-app -i demo_sensor_analytics_app:nerve-ds-2.1.1
    

    This will provision a Docker workload with the following settings:

    Setting Description
    Workload name Data Services Analytics - demoSensor App
    Description Docker container running a Nerve Data Services analytics app that processes temperature and humidity data.
    Version name nerve-ds-2.1.1
    Release name nerve-ds-2.1.1
    CPU resource in percentage 1
    Container name analytics-demo-sensor-app
    Network name nerve-ds

    Note

    Due to version differences, the workload is created with a maximum of 1% of allowed CPU usage. Change this setting to a value between 10 and 25.

All settings except Container name and Network name in the command above or in the Management System are suggestions and can be changed freely.

With the analytics app provisioned in the Management System, the app needs to be deployed to the node to analyze data coming from the demo sensor. Deploy the app to the node that has the demo sensor and the MQTT broker deployed by following Deploying a workload.

Configuring the Data Services Gateway

The configuration below defines an MQTT Subscriber connection to a ZeroMQ Publisher. Upon receiving data from the demo-sensor-topic topic of the MQTT Subscriber, the Gateway forwards said data to the ZeroMQ Publisher. The ZeroMQ Publisher in turn publishes the data to the demo-sensor-topic ZeroMQ topic that the analytics app listens to. The configuration also defines a connection from the MQTT Subscriber to the TimescaleDB database, which means that the same data received at the MQTT Subscriber end is also written directly into the TimescaleDB.

Note

Note that both topics have the same name in this example. However, they are different as they are topics of two different protocols, MQTT and ZeroMQ.

  1. Access the Local UI on the node. This is Nerve Device specific. Refer to the table below for device specific links to the Local UI. The initial login credentials to the Local UI can be found in the customer profile.

    Nerve Device Physical port Local UI
    MFN 100 P1 http://172.20.2.1:3333
    Kontron KBox A-150-APL LAN 1 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Kontron KBox A-150-APL chapter of the device guide.
    Kontron KBox A-250 ETH 2 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Kontron KBox A-250 chapter of the device guide.
    Maxtang AXWL10 LAN1 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Maxtang AXWL10 chapter of the device guide.
    Siemens SIMATIC IPC127E X1 P1 http://172.20.2.1:3333
    Siemens SIMATIC IPC427E X1 P1 http://172.20.2.1:3333
    Supermicro SuperServer E100-9AP-IA LAN1 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Supermicro SuperServer E100-9AP-IA chapter of the device guide.
    Supermicro SuperServer 1019D-16C-FHN13TP LAN3 http://172.20.2.1:3333
    Supermicro SuperServer 5029C-T LAN1 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Supermicro SuperServer 5029C-T chapter of the device guide.
    Toshiba FA2100T-700 First rear port http://172.20.2.1:3333
    Vecow SPC-5600-i5-8500 LAN 1 http://172.20.2.1:3333
    Winmate EACIL20 LAN1 <wanip>:3333

    To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Winmate EACIL20 chapter of the device guide.
  2. Select the arrow next to Data to expand the Data Services sub menus in the navigation on the left.

  3. Select Gateway.
  4. Select the Edit configuration icon on the right to enter editing mode.

    !Edit configuration

  5. Create a JSON file out of the following Gateway configuration:

    {
      "inputs": [
        {
          "type": "MQTT_SUBSCRIBER",
          "name": "mqtt_subscriber",
          "clientId": "mqtt_subscriber_0",
          "serverUrl": "tcp://localhost:1883",
          "keepAliveInterval_s": 20,
          "cleanSession": false,
          "qos": 1,
          "connectors": [
            {
              "name": "mqtt_subscriber_connector_0",
              "topic": "demo-sensor-topic",
              "variables": [
                {
                  "name": "temperature",
                  "type": "int16"
                },
                {
                  "name": "humidity",
                  "type": "uint16"
                }
              ]
            }
          ]
        }
      ],
      "outputs": [
        {
          "type": "ZEROMQ_PUBLISHER",
          "name": "zeromq_publisher_0",
          "serverUrl": "tcp://172.20.10.1:5555",
          "connectors": [
            {
              "name": "zeromq_publisher_connector_0",
              "topic": "demo-sensor-topic",
              "timestampRequired": true,
              "timestampFormat": "unix_ns"
            }
          ]
        },
        {
          "type": "DB_TIMESCALE",
          "name": "timescaledb_0",
          "url": "<LOCAL>"
        }
      ],
      "connections": [
        {
          "name": "mqttsub_zmqpub_0",
          "input": {
            "index": 0,
            "connector": 0
          },
          "output": {
            "index": 0,
            "connector": 0
          }
        },
        {
          "name": "mqttsub_timescaledb_0",
          "input": {
            "index": 0,
            "connector": 0
          },
          "output": {
            "index": 1,
            "connector": 0
          }
        }
      ]
    }
    
  6. Select the Import button.

    !Import configuration

  7. Add the JSON configuration file containing the code above from the file browser.

  8. Select the Deploy button. A success message pops up in the upper-right corner.

    !Deploy configuration

The configuration is now deployed. The graphical configuration tool now reflects the contents of the JSON file. Exit editing mode by selecting the arrow on the left. Details of each input and output can be opened by selecting the magnifying glass symbol next to each input and output.

Select the Logs tab to view the Gateway logs for more information.

!Gateway logs

Local data visualization at the node

To visualize the data received by the Gateway and data processed by the analytics app, open the local data visualization element through the Data Services UI on the node. Two queries will be added in the instructions below.

  1. Select Data in the navigation on the left. The Grafana UI will open.

    Note

    Note that the navigation on the left collapses when Data is selected. Select the burger menu in the top-left to expand the navigation again.

  2. Select + > Dashboard in the navigation on the left. A box will appear.

    !Create dashboard

  3. Select Add Query in the New Panel box.

    !Add query

  4. Select the data source from the drop-down menu. The name of the data source is the serial number of the node.

    !Select data source

  5. Fill in the following query information to add the temperature data from the MQTT Subscriber:

    Setting Value
    FROM mqttsub_timescaledb_0

    Time column: "timestamp"
    SELECT Column: temperature
    Format as Time series
  6. Select Add Query to the right to add query B for temperature data analyzed by the analytics app.

  7. Fill in the following query information:

    Setting Value
    FROM demo_sensor_analyzed_data

    Time column: "timestamp"
    SELECT Column: temperature
    Format as Time series
  8. Select the save icon in the upper-right corner to save the dashboard.

    !Save dashboard

The dashboard can be accessed from the Grafana home menu.

!Home menu access