# Copyright (c) Mathias Kaerlev 2011-2012.
# This file is part of pyspades.
# pyspades is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# pyspades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with pyspades. If not, see <http://www.gnu.org/licenses/>.
import json
from twisted.internet.task import LoopingCall
from twisted.internet.defer import DeferredList
from twisted.web.client import getPage
from twisted.logger import Logger
from piqueserver.networkdict import NetworkDict
from piqueserver.config import config, cast_duration
log = Logger()
# format is [{"ip" : "1.1.1.1", "reason : "blah"}, ...]
[docs]def validate_bansub_config(c):
if not isinstance(c, list):
return False
for item in c:
if not item.get('url') or not isinstance(item.get('whitelist'), list):
return False
return True
bans_config = config.section('bans')
bans_config_urls = bans_config.option('bansubscribe', default=[], validate=validate_bansub_config)
bans_config_interval = bans_config.option('bansubscribe_interval', default="5min",
cast=cast_duration)
[docs]class BanManager:
bans = None
new_bans = None
def __init__(self, protocol):
self.protocol = protocol
self.urls = [(entry.get('url'), entry.get('whitelist')) for entry in
bans_config_urls.get()]
self.loop = LoopingCall(self.update_bans)
self.loop.start(bans_config_interval.get(), now=True)
[docs] def update_bans(self):
self.new_bans = NetworkDict()
defers = []
for url, url_filter in self.urls:
defers.append(getPage(url.encode('utf8')).addCallback(self.got_bans,
url_filter))
DeferredList(defers).addCallback(self.bans_finished)
[docs] def got_bans(self, data, name_filter):
bans = json.loads(data)
for entry in bans:
name = entry.get('name', None)
if name is not None and name in name_filter:
continue
self.new_bans[str(entry['ip'])] = str(entry['reason'])
[docs] def bans_finished(self, _result):
self.bans = self.new_bans
self.new_bans = None
log.info("successfully updated bans from bansubscribe urls")
[docs] def get_ban(self, ip):
if self.bans is None:
return None
try:
return self.bans[ip]
except KeyError:
return None