From 17e0fce840ad78921f6751724f28f351a6894cd8 Mon Sep 17 00:00:00 2001 From: heeplr <32984777+heeplr@users.noreply.github.com> Date: Tue, 26 Mar 2019 13:46:37 +0100 Subject: [PATCH] DOCSIS status monitoring MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 and compatible cable modems --- plugins/router/arris-tg3442 | 250 ++++++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 plugins/router/arris-tg3442 diff --git a/plugins/router/arris-tg3442 b/plugins/router/arris-tg3442 new file mode 100644 index 00000000..35307c74 --- /dev/null +++ b/plugins/router/arris-tg3442 @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 + +""" +# MUNIN Plugin to monitor status of Arris TG3442 / TG2492LG-85 +# and compatible cable modems +# +# Connect to the web-frontend and get current DOCSIS status of upstream and +# downstream channels. (Signal Power, SNR, Lock Status) +# +# +# Requirements: +# - BeautifulSoup +# - pycryptodome +# +# Configuration: +# [arris] +# env.url http://192.168.100.1 +# env.username admin +# env.password yourpassword +# +# Parameters: +# url - URL to web-frontend +# username - defaults to "admin" +# password - valid password +# +# +# References: https://www.arris.com/products/touchstone-tg3442-cable-voice-gateway/ +# +# +# +# Copyright (c) 2019 Daniel Hiepler +# +# Permission to use, copy, and modify this software with or without fee +# is hereby granted, provided that this entire notice is included in +# all source code copies of any software which is or includes a copy or +# modification of this software. +# +# THIS SOFTWARE IS BEING PROVIDED "AS IS", WITHOUT ANY EXPRESS OR +# IMPLIED WARRANTY. IN PARTICULAR, NONE OF THE AUTHORS MAKES ANY +# REPRESENTATION OR WARRANTY OF ANY KIND CONCERNING THE +# MERCHANTABILITY OF THIS SOFTWARE OR ITS FITNESS FOR ANY PARTICULAR +# PURPOSE. +# +# +# Magic markers +# #%# family=contrib +""" + +import binascii +from bs4 import BeautifulSoup +from Crypto.Cipher import AES +import hashlib +import json +import re +import requests +import sys +import os + + +def login(session, url, username, password): + """login to """ + # get login page + r = session.get(f"{url}") + # parse HTML + h = BeautifulSoup(r.text, "lxml") + # get session id from javascript in head + current_session_id = re.search(r".*var currentSessionId = '(.+)';.*", h.head.text)[1] + + # encrypt password + salt = os.urandom(8) + iv = os.urandom(8) + key = hashlib.pbkdf2_hmac( + 'sha256', + bytes(password.encode("ascii")), + salt, + iterations=1000, + dklen=128/8 + ) + secret = { "Password": password, "Nonce": current_session_id } + plaintext = bytes(json.dumps(secret).encode("ascii")) + associated_data = "loginPassword" + # initialize cipher + cipher = AES.new(key, AES.MODE_CCM, iv) + # set associated data + cipher.update(bytes(associated_data.encode("ascii"))) + # encrypt plaintext + encrypt_data = cipher.encrypt(plaintext) + # append digest + encrypt_data += cipher.digest() + # return + login_data = { + 'EncryptData': binascii.hexlify(encrypt_data).decode("ascii"), + 'Name': username, + 'Salt': binascii.hexlify(salt).decode("ascii"), + 'Iv': binascii.hexlify(iv).decode("ascii"), + 'AuthData': associated_data + } + + # login + r = session.put( + f"{url}/php/ajaxSet_Password.php", + headers={ + "Content-Type": "application/json", + "csrfNonce": "undefined" + }, + data=json.dumps(login_data) + ) + + # parse result + result = json.loads(r.text) + # success? + if result['p_status'] == "Fail": + print("login failure", file=sys.stderr) + exit(-1) + # remember CSRF nonce + csrf_nonce = result['nonce'] + + # prepare headers + session.headers.update({ + "X-Requested-With": "XMLHttpRequest", + "csrfNonce": csrf_nonce, + "Origin": f"{url}/", + "Referer": f"{url}/" + }) + # set credentials cookie + session.cookies.set( + "credential", + "eyAidW5pcXVlIjoiMjgwb2FQU0xpRiIsICJmYW1pbHkiOiI4NTIiLCAibW9kZWxuYW1lIjoiV" + "EcyNDkyTEctODUiLCAibmFtZSI6InRlY2huaWNpYW4iLCAidGVjaCI6dHJ1ZSwgIm1vY2EiOj" + "AsICJ3aWZpIjo1LCAiY29uVHlwZSI6IldBTiIsICJnd1dhbiI6ImYiLCAiRGVmUGFzc3dkQ2h" + "hbmdlZCI6IllFUyIgfQ==" + ) + + # set session + r = session.post(f"{url}/php/ajaxSet_Session.php") + +def docsis_status(session): + """get current DOCSIS status page, parse and return channel data""" + r = session.get(f"{url}/php/status_docsis_data.php") + # extract json from javascript + json_downstream_data = re.search(r".*json_dsData = (.+);.*", r.text)[1] + json_upstream_data = re.search(r".*json_usData = (.+);.*", r.text)[1] + # parse json + downstream_data = json.loads(json_downstream_data) + upstream_data = json.loads(json_upstream_data) + # convert lock status to numeric values + for d in [ upstream_data, downstream_data ]: + for c in d: + if c['LockStatus'] == "ACTIVE" or c['LockStatus'] == "Locked": + c['LockStatus'] = 1 + else: + c['LockStatus'] = 0 + return downstream_data, upstream_data + + +# ----------------------------------------------------------------------------- +if __name__ == "__main__": + # get config + url = os.getenv("url") + username = os.getenv("username") + password = os.getenv("password") + # validate config + if not url or not username or not password: + print("Set url, username and password first.", file=sys.stderr) + exit(1) + # create session + session = requests.Session() + # login with username and password + login(session, url, username, password) + # get DOCSIS status + downstream, upstream = docsis_status(session) + # prepare munin graph info + graph_descriptions = [ + { + "name": "up_signal", + "title": "DOCSIS Upstream signal strength", + "vlabel": "dBmV", + "info": "DOCSIS upstream signal strength by channel", + "data": upstream, + "key": "PowerLevel" + }, + { + "name": "up_lock", + "title": "DOCSIS Upstream lock", + "vlabel": "locked", + "info": "DOCSIS upstream channel lock status", + "data": upstream, + "key": "LockStatus" + }, + { + "name": "down_signal", + "title": "DOCSIS Downstream signal strength", + "vlabel": "dBmV", + "info": "DOCSIS downstream signal strength by channel", + "data": downstream, + "key": "PowerLevel" + }, + { + "name": "down_lock", + "title": "DOCSIS Downstream lock", + "vlabel": "locked", + "info": "DOCSIS downstream channel lock status", + "data": downstream, + "key": "LockStatus" + }, + { + "name": "down_snr", + "title": "DOCSIS Downstream signal/noise ratio", + "vlabel": "dB", + "info": "SNR/MER", + "data": downstream, + "key": "SNRLevel" + } + ] + + # configure ? + if len(sys.argv) > 1 and "config" in sys.argv[1]: + # process all graphs + for g in graph_descriptions: + # graph config + print( + f"multigraph docsis_{g['name']}\n" + f"graph_title {g['title']}\n" \ + f"graph_category network\n" \ + f"graph_vlabel {g['vlabel']}\n" \ + f"graph_info {g['info']}\n" \ + f"graph_scale no\n" + ) + + # channels + for c in g['data']: + # only use channels with PowerLevel + if not c['PowerLevel']: + continue + print( + f"channel_{c['ChannelID']}.label {c['ChannelID']} ({c['Frequency']} MHz)\n" + f"channel_{c['ChannelID']}.info Channel type: {c['ChannelType']}, Modulation: {c['Modulation']}" + ) + + # output values ? + else: + # process all graphs + for g in graph_descriptions: + print(f"multigraph docsis_{g['name']}") + # channels + for c in g['data']: + # only use channels with PowerLevel + if not c['PowerLevel']: + continue + print(f"channel_{c['ChannelID']}.value {c[g['key']]}")