mirror of
https://github.com/munin-monitoring/contrib.git
synced 2025-07-21 18:41:03 +00:00
* add naxsi exceptions plugin * Update naxsi_exceptions * Update naxsi_exceptions * rename dir to simpler version * Update naxsi_exceptions * Update naxsi_exceptions
116 lines
2.7 KiB
Python
116 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""Munin plugin to monitor naxsi rule hits
|
|
|
|
Requires the logtail command to be installed.
|
|
|
|
=head1 NAME
|
|
|
|
naxsi - monitor naxsi rule hits
|
|
|
|
=head1 CONFIGURATION
|
|
|
|
The following configuration is required:
|
|
|
|
[naxsi_exceptions]
|
|
env.nginx_error_log_file /var/log/nginx/error.log
|
|
env.warning_level 2
|
|
env.critical_level 20
|
|
|
|
=head1 AUTHOR
|
|
|
|
Jirka Schaefer <patroqueeet@gmail.com>
|
|
|
|
=head1 LICENSE
|
|
|
|
GPLv3
|
|
|
|
=head1 MAGIC MARKERS
|
|
|
|
#%# family=auto
|
|
#%# capabilities=autoconf
|
|
|
|
=head1 NOTES
|
|
|
|
Monitors naxsi exception hits so that whitelists can be tuned early and false positives prevented.
|
|
|
|
=cut
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import re
|
|
import json
|
|
|
|
# File in which the plugin remembers the rule ids it has already reported.
# The location is taken from the MUNIN_PLUGSTATE / MUNIN_STATEFILE environment
# variables; the fallbacks keep the module importable (e.g. for "autoconf" or
# a manual run) when those variables are not set, instead of failing with a
# TypeError from os.path.join(None, None).
state_file = os.path.join(
    os.environ.get('MUNIN_PLUGSTATE', ''),
    os.environ.get('MUNIN_STATEFILE', 'naxsi_exceptions.state'),
)
# Thresholds emitted for every rule id in the "config" output. The defaults
# follow the example configuration in the module docstring (warning 2,
# critical 20); they were previously swapped, which put the warning threshold
# above the critical one.
warning_level = os.environ.get('warning_level', 2)
critical_level = os.environ.get('critical_level', 20)
|
|
|
|
|
|
def run_binary(args):
    """Execute a command and hand back everything it printed.

    ``args`` is the argument list passed to ``subprocess.run``. Standard
    error is merged into standard output, the bytes are decoded as UTF-8
    (undecodable bytes are dropped) and the exit status is not checked.

    Returns the combined output as a string, or an empty string when the
    executable cannot be found.
    """
    try:
        completed = subprocess.run(
            args,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding='utf-8',
            errors='ignore',
        )
    except FileNotFoundError:
        return ''
    return completed.stdout
|
|
|
|
|
|
def get_values():
    """Print the number of naxsi hits per rule id found in the logtail output.

    The nginx error log path is read from the ``nginx_error_log_file``
    environment variable and passed to ``logtail -f`` (logtail is expected to
    return only the lines added since its previous run -- confirm against the
    installed logtail). Every output line containing ``NAXSI_FMT`` is scanned
    for ``&id<N>=<rule id>&`` pairs and each rule id is counted.

    Rule ids stored in the state file are always reported, with a value of 0
    when they did not occur in this run. One ``ID_<rule id>.value <count>``
    line is printed per rule id and the list of known rule ids is written
    back to the state file. Nothing is printed or written when no rule id is
    known at all.

    Raises:
        ValueError: if ``nginx_error_log_file`` is not set or empty.
    """
    logfile = os.environ.get('nginx_error_log_file')
    if not logfile:
        raise ValueError('no nginx err log file configured')

    output = run_binary(['logtail', '-f', logfile])

    # The state file is only created at the end of this function, so on the
    # very first run it does not exist yet; treat that as "no rule ids known"
    # instead of failing forever with FileNotFoundError.
    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except FileNotFoundError:
        rule_ids = []

    # Start every known rule at 0 so its data series continues without gaps.
    res = {rule_id: 0 for rule_id in rule_ids}

    # Raw string: "\d" in a normal (f-)string is an invalid escape sequence.
    # "\d+" after "id" also accepts match indexes of 10 and above.
    id_pattern = re.compile(r'&id\d+=(\d+)&')
    for line in output.splitlines():
        if 'NAXSI_FMT' not in line:
            continue
        for rule_id in id_pattern.findall(line):
            res[rule_id] = res.get(rule_id, 0) + 1

    if not res:
        return

    for k, v in res.items():
        print(f"ID_{k}.value {v}")

    # Remember every rule id seen so far for the next "fetch" and "config".
    with open(state_file, 'wt') as f:
        f.write(json.dumps(list(res)))
|
|
|
|
|
|
def print_config():
    """Print the Munin graph configuration.

    Emits the graph title, vertical label, arguments and category, followed
    by a label, warning threshold and critical threshold for every rule id
    stored in the state file. When the state file does not exist yet, only
    the graph-level lines are printed.
    """
    print('graph_title Naxsi exceptions count by rule id')
    print('graph_vlabel count')
    print('graph_args --base 1000 --lower-limit 0')
    print('graph_category waf')
    # The former bare 'env.nginx_error_log_file' line was dropped here:
    # "env.*" is plugin configuration syntax (see CONFIGURATION in the module
    # docstring), not a graph or field attribute, and it carried no value.

    # No state file simply means no rule id has been recorded so far.
    try:
        with open(state_file, 'rt') as f:
            rule_ids = json.loads(f.read() or '[]')
    except FileNotFoundError:
        rule_ids = []

    for k in rule_ids:
        print(f'ID_{k}.label ID_{k}')
        print(f'ID_{k}.warning {warning_level}')
        print(f'ID_{k}.critical {critical_level}')
|
|
|
|
|
|
if __name__ == '__main__':
    # The first command-line argument, when present, selects the action;
    # anything other than "autoconf" or "config" reports the values.
    action = sys.argv[1] if len(sys.argv) > 1 else None
    if action == 'autoconf':
        print('no')
    elif action == 'config':
        print_config()
    else:
        get_values()
|