#!/usr/bin/env python3 """ =head1 NAME gpsd_ - Munin plugin to show GPSd quality =head1 CONFIGURATION Requires Python3 and the C library. You should specify the host which gpsd is running on by creating a symlink to this file, e.g.: ln -s /path/to/gps_ /etc/munin/plugins/gpsd_localhost =head1 ENVIRONMENT VARIABLES env.gpsd_collect_time Maximum time to collect data for (in seconds, default 5) =head1 AUTHOR Paul Saunders =head1 MAGIC MARKERS #%# family=contrib #%# capabilities=multigraph =cut """ import os import sys import colorsys import datetime # Helper functions def debug(msg): if os.environ.get("MUNIN_DEBUG", "0") == "1": print(f"# {msg}", file=sys.stderr) def rgb_to_hex(rgb): return "%02X%02X%02X" % tuple(map(int, rgb)) def hex_to_rgb(hexa): return tuple(int(hexa[i : i + 2], 16) for i in (0, 2, 4)) def desaturate(hexa): debug(f"Desaturating {hexa} ({hex_to_rgb(hexa)})") (h, _, v) = colorsys.rgb_to_hsv(*hex_to_rgb(hexa)) debug(f"Got ({h}, _, {v})") return rgb_to_hex(colorsys.hsv_to_rgb(h, 1 / 3, v)) def need_multigraph(): print(f"graph_title {sys.argv[0]} needs MULTIGRAPH") print( "graph_info This plugin requires multigraph capabilities," " but this node does not support it" ) print("graph_total total") sys.exit(1) # The main functionlity def config(): """ Note that we don't graph all values that GPSd could give us. For example, graphing position could be a security issue (revealing the physical location of an internet host). However, graphing things like the _precision_ of position and the number of visible satellites (without listing which satellites) _should_ give a reasonable balance between security and usability. """ ################## # GPS Satellites # ################## print("multigraph gps_satellites") print("graph_title GPS Satellites") print("graph_vlabel # in view") print("graph_category gnss") print("graph_printf %4.0lf") # Integers only graph_order = [] for constellation in sorted( GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid") ): fieldname = constellation.get("name").lower() # Start with the used sats graph_order.append(f"{fieldname}_used") print(f"{fieldname}_used.label {constellation['name']:7} (Used)") print(f"{fieldname}_used.draw AREASTACK") print(f"{fieldname}_used.type GAUGE") print( f"{fieldname}_used.info The number of {constellation['name']}" f" used by the solution" ) print(f"{fieldname}_used.colour {constellation['colour']}") # And now the unused sats graph_order.append(f"{fieldname}") print(f"{fieldname}.label {constellation['name']:7}") print(f"{fieldname}.draw AREASTACK") print(f"{fieldname}.type GAUGE") print(f"{fieldname}.info {constellation['info']}") print(f"{fieldname}.colour {desaturate(constellation['colour'])}") # Finally, create a summation line graph_order.append("total") print("total.label Total") print("total.info Total number of satellites reported by GPSd") print("total.type GAUGE") print("total.draw LINE") print("total.colour 000000") ################# # GPS Precision # ################# print("multigraph gps_precision") print("graph_title GPS Precision") print("graph_vlabel DOP") print("graph_category gnss") print("graph_args -l 0") print( "graph_info " ) for dop in GPS_PRECISIONS.keys(): fieldname = dop.lower() print(f"{fieldname}.label {GPS_PRECISIONS[dop]}") print(f"{fieldname}.draw LINE") print(f"{fieldname}.type GAUGE") print(f"{fieldname}.info {GPS_PRECISIONS[dop]} Dilution of Precision") print(f"{fieldname}.warning :10") print(f"{fieldname}.critical :20") ####################### # GPS Error Estimates # ####################### print("multigraph gps_error_estimate") print("graph_title GPS Error Estimates") print("graph_vlabel estimated error") print("graph_category gnss") for error in GPS_ERRORS: fieldname = error["key"].lower() print(f"{fieldname}.label {error['name']}") print(f"{fieldname}.draw LINE") print(f"{fieldname}.type GAUGE") print(f"{fieldname}.info {error['info']}") ################### # GPS Time Offset # ################### print("multigraph gps_time_offset") print("graph_title GPS Time Offset") print("graph_vlabel seconds") print("graph_category gnss") print("toff.label Time Offset") print("toff.draw LINE") print("toff.type GAUGE") print("toff.info Time offset between GPS and System clocks") sys.exit(0) # For the colours, USED variants are the standard Munin colours, # UNUSED colours are dimmed by dropping the saturation to 33%. GPS_CONSTELLATIONS = [ { "gnssid": 0, "name": "GPS", "info": "The original US system. Also called NavStar", "colour": "00CC00", }, { "gnssid": 1, "name": "SBAS", "info": "Space Based Augmentation system", "colour": "0066B3", }, { "gnssid": 2, "name": "Galileo", "info": "The European Galileo system", "colour": "FF8000", }, { "gnssid": 3, "name": "BeiDou", "info": "The Chinese BeiDou system", "colour": "FFCC00", }, { "gnssid": 5, "name": "QZSS", "info": ( "The Japanese Quasi-Zenith Satellite System." " Only visible around Japan and Australia" ), "colour": "330099", }, { "gnssid": 6, "name": "GLONASS", "info": "The Russian GLObal Navigation System", "colour": "990099", }, ] GPS_PRECISIONS = { "hdop": "Horizontal", "vdop": "Vertical", "pdop": "Position (3D)", "tdop": "Time", "gdop": "Geometric", } GPS_ERRORS = [ { "key": "epc", "name": "Climb", "info": "Estimated climb error in meters per second. Certainty unknown", }, { "key": "epd", "name": "Direction", "info": "Estimated track (direction) error in degrees. Certainty unknown", }, { "key": "eph", "name": "Position (2D)", "info": "Estimated horizontal position (2D) error in meters. Also known as Estimated Position Error (epe). Certainty unknown", }, { "key": "eps", "name": "Speed", "info": "Estimated speed error in metres per second. Certainty unknown", }, { "key": "ept", "name": "Time", "info": "Estimated time stamp error in seconds. Certainty unknown", }, { "key": "epx", "name": "Longitude", "info": "Longitude error estimate in meters. Certainty unknown", }, { "key": "epy", "name": "Latitude", "info": "Latitude error estimate in meters. Certainty unknown", }, { "key": "epv", "name": "Vertical", "info": "Estimated vertical error in meters. Certainty unknown", }, ] if __name__ == "__main__": for k, v in enumerate(sys.argv): debug(f"{k}: {v}") GPSD_SERVER = sys.argv[0].split("_", maxsplit=1)[1] GPS_COLLECT_TIME = os.environ.get("gpsd_collect_time", 5) if os.environ.get("MUNIN_CAP_MULTIGRAPH", None) is None: need_multigraph() if len(sys.argv) >= 2 and sys.argv[1] == "config": config() try: from gpsdclient import GPSDClient except ImportError: print("Unable to import gpsdclient. Do you need to 'pip install gpsdclient'?") client = GPSDClient(host=GPSD_SERVER) gps_stanza = {} debug(f"Gathering from {GPSD_SERVER} for up to {GPS_COLLECT_TIME}s...") start_date = datetime.datetime.now() for result in client.dict_stream(convert_datetime=True): debug(f"{result['class']}") if result["class"] == "SKY" and "SKY" not in gps_stanza.keys(): # First time we've seen a SKY message. Process it print("multigraph gps_satellites") for constellation in sorted( GPS_CONSTELLATIONS, key=lambda item: item.get("gnssid") ): fieldname = constellation.get("name").lower() used = len( [ x for x in result["satellites"] if x["gnssid"] == constellation["gnssid"] and x["used"] ] ) unused = len( [ x for x in result["satellites"] if x["gnssid"] == constellation["gnssid"] and not x["used"] ] ) print(f"{fieldname}_used.value {used}") print(f"{fieldname}.value {unused}") print(f"total.value {len(result['satellites'])}") print("multigraph gps_precision") for dop in GPS_PRECISIONS.keys(): fieldname = dop.lower() value = result[dop] or "U" print(f"{fieldname}.value {value}") if result["class"] == "TPV" and "TPV" not in gps_stanza.keys(): print("multigraph gps_error_estimate") for error in GPS_ERRORS: fieldname = error["key"].lower() try: value = result[error["key"]] except KeyError: value = "U" print(f"{fieldname}.value {value}") if result["class"] == "PPS" and "PPS" not in gps_stanza.keys(): print("multigraph gps_time_offset") value = (result["clock_sec"] - result["real_sec"]) + ( (result["clock_nsec"] - result["real_nsec"]) / 1e9 ) print(f"toff.value {value:.10f}") gps_stanza[result["class"]] = result # We want SKY, TPV and PPS if ( "SKY" in gps_stanza.keys() and "TPV" in gps_stanza.keys() and "PPS" in gps_stanza.keys() ): # Found them all, stop looping break if (datetime.datetime.now() - start_date).total_seconds() >= GPS_COLLECT_TIME: print( f"# Waited more than {GPS_COLLECT_TIME} seconds. This'll have to do: {gps_stanza.keys()}" )