diff --git a/plugins/naxsi/naxsi_exceptions b/plugins/naxsi/naxsi_exceptions new file mode 100644 index 00000000..163c04cd --- /dev/null +++ b/plugins/naxsi/naxsi_exceptions @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 + +"""Munin plugin to monitor naxsi rule hits + +needs logtail command being installed + +=head1 NAME + +naxsi - monitor naxsi rule hits + +=head1 CONFIGURATION + +Following config is needed: + + [naxsi_exceptions] + env.nginx_error_log_file /var/log/nginx/error.log + env.warning_level 2 + env.critical_level 20 + +=head1 AUTHOR + +Jirka Schaefer + +=head1 LICENSE + +GPLv3 + +=head1 MAGIC MARKERS + + #%# family=auto + #%# capabilities=autoconf + +=head1 NOTES + +Used to monitor naxsi exception hits and be able to tune whitelists early and prevent false positives. + +=cut +""" + +import os +import subprocess +import sys +import re +import json + +state_file = os.path.join(os.environ.get('MUNIN_PLUGSTATE'), os.environ.get('MUNIN_STATEFILE')) +warning_level = os.environ.get('warning_level', 20) +critical_level = os.environ.get('critical_level', 2) + + +def run_binary(args): + """Run binary and return output.""" + try: + return subprocess.run(args, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, check=False, + encoding='utf-8', errors='ignore').stdout + except FileNotFoundError: + return '' + + +def get_values(): + logfile=os.environ.get('nginx_error_log_file') + if not logfile: + raise ValueError('no nginx err log file configured') + + output = run_binary([ + 'logtail', + # '-t', # FIXME test mode,: remove + '-f', + logfile ] + ) + + + with open(state_file, 'rt') as f: + rule_ids = json.loads(f.read() or '[]') + + res = dict((k, 0) for k in rule_ids) + + for line in output.splitlines(): + if 'NAXSI_FMT' in line: + matches = re.findall(f'&id\d=(\d+)&', line) + for rule_id in matches: + res[rule_id] = res.setdefault(rule_id, 0)+1 + + if not res: + #raise ValueError(f'no results in {len(output.splitlines())} lines:\n{output}') + return + + for k,v in res.items(): + print(f"ID_{k}.value {v}") + + with open(state_file, 'wt') as f: + f.write(json.dumps(list(res.keys()))) + + +def print_config(): + print('graph_title Naxsi exceptions count by rule id') + print('graph_vlabel count') + print('graph_args --base 1000 --lower-limit 0') + print('graph_category waf') + print('env.nginx_error_log_file') + with open(state_file, 'rt') as f: + rule_ids = json.loads(f.read() or '[]') + for k in rule_ids: + print(f'ID_{k}.label ID_{k}') + print(f'ID_{k}.warning {warning_level}') + print(f'ID_{k}.critical {critical_level}') + + +if __name__ == '__main__': + if len(sys.argv) > 1 and sys.argv[1] == 'autoconf': + print('no') + elif len(sys.argv) > 1 and sys.argv[1] == 'config': + print_config() + else: + get_values()