Receiving data via MQTT for Analytics and Visualization
In this example, data from a sensor providing temperature and humidity as an MQTT Publisher is visualized and processed via the Data Services and the analytics element. Data will be displayed before and after processing.
The instructions below cover the following steps:
- Provisioning an MQTT broker as a Docker workload
- Provisioning an MQTT Publisher simulation as a Docker workload
- Deploying the provisioned Docker workloads to the target node
- Creating and provisioning an analytics app with the Nerve Data Services SDK
- Deploying the analytics app as a Docker workload
- Local Data Visualization of temperature data before and after processing
Provisioning and deploying the sensor simulation and the MQTT broker
In the instructions below two Docker workloads will be provisioned and deployed:
An MQTT broker must to be deployed to the node first in order for the sensor simulation to function. The EMQX MQTT broker is used in this example that can be downloaded from the Docker Hub registry.
After that the temperature and humidity sensors simulation MQTT publisher is deployed. Download the Data Services MQTT demo sensor found under Example Applications from the Nerve Software Center. This is the Docker image that is required for provisioning the demo sensor as a Docker workload.
- Log in to the Management System. Make sure that the user has the permissions to access the Data Services.
-
Provision a Docker workload for the EMQX MQTT broker by following Provisioning a Docker workload. This example uses emqx-4.1.0 as the workload name. Use the following workload version settings:
Setting Value Name Enter any name for the workload version. Release name Enter any release name. DOCKER IMAGE Select From registry and enter emqx/emqx:v4.1.0
.Container name emqx
Network name host
-
Provision a Docker workload for the sensor simulation by following Provisioning a Docker workload. Use the following workload version settings:
Setting Value Name Enter any name for the workload version. Release name Enter any release name. DOCKER IMAGE Select Upload to add the Docker image of the sensor simulation that has been downloaded from the Nerve Software Center. New environment variable Select the + icon and enter the following information: - Env. variable
MQTT_PUB_TOPIC
- Variable value
demo-sensor-topic
Container name ttt-mqtt-demo-sensor-1.0
Network name host
- Env. variable
-
Deploy both provisioned Docker workloads above by following Deploying a workload.
Preparing the Nerve Data Services SDK
The Nerve Data Services SDK is required for working with analytics apps. They are created, built and provisioned with it. Download the Nerve Data Services SDK found under Nerve Tools from the Nerve Software Center. Refer to Data analytics for more information.
Creating and provisioning an analytics app
Before working with the SDK, make sure that the Conda environment is active. If the Conda environment is active it will be displayed in parentheses in front. The default name of the Conda environment is nerve-ds-analytics
. Activate the Conda environment by entering the following command:
source miniconda/bin/activate <environmentname>
The Conda environment automatically deactivates after a restart so it needs to be activated whenever it is used.
-
Enter the following command to create an analytics app.
demo_sensor_analytics_app
is the name used for this example:nerve-analytics create demo_sensor_analytics_app .
-
Enter
cd demo_sensor_analytics_app
to navigate to the newly created folder. -
Edit the
demo_sensor_analytics_app.py
file and insert the following code:import signal import sys from nerve_dp_analytics.stream.inputs.input_zeromq import Stream_Input_Zeromq from nerve_dp_analytics.batch.outputs.output_timescaledb import Batch_Output_Timescaledb running = True def sig_hdlr(signal, frame): global running running = False if siz: try: siz.clear() except Exception as e: print(e) if bot: try: bot.clear() except Exception as e: print(e) print('Exiting...') sys.exit(0) # catch CTRL+C signal.signal(signal.SIGINT, sig_hdlr) def celsius_to_kelvin(value): return value + 273.15 def normalize_humidity(value): return value / 100 try: siz = Stream_Input_Zeromq('demo-sensor-analytics-app-siz', host='172.20.10.1', port=5555, topic='demo-sensor-topic') bot = Batch_Output_Timescaledb('demo-sensor-analytics-app-bot', table_name='demo_sensor_analyzed_data', vars={'temperature': 'real', 'humidity': 'real'}) while(running): try: data = siz.receive(Stream_Input_Zeromq.DTYPE_LIST) new_data = list() for d in data: nd = dict() nd['timestamp'] = d['timestamp'] nd['temperature'] = celsius_to_kelvin(d['temperature']) nd['humidity'] = normalize_humidity(d['humidity']) new_data.append(nd) for nd in new_data: bot.send(nd) except Exception as e: print(e) except Exception as e: print(e)
This analytics app receives data from the Gateway through the ZeroMQ Stream Input, which is a default way of transferring data between the Gateway and analytics. Processed data is stored in a TimescaleDB via the TimescaleDB Batch Output. When only
table_name
is provided for this output, the analytics write data into the default database of the node that has the node serial number as a name.In this example, basic processing is done on the data provided by the demo sensor. Temperature data is converted from Celsius to Kelvin while humidity data is normalized to a range between 0 and 1.
Note
The ZeroMQ Publisher output of the Gateway must publish messages on
172.20.10.1
if analytics are running on the node in thenerve-ds
Docker network. Consequently, the ZeroMQ Stream Input of the analytics must listen on the same IP address. -
Edit the
Dockerfile
and insert the following:FROM python:3.8.3-slim-buster WORKDIR /nerve COPY nerve_dp_analytics_api-1.0-py3-none-any.whl . RUN pip install wheel nerve_dp_analytics_api-1.0-py3-none-any.whl WORKDIR / COPY demo_sensor_analytics_app.py . CMD [ "python", "-u", "demo_sensor_analytics_app.py" ]
-
Enter the following command to build the Docker image containing the analytics app.
nerve-ds-2.1.1
is used as the name in this example:nerve-analytics build -t nerve-ds-2.1.1
-
Enter the following command to provision the analytics app as a Docker workload in the Management System:
nerve-analytics provision -u https://<MS-URL> -n "Data Services Analytics - demoSensor App" -vn "nerve-ds-2.1.1" -rn "nerve-ds-2.1.1" -desc "Docker container running a Nerve Data Services analytics app that processes temperature and humidity data." -d analytics-demo-sensor-app -i demo_sensor_analytics_app:nerve-ds-2.1.1
This will provision a Docker workload with the following settings:
Setting Description Workload name Data Services Analytics - demoSensor App Description Docker container running a Nerve Data Services analytics app that processes temperature and humidity data. Version name nerve-ds-2.1.1 Release name nerve-ds-2.1.1 CPU resource in percentage 1 Container name analytics-demo-sensor-app Network name nerve-ds Note
Due to version differences, the workload is created with a maximum of 1% of allowed CPU usage. Change this setting to a value between 10 and 25.
All settings except Container name and Network name in the command above or in the Management System are suggestions and can be changed freely.
With the analytics app provisioned in the Management System, the app needs to be deployed to the node to analyze data coming from the demo sensor. Deploy the app to the node that has the demo sensor and the MQTT broker deployed by following Deploying a workload.
Configuring the Data Services Gateway
The configuration below defines an MQTT Subscriber connection to a ZeroMQ Publisher. Upon receiving data from the demo-sensor-topic
topic of the MQTT Subscriber, the Gateway forwards said data to the ZeroMQ Publisher. The ZeroMQ Publisher in turn publishes the data to the demo-sensor-topic
ZeroMQ topic that the analytics app listens to. The configuration also defines a connection from the MQTT Subscriber to the TimescaleDB database, which means that the same data received at the MQTT Subscriber end is also written directly into the TimescaleDB.
Note
Note that both topics have the same name in this example. However, they are different as they are topics of two different protocols, MQTT and ZeroMQ.
-
Access the Local UI on the node. This is Nerve Device specific. Refer to the table below for device specific links to the Local UI. The initial login credentials to the Local UI can be found in the customer profile.
Nerve Device Physical port Local UI MFN 100 P1 http://172.20.2.1:3333 Kontron KBox A-150-APL LAN 1 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Kontron KBox A-150-APL chapter of the device guide.Kontron KBox A-250 ETH 2 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Kontron KBox A-250 chapter of the device guide.Maxtang AXWL10 LAN1 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Maxtang AXWL10 chapter of the device guide.Siemens SIMATIC IPC127E X1 P1 http://172.20.2.1:3333 Siemens SIMATIC IPC427E X1 P1 http://172.20.2.1:3333 Supermicro SuperServer E100-9AP-IA LAN1 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Supermicro SuperServer E100-9AP-IA chapter of the device guide.Supermicro SuperServer 1019D-16C-FHN13TP LAN3 http://172.20.2.1:3333 Supermicro SuperServer 5029C-T LAN1 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Supermicro SuperServer 5029C-T chapter of the device guide.Toshiba FA2100T-700 First rear port http://172.20.2.1:3333 Vecow SPC-5600-i5-8500 LAN 1 http://172.20.2.1:3333 Winmate EACIL20 LAN1 <wanip>:3333
To figure out the IP address of the WAN interface, refer to Finding out the IP address of the device in the Winmate EACIL20 chapter of the device guide. -
Select the arrow next to Data to expand the Data Services sub menus in the navigation on the left.
- Select Gateway.
-
Select the Edit configuration icon on the right to enter editing mode.
-
Create a JSON file out of the following Gateway configuration:
{ "inputs": [ { "type": "MQTT_SUBSCRIBER", "name": "mqtt_subscriber", "clientId": "mqtt_subscriber_0", "serverUrl": "tcp://localhost:1883", "keepAliveInterval_s": 20, "cleanSession": false, "qos": 1, "connectors": [ { "name": "mqtt_subscriber_connector_0", "topic": "demo-sensor-topic", "variables": [ { "name": "temperature", "type": "int16" }, { "name": "humidity", "type": "uint16" } ] } ] } ], "outputs": [ { "type": "ZEROMQ_PUBLISHER", "name": "zeromq_publisher_0", "serverUrl": "tcp://172.20.10.1:5555", "connectors": [ { "name": "zeromq_publisher_connector_0", "topic": "demo-sensor-topic", "timestampRequired": true, "timestampFormat": "unix_ns" } ] }, { "type": "DB_TIMESCALE", "name": "timescaledb_0", "url": "<LOCAL>" } ], "connections": [ { "name": "mqttsub_zmqpub_0", "input": { "index": 0, "connector": 0 }, "output": { "index": 0, "connector": 0 } }, { "name": "mqttsub_timescaledb_0", "input": { "index": 0, "connector": 0 }, "output": { "index": 1, "connector": 0 } } ] }
-
Select the Import button.
-
Add the JSON configuration file containing the code above from the file browser.
-
Select the Deploy button. A success message pops up in the upper-right corner.
The configuration is now deployed. The graphical configuration tool now reflects the contents of the JSON file. Exit editing mode by selecting the arrow on the left. Details of each input and output can be opened by selecting the magnifying glass symbol next to each input and output.
Select the Logs tab to view the Gateway logs for more information.
Local data visualization at the node
To visualize the data received by the Gateway and data processed by the analytics app, open the local data visualization element through the Data Services UI on the node. Two queries will be added in the instructions below.
-
Select Data in the navigation on the left. The Grafana UI will open.
Note
Note that the navigation on the left collapses when Data is selected. Select the burger menu in the top-left to expand the navigation again.
-
Select + > Dashboard in the navigation on the left. A box will appear.
-
Select Add Query in the New Panel box.
-
Select the data source from the drop-down menu. The name of the data source is the serial number of the node.
-
Fill in the following query information to add the temperature data from the MQTT Subscriber:
Setting Value FROM mqttsub_timescaledb_0
Time column: "timestamp"SELECT Column: temperature Format as Time series -
Select Add Query to the right to add query B for temperature data analyzed by the analytics app.
-
Fill in the following query information:
Setting Value FROM demo_sensor_analyzed_data
Time column: "timestamp"SELECT Column: temperature Format as Time series -
Select the save icon in the upper-right corner to save the dashboard.
The dashboard can be accessed from the Grafana home menu.