#!/usr/bin/env python3 """ =head1 NAME switchbotmeterbt - Munin plugin to monitor temperature/humidity with SwitchBot Meter BLE =head1 CONFIGURATION Python 3 and bluepy are necessary. Example for Ubuntu: $ sudo apt install python3-pip $ sudo pip3 install bluepy =head1 ENVIRONMENT VARIABLES user : root privilege is necessary for BLE scan env.macaddr : Mac Address(es) of SwitchBot Meter(s) separated by white-space (Required) env.hcidev : HCI device index. (Optional, default is 0) env.tempunit : Temperature unit. (Optional, default is C) env.scantimeout : Timeout for BLE scan. (Optional, default is 5.0 seconds) Example: [switchbotmeterbt] user root env.macaddr aa:aa:aa:aa:aa:aa bb:bb:bb:bb:bb:bb cc:cc:cc:cc:cc:cc =head1 NOTES For more details about SwitchBot Meter, see https://www.switch-bot.com/products/switchbot-meter =head1 AUTHOR K.Cima https://github.com/shakemid =head1 LICENSE GPLv2 SPDX-License-Identifier: GPL-2.0-only =head1 Magic markers #%# family=contrib #%# capabilities= =cut """ import sys import os import subprocess from bluepy.btle import Scanner, DefaultDelegate class SwitchbotScanDelegate(DefaultDelegate): def __init__(self, macaddrs): super().__init__() self.sensorValues = {} self.macaddrs = macaddrs # Called when advertising data is received from an LE device def handleDiscovery(self, dev, isNewDev, isNewData): if dev.addr in self.macaddrs: for (adtype, desc, value) in dev.getScanData(): if desc == '16b Service Data': munin_debug(value) self._decodeSensorData(dev, value) def _decodeSensorData(self, dev, value): # Extract lower 6 octets valueBinary = bytes.fromhex(value[4:16]) # Refer to Meter BLE open API # https://github.com/OpenWonderLabs/python-host/wiki/Meter-BLE-open-API deviceType = chr(valueBinary[0] & 0b01111111) battery = valueBinary[2] & 0b01111111 tint = valueBinary[4] & 0b01111111 tdec = valueBinary[3] & 0b00001111 temperature = tint + tdec / 10 isTemperatureAboveFreezing = valueBinary[4] & 0b10000000 if not isTemperatureAboveFreezing: temperature = -temperature humidity = valueBinary[5] & 0b01111111 self.sensorValues[dev.addr] = { 'DeviceType': deviceType, 'Temperature': temperature, 'Humidity': humidity, 'Battery': battery, 'RSSI': dev.rssi } munin_debug(str(self.sensorValues)) class SwitchbotMeterPlugin(): def __init__(self): self.params = {} self.params['pluginname'] = os.path.basename(__file__) self.params['macaddrs'] = [str.lower(macaddr) for macaddr in os.getenv('macaddr').split()] self.params['hcidev'] = int(os.getenv('hcidev', 0)) self.params['tempunit'] = os.getenv('tempunit', 'C') self.params['scantimeout'] = float(os.getenv('scantimeout', 5.0)) self.graphs = { 'temp': { 'name': 'Temperature', 'attrs': [ 'graph_title SwitchBot Meter Temperature', 'graph_category sensors', 'graph_vlabel Temp ' + self.params['tempunit'], 'graph_scale no', 'graph_args --base 1000' ] }, 'humid': { 'name': 'Humidity', 'attrs': [ 'graph_title SwitchBot Meter Humidity', 'graph_category sensors', 'graph_vlabel Humid %', 'graph_scale no', 'graph_args --base 1000' ] }, 'batt': { 'name': 'Battery', 'attrs': [ 'graph_title SwitchBot Meter Battery', 'graph_category sensors', 'graph_vlabel %', 'graph_scale no', 'graph_args --base 1000 --lower-limit 0 --upper-limit 100' ] }, 'rssi': { 'name': 'RSSI', 'attrs': [ 'graph_title SwitchBot Meter RSSI', 'graph_category sensors', 'graph_vlabel dB', 'graph_scale no', 'graph_args --base 1000' ] } } def config(self): # print config for k in self.graphs.keys(): print('multigraph ' + self.params['pluginname'] + '_' + k) for line in self.graphs[k]['attrs']: print(line) for macaddr in self.params['macaddrs']: field = macaddr.replace(':', '') print(field + '.label ' + macaddr) def fetch(self): # scan to fetch data scanner = Scanner(self.params['hcidev']).withDelegate(SwitchbotScanDelegate(self.params['macaddrs'])) try: # sometimes it might fail scanner.scan(self.params['scantimeout']) for macaddr in self.params['macaddrs']: check = scanner.delegate.sensorValues[macaddr] except KeyError as e: munin_error('retry scan for exception: ' + str(type(e))) scanner.scan(self.params['scantimeout']) except Exception as e: munin_error('reset hci and retry scan for exception: ' + str(type(e))) subprocess.call(f'hciconfig hci{self.params["hcidev"]} down && hciconfig hci{self.params["hcidev"]} up', shell=True) scanner.scan(self.params['scantimeout']) # print value for k in self.graphs.keys(): print('multigraph ' + self.params['pluginname'] + '_' + k) for macaddr in self.params['macaddrs']: field = macaddr.replace(':', '') try: print(field + '.value ' + str(scanner.delegate.sensorValues[macaddr][self.graphs[k]['name']])) except KeyError: pass def munin_error(message): print(message, file=sys.stderr) def munin_debug(message): if os.getenv('MUNIN_DEBUG') == '1': print('# ' + message) def main(): plugin = SwitchbotMeterPlugin() if len(sys.argv) > 1 and sys.argv[1] == 'config': plugin.config() if os.getenv('MUNIN_CAP_DIRTYCONFIG') == '1': plugin.fetch() else: plugin.fetch() if __name__ == '__main__': main()