#!/usr/bin/env python3 """ =head1 NAME crowdsec - Plugin to monitor CrowdSec =head1 ABOUT Requires Python 3.6 Requires CrowdSec 1.4 =head1 AUTHOR Copyright (c) 2024 d0m84 =head1 CONFIGURATION Add the following to your @@CONFDIR@@/munin-node: [crowdsec] user root cli_path /usr/bin/cscli =head1 LICENSE GNU GPLv2 or any later version =begin comment This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. =end comment =head1 MAGIC MARKERS #%# family=auto #%# capabilities=autoconf =cut """ import subprocess import json import sys from textwrap import dedent import os import hashlib import shutil def autoconf(): if shutil.which('cscli') is None: print('no (cscli not found)') else: print('yes') def call_cli(*args): cli = os.environ.get('cli_path') cli = cli if cli is not None else shutil.which('cscli') o = subprocess.check_output([cli, '-o', 'json'] + list(args)) return json.loads(o) class State(): def __init__(self): self.state_file = f'{os.environ["MUNIN_PLUGSTATE"]}/crowdsec_s.json' if not os.path.isfile(self.state_file): self.content = {} with open(self.state_file, 'w') as file: json.dump(self.content, file) else: with open(self.state_file, 'r') as file: self.content = json.load(file) def read_section(self, section): if section in self.content: return self.content[section] else: return None def write_section(self, section, data): self.content[section] = data with open(self.state_file, 'w') as file: json.dump(self.content, file) class Decisions(): def __init__(self): _state = state.read_section('scenarios') self.state = _state if _state is not None else {} self.data = call_cli('decisions', 'list') if self.data is not None: self.decisions = len(self.data) self.banned_ips = len(set([d['source']['ip'] for d in self.data if d['source']['scope'] == 'Ip'])) # noqa: E501 self.banned_cidrs = len(set([d['source']['ip'] for d in self.data if d['source']['scope'] == 'Range'])) # noqa: E501 self.decisions_by_scenario = {} for d in self.data: s = d['scenario'].replace('#', '-') if s in self.decisions_by_scenario: self.decisions_by_scenario[s] += 1 else: self.decisions_by_scenario[s] = 1 else: self.decisions, self.banned_ips, self.banned_cidrs = 0, 0, 0 self.decisions_by_scenario = {} def config(self): print(dedent(""" multigraph decisions graph_title CrowdSec Decisions graph_args --base 1000 --lower-limit 0 graph_vlabel Amount graph_category security banned_ips.label Banned IP addresses banned_ips.type GAUGE banned_ips.min 0 banned_cidrs.label Banned IP ranges banned_cidrs.type GAUGE banned_cidrs.min 0 decisions.label Active Decisions decisions.type GAUGE decisions.min 0 multigraph scenarios graph_title CrowdSec Scenarios graph_args --base 1000 --lower-limit 0 graph_vlabel Decisions by Scenario graph_category security """)) # current active for scenario in self.decisions_by_scenario.keys(): hash = hashlib.sha1(scenario.encode()).hexdigest() if scenario not in self.state: self.state[scenario] = hash print(dedent(f""" multigraph scenarios {hash}.label {scenario} {hash}.type GAUGE {hash}.min 0 """)) # known via state for scenario, hash in self.state.items(): if scenario not in self.decisions_by_scenario.keys(): print(dedent(f""" multigraph scenarios {hash}.label {scenario} {hash}.type GAUGE {hash}.min 0 """)) state.write_section('scenarios', self.state) def current(self): print(dedent(f""" multigraph decisions banned_ips.value {self.banned_ips} banned_cidrs.value {self.banned_cidrs} decisions.value {self.decisions} """)) if len(self.decisions_by_scenario) > 0: print('multigraph scenarios') for k, v in self.decisions_by_scenario.items(): name = hashlib.sha1(k.encode()).hexdigest() print(f'{name}.value {v}') class Acquisitions(): def __init__(self): # requires crowdsec >= 1.5.3 try: self.data = call_cli('metrics') except json.JSONDecodeError: self.data = {} def config(self): if len(self.data) > 0: print(dedent(""" multigraph acquisitions graph_title CrowdSec Acquisitions graph_args --base 1000 --lower-limit 0 graph_vlabel Lines per ${graph_period} graph_category security parsed.label Parsed parsed.type DERIVE parsed.min 0 reads.label Read reads.type DERIVE reads.min 0 unparsed.label Unparsed unparsed.type DERIVE unparsed.min 0 """)) def current(self): if len(self.data) > 0: r = {'reads': 0, 'parsed': 0, 'unparsed': 0} for source in self.data['acquisition']: for type in ['reads', 'parsed', 'unparsed']: if type in self.data['acquisition'][source]: r[type] += self.data['acquisition'][source][type] print(dedent(f""" multigraph acquisitions reads.value {r['reads']} unparsed.value {r['unparsed']} parsed.value {r['parsed']} """)) if __name__ == "__main__": if len(sys.argv) == 2 and sys.argv[1] == 'autoconf': autoconf() elif len(sys.argv) == 2 and sys.argv[1] == 'config': state = State() Decisions().config() Acquisitions().config() else: state = State() Decisions().current() Acquisitions().current()