#!/usr/bin/env python3 """Munin plugin to monitor naxsi rule hits needs logtail command being installed =head1 NAME naxsi - monitor naxsi rule hits =head1 CONFIGURATION Following config is needed: [naxsi_exceptions] env.nginx_error_log_files /var/log/nginx/error.log /var/log/nginx/error2.log env.warning_level 2 env.critical_level 20 =head1 AUTHOR Jirka Schaefer =head1 LICENSE GPLv3 =head1 MAGIC MARKERS #%# family=auto #%# capabilities=autoconf =head1 NOTES Used to monitor naxsi exception hits and be able to tune whitelists early and prevent false positives. =cut """ import os import subprocess import sys import re import json state_file = os.path.join(os.environ.get('MUNIN_PLUGSTATE'), os.environ.get('MUNIN_STATEFILE')) warning_level = os.environ.get('warning_level', 20) critical_level = os.environ.get('critical_level', 2) def run_binary(args): """Run binary and return output.""" try: return subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=False, encoding='utf-8', errors='ignore').stdout except FileNotFoundError: return '' def get_values(): logfile=os.environ.get('nginx_error_log_files', '').split(' ') if not logfiles: raise ValueError('no nginx err log file configured') output = '' for logfile in logfiles: output += run_binary([ 'logtail', # '-t', # FIXME test mode,: remove '-f', logfile ] ) with open(state_file, 'rt') as f: rule_ids = json.loads(f.read() or '[]') res = dict((k, 0) for k in rule_ids) for line in output.splitlines(): if 'NAXSI_FMT' in line: matches = re.findall(f'&id\d=(\d+)&', line) for rule_id in matches: res[rule_id] = res.setdefault(rule_id, 0)+1 if not res: #raise ValueError(f'no results in {len(output.splitlines())} lines:\n{output}') return for k,v in res.items(): print(f"ID_{k}.value {v}") with open(state_file, 'wt') as f: f.write(json.dumps(list(res.keys()))) def print_config(): print('graph_title Naxsi exceptions count by rule id') print('graph_vlabel count') print('graph_args --base 1000 --lower-limit 0') print('graph_category waf') print('env.nginx_error_log_file') with open(state_file, 'rt') as f: rule_ids = json.loads(f.read() or '[]') for k in rule_ids: print(f'ID_{k}.label ID_{k}') print(f'ID_{k}.warning {warning_level}') print(f'ID_{k}.critical {critical_level}') if __name__ == '__main__': if len(sys.argv) > 1 and sys.argv[1] == 'autoconf': print('no') elif len(sys.argv) > 1 and sys.argv[1] == 'config': print_config() else: get_values()