Export MQTT Topics to InfluxDB

February 4, 2023

Intro

The goal of this blog series is to collect the sensor data of different IoT devices via MQTT, store them in InfluxDB 2.6 and visualize them with Grafana.

The previous blogs from this series:

In this post I will describe how to collect the sensor data via an MQTT broker and store them in InfluxDB 2.6 with MQTT Exporter. Like all the other services, MQTT Exporter will run in a container managed with podman kube on openSUSE MicroOS.

The example configuration is for Shelly Plug S, Shelly H&T and Shelly Plus H&T devices. As explained in Setup MQTT Broker, IoT Devices and Security, I use custom MQTT prefixes with topics in the form of shellies/shelly-plug-sX, shellies/shelly-ht-XX and shelly-ht/shelly-plus-ht-XX.

Connect to MQTT Broker

The configuration file for MQTT Exporter is divided into three parts: how to connect to the MQTT broker, how to connect to InfluxDB, and the mapping between the topics and how their values are stored.

For security reasons we will only use TLS-encrypted connections, so that the password used to authenticate against the MQTT broker is not transferred in plaintext. The part for MQTT looks like:

mqtt:
  broker: mqtt.example.com
  # port: 8883
  protocol: mqtts
  user: <username>
  password: <password>
  client_id: mqtt-exporter-shellies
  topic_paths:
   - shellies/#
   - shelly-ht/#
  device_id_regex: ".*?/(?P<deviceid>.*?)/.*"
  metric_per_topic_regex: ".*/(?P<metricname>.*)"
  qos: 0

The variables have the following meaning:

  • broker defines the hostname or IP address under which the MQTT broker is reachable.
  • port and protocol define on which port and with which protocol the MQTT broker is listening. The default port 1883 is unencrypted and implies mqtt as protocol; port 8883 is the default port for TLS-encrypted connections and uses mqtts as protocol. The same applies the other way around: mqtt implies port 1883 and mqtts implies port 8883 if not otherwise specified.
  • user is the username for authentication with the MQTT broker. It’s also possible to specify it via the environment variable MQTT_BROKER.
  • password is the password for authentication with the MQTT broker. For security reasons it is advised not to write the password in this configuration file, but to specify it via the environment variable MQTT_PASSWORD.
  • client_id is the unique client ID. If not specified, hostname-PID will be used.
  • topic_paths defines the topic paths to subscribe to. This can be more than one, but you can only specify one device_id_regex and one metric_per_topic_regex, and both must fit all subscribed topics. If this is not possible, a separate MQTT Exporter instance needs to be started for every topic.
  • device_id_regex is a regular expression to extract the device ID from the topic path. The default regular expression assumes that the last “element” of the topic_path is the device ID. The regular expression must contain a named capture group with the name deviceid. For example, the expression for Tasmota based sensors is “tasmota/discovery/(?P<deviceid>.*)/.*”.
  • metric_per_topic_regex is the regular expression used to extract the metric name from the topic. Must contain a named group for metricname.
  • qos defines the MQTT QoS level, default is 0, which is good enough for this use case.
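
To see what the two regular expressions from the example extract, they can be tried outside of MQTT Exporter. The following is a minimal sketch in Python, whose re module accepts the same (?P<name>...) syntax. The helper extract() and the first two topics are hypothetical illustrations following the topic scheme described above, and the sketch assumes that the expressions are searched within the topic, which may differ in detail from what MQTT Exporter does internally.

```python
import re

# Regular expressions copied from the mqtt section above.
DEVICE_ID_REGEX = re.compile(r".*?/(?P<deviceid>.*?)/.*")
METRIC_PER_TOPIC_REGEX = re.compile(r".*/(?P<metricname>.*)")


def extract(topic):
    """Return (deviceid, metricname) for a topic, or None if a regex does not match."""
    device = DEVICE_ID_REGEX.search(topic)
    metric = METRIC_PER_TOPIC_REGEX.search(topic)
    if device is None or metric is None:
        return None
    return device.group("deviceid"), metric.group("metricname")


# Example topics, for illustration only.
for topic in (
    "shellies/shelly-plug-s1/relay/0/power",
    "shellies/shelly-plug-s1/relay/0",
    "shelly-ht/shelly-plus-ht-01/events/rpc",
):
    print(topic, "->", extract(topic))
```

The device ID is the second element of the topic, and the metric name is everything after the last slash. This is why one pair of expressions can serve both topic prefixes.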

Connect to InfluxDB

MQTT Exporter works with InfluxDB >= 1.8 and InfluxDB 2.x. If InfluxDB 2.x is used and the token has permissions to create new buckets, MQTT Exporter can create the bucket if it does not exist.

influxdb:
  server: influxdb.example.com
  database: shellies
  organization: my-org
  #token: <token>

The variables have the following meaning:

  • server is the hostname or IP address of the InfluxDB server.
  • database is the database or bucket into which MQTT Exporter should write the data.
  • organization is only used with InfluxDB 2 and specifies where the bucket can be found or should be created.
  • token is the authentication token to connect with InfluxDB. For InfluxDB 1.x it is username:password; for InfluxDB 2.x it is a token generated in the UI or with the command line tool, which specifies the access rights the user has. To avoid the token being accidentally committed to a public git repo or something similar, it can also be specified via the environment variable INFLUXDB_TOKEN. This should be the preferred method.

Metrics mapping

This section defines which MQTT metrics/topics will be exported, in which format and what the key in the database will be. Only metrics listed here will be exported. The first few examples assume that the MQTT message is just one single value. The case where an MQTT message is a JSON struct is handled below.

Single value MQTT message

metrics:
  - mqtt_name: power
    name: power
    unit: Watt
    type: float

mqtt_name and type are required fields. name is optional and will be set to mqtt_name if not specified. unit is optional and is stored as tag["unit"].

mqtt_name will be compared with the metricname of metric_per_topic_regex. If they match, this entry is used to store the value of the MQTT topic in InfluxDB in the format specified by type. Valid types are float, int and string. name specifies the key under which the value is stored.

Mapping of strings to integers

Sometimes it’s easier to compare simple integer values instead of strings in the query language, especially if you have e.g. a switch, which can only have the values “on” and “off”. In this case you can specify a mapping of strings to integers with string_value_mapping instead of specifying type. Besides the map you can specify an error value, which is used if the value of the MQTT topic doesn’t match any string in the mapping.

  - mqtt_name: 0
    name: switch
    string_value_mapping:
      map:
        off: 0
        low: 1
        on: 2
      error_value: -1
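
The effect of such a mapping can be sketched in a few lines of Python. This is only an illustration of the idea, not the code of MQTT Exporter: the function map_string_value() is hypothetical, and the sketch assumes an exact, case-sensitive comparison of the payload with the keys of the map.

```python
def map_string_value(payload, mapping, error_value):
    """Translate a string payload to an integer, falling back to error_value."""
    return mapping.get(payload, error_value)


# The map and error_value from the example above.
SWITCH_MAP = {"off": 0, "low": 1, "on": 2}

for payload in ("on", "off", "unknown"):
    print(payload, "->", map_string_value(payload, SWITCH_MAP, -1))
```

A payload that is not part of the map ends up as the error value, so unexpected states can be spotted in a query by filtering for -1.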

MQTT JSON message

If the clients send arbitrary JSON messages on the topic, we need a way to specify both the topic name and the path inside the JSON struct that leads to the value.

As an example:

shelly-ht/shelly-plus-ht-01/events/rpc {
  "src": "shellyplusht-08b61fce63c4",
  "dst": "shelly-ht/shelly-plus-ht-01/events",
  "method": "NotifyFullStatus",
  "params": {
    "ts": 1674406876.85,
    "ble": {},
    "cloud": {
      "connected": false
    },
    "devicepower:0": {
      "id": 0,
      "battery": {
        "V": 6.16,
        "percent": 100
      },
      "external": {
        "present": false
      }
    },
    "ht_ui": {},
    "humidity:0": {
      "id": 0,
      "rh": 56.9
    },
    "mqtt": {
      "connected": true
    },
    "sys": {
      "mac": "08B61FCE63C4",
      "restart_required": false,
      "time": null,
      "unixtime": null,
      "uptime": 1,
      "ram_size": 235504,
      "ram_free": 165344,
      "fs_size": 458752,
      "fs_free": 131072,
      "cfg_rev": 16,
      "kvs_rev": 0,
      "webhook_rev": 0,
      "available_updates": {},
      "wakeup_reason": {
        "boot": "deepsleep_wake",
        "cause": "periodic"
      },
      "wakeup_period": 7200
    },
    "temperature:0": {
      "id": 0,
      "tC": 19.9,
      "tF": 67.7
    },
    "wifi": {
      "sta_ip": "172.17.0.80",
      "status": "got ip",
      "ssid": "my-wifi",
      "rssi": -67
    },
    "ws": {
      "connected": false
    }
  }
}

Assume we now want the temperature in Celsius from the above struct. The MQTT topic is shelly-ht/shelly-plus-ht-01/events/rpc, which leads to the metricname rpc. Inside the JSON struct we want the value of the key tC of temperature:0, which can be found in params. MQTT Exporter uses gojsonq to find the key; please look at its examples for more information on how to construct the search. In short: the different parts of the JSON path are separated by a dot (.).

The mqtt_name now has the form metricname.json.path. In our concrete example, this would be rpc for the metricname and params.temperature:0.tC for the JSON path. The metricname and the JSON path are also separated by a dot, so the resulting mqtt_name for this example is rpc.params.temperature:0.tC:

  - mqtt_name: rpc.params.temperature:0.tC
    name: temperature
    unit: C
    type: float
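
How such an mqtt_name leads to a value can be illustrated with a small Python sketch. It only implements a plain walk along dot-separated keys and is an assumption about the principle. gojsonq itself offers more than that, and the function lookup() is hypothetical.

```python
import json


def lookup(mqtt_name, metricname, payload):
    """Return the value mqtt_name points to in a JSON payload, or None."""
    prefix = metricname + "."
    if not mqtt_name.startswith(prefix):
        return None
    value = json.loads(payload)
    # Walk the JSON struct one dot-separated key at a time.
    for key in mqtt_name[len(prefix):].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# Shortened version of the message shown above.
PAYLOAD = '{"params": {"temperature:0": {"id": 0, "tC": 19.9, "tF": 67.7}}}'

print(lookup("rpc.params.temperature:0.tC", "rpc", PAYLOAD))
```

Because the path is split at dots only, a key like temperature:0 with a colon in its name is not a problem.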

Additional tags

A list of additional tags can be specified in the form “key: value”. They will always be added if the metricname matches this rule.

  - mqtt_name: rpc.params.devicepower:0.battery.percent
    name: battery_voltage
    unit: "%"
    type: float
    const_tags:
      device: "Shelly H&T"
      battery: "Type C"

Full example configuration file

Configuration for Shelly devices

This is the full configuration file I use for my Shelly devices:

mqtt:
  broker: mqtt.example.com
  # port: 8883
  protocol: mqtts
  user: <username>
  password: <password>
  client_id: mqtt-exporter-shellies
  topic_paths:
   - shellies/#
   - shelly-ht/#
  device_id_regex: ".*?/(?P<deviceid>.*?)/.*"
  metric_per_topic_regex: ".*/(?P<metricname>.*)"
  qos: 0
influxdb:
  server: influxdb.example.com
  database: shellies
  organization: my-org
  #token: <token>
metrics:
  # The first metrics are for the Shelly Plug S
  - mqtt_name: temperature
    name: temperature
    unit: C
    type: float
  - mqtt_name: power
    name: power
    unit: Watt
    type: float
  - mqtt_name: energy
    name: energy
    unit: Watt/Minute
    type: int
  - mqtt_name: 0
    name: switch
    string_value_mapping:
      map:
        off: 0
        low: 1
        on: 2
      error_value: -1
  - mqtt_name: info.update.has_update
    name: firmware_update
    unit: Boolean
    string_value_mapping:
      map:
        true: 1
        false: 0
      error_value: -1
  - mqtt_name: info.update.old_version
    name: current_firmware_version
    unit: Version
    type: string
  - mqtt_name: info.wifi_sta.ip
    name: ipaddress
    type: string
  # the following metrics are for the Shelly Plus H&T
  - mqtt_name: rpc.params.temperature:0.tC
    name: temperature
    unit: C
    type: float
  - mqtt_name: rpc.params.humidity:0.rh
    name: humidity
    unit: rh
    type: float
  - mqtt_name: rpc.params.devicepower:0.battery.V
    name: battery_voltage
    unit: Volt
    type: float
  - mqtt_name: rpc.params.devicepower:0.battery.percent
    name: battery_voltage
    unit: "%"
    type: float
  - mqtt_name: rpc.params.wifi.sta_ip
    name: ip_address
    type: string
  - mqtt_name: humidity
    name: humidity
    unit: rh
    type: float
  - mqtt_name: battery
    name: battery
    unit: "%"
    type: int
health_check: ":8080"

Configuration for Sonoff SNZB-02 devices

This is an example configuration file for Sonoff SNZB-02 devices connected via Zigbee2MQTT. I changed the device name to something readable following the scheme SONOFF-SNZB-02-X, where X is a consecutive number. The metric name for the measurement variable in InfluxDB is SONOFF-SNZB-02, without the consecutive number.

The MQTT topic used by the Sonoff devices is zigbee2mqtt/SONOFF-SNZB-02-X; an example message looks like:

zigbee2mqtt/SONOFF-SNZB-02-2 {
  "battery": 100,
  "humidity": 64.87,
  "linkquality": 244,
  "temperature": 6.6,
  "voltage": 3000
}

The configuration file:

mqtt:
  broker: broker.example.com
  port: 8883
  topic_paths:
    - zigbee2mqtt/#
  device_id_regex: "zigbee2mqtt/(?P<deviceid>.*)"
  metric_per_topic_regex: ".*/(?P<metricname>.*)-[0-9]"
  qos: 0
influxdb:
  server: influxdb.example.com
  database: zigbee
  organization: my-org
metrics:
  - mqtt_name: SONOFF-SNZB-02.temperature
    name: temperature
    type: float
  - mqtt_name: SONOFF-SNZB-02.humidity
    name: humidity
    type: float
  - mqtt_name: SONOFF-SNZB-02.battery
    name: battery
    type: float
  - mqtt_name: SONOFF-SNZB-02.linkquality
    name: linkquality
    type: int
  - mqtt_name: SONOFF-SNZB-02.voltage
    name: voltage
    type: int
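
The interplay between the two regular expressions and the mqtt_name entries of this configuration can be checked with a short Python sketch. It uses the example message from above and assumes, as an illustration only, that the expressions are searched within the topic. The variable names are made up for this sketch.

```python
import json
import re

# Regular expressions copied from the Sonoff configuration above.
DEVICE_ID_REGEX = re.compile(r"zigbee2mqtt/(?P<deviceid>.*)")
METRIC_PER_TOPIC_REGEX = re.compile(r".*/(?P<metricname>.*)-[0-9]")

topic = "zigbee2mqtt/SONOFF-SNZB-02-2"
message = json.loads(
    '{"battery": 100, "humidity": 64.87, "linkquality": 244,'
    ' "temperature": 6.6, "voltage": 3000}'
)

# The device ID keeps the consecutive number, the metric name drops it.
deviceid = DEVICE_ID_REGEX.search(topic).group("deviceid")
metricname = METRIC_PER_TOPIC_REGEX.search(topic).group("metricname")

# Every top-level key corresponds to one mqtt_name of the form metricname.key.
mqtt_names = {f"{metricname}.{key}": value for key, value in message.items()}

print(deviceid, metricname)
for name, value in mqtt_names.items():
    print(name, "=", value)
```

Since the metric name no longer contains the consecutive number, one set of metrics entries covers all sensors named according to the scheme.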

Health Check

Liveness and readiness health checks are needed if the service runs in Kubernetes. The liveness probe tells Kubernetes that the application is alive; if the service does not answer, it will be restarted. The readiness probe tells Kubernetes when the container is ready to serve traffic.

The endpoints are:

  • IP:Port/healthz for the liveness probe
  • IP:Port/readyz for the readiness probe

The IP:Port is defined with the health_check option in the configuration file. If this option is not set, the health checks stay disabled.

Example:

health_check: ":8080"
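
To check by hand whether the endpoints answer, a few lines of Python are enough. This is a minimal sketch which assumes that a healthy service answers with HTTP status 200 on both paths. The functions probe() and check() are hypothetical helpers and not part of MQTT Exporter.

```python
import urllib.error
import urllib.request


def probe(base_url, path, timeout=2.0):
    """Return True if GET base_url + path answers with HTTP status 200."""
    try:
        with urllib.request.urlopen(base_url + path, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout or an HTTP error status.
        return False


def check(base_url):
    """Probe both health endpoints and return a dict of path -> bool."""
    return {path: probe(base_url, path) for path in ("/healthz", "/readyz")}
```

With the example setting above, check("http://localhost:8080") would probe an instance running on the same host.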