Source code for piqueserver.bansubscribe

# Copyright (c) Mathias Kaerlev 2011-2012.

# This file is part of pyspades.

# pyspades is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# pyspades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with pyspades.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import json
from itertools import chain
from typing import List

import aiohttp
from twisted.logger import Logger

from piqueserver.config import cast_duration, config
from piqueserver.networkdict import NetworkDict

log = Logger()

# format is [{"ip": "", "reason": "blah"}, ...]

def validate_bansub_config(c):
    """Check that a ``bansubscribe`` config value has the expected shape.

    A valid value is a list whose entries are all mappings with a truthy
    ``url`` and a ``whitelist`` that is a list.

    Args:
        c: The raw configuration value to validate.

    Returns:
        True if ``c`` is a list and every entry is well-formed (an empty
        list is valid), otherwise False.
    """
    # Local import keeps this block self-contained; the module's import
    # section does not bring in Mapping.
    from collections.abc import Mapping

    if not isinstance(c, list):
        return False
    for item in c:
        # A non-mapping entry (e.g. a bare URL string or None) has no
        # ``.get``; report it as invalid instead of raising AttributeError.
        if not isinstance(item, Mapping):
            return False
        if not item.get('url') or not isinstance(item.get('whitelist'), list):
            return False
    return True
# Handle on the 'bans' section of the server configuration.
bans_config = config.section('bans')

# Ban feeds to subscribe to; checked by validate_bansub_config and
# defaulting to no feeds at all.
bans_config_urls = bans_config.option(
    'bansubscribe',
    default=[],
    validate=validate_bansub_config,
)

# Delay between two refreshes of the feeds; the configured value is passed
# through cast_duration.
bans_config_interval = bans_config.option(
    'bansubscribe_interval',
    default="5min",
    cast=cast_duration,
)
class BanManager:
    """Maintain a ban table built from the configured bansubscribe URLs.

    The table is rebuilt from scratch on every update and swapped in as a
    whole, so ``get_ban`` always reads a complete table.
    """

    # Table built by the most recent ``update_bans`` call; None until the
    # first update has completed.
    bans = None

    def __init__(self, protocol):
        """Store ``protocol`` and read the feed list from the configuration.

        Args:
            protocol: Object kept on ``self.protocol``; it is not otherwise
                used by this class.
        """
        self.protocol = protocol
        # One (url, whitelist) pair per configured feed.
        self.urls = [(entry.get('url'), entry.get('whitelist'))
                     for entry in bans_config_urls.get()]

    async def start(self):
        """Refresh the ban table forever, sleeping between refreshes.

        The sleep interval is re-read from the configuration on every
        iteration. This coroutine never returns on its own.
        """
        while True:
            await self.update_bans()
            await asyncio.sleep(bans_config_interval.get())

    async def fetch_filtered_bans(self, url: str, whitelist: List[str]):
        """Download one ban feed and drop whitelisted entries.

        Args:
            url: Address of the feed, expected to return a JSON list of
                ban objects.
            whitelist: Values of a ban's ``name`` field that must be
                ignored.

        Returns:
            The bans whose ``name`` is not in ``whitelist``. Any failure
            (network, decoding, unexpected shape) is logged and yields an
            empty list, so one broken feed does not affect the others.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    # The feed doesn't set a JSON content type, so decode
                    # the response text by hand.
                    banslist = json.loads(await resp.text())
            return [ban for ban in banslist
                    if ban.get('name', None) not in whitelist]
        except Exception as e:
            log.error("Failed to fetch bans from {url}: {err}", url=url, err=e)
            return []

    async def update_bans(self):
        """Fetch every configured feed concurrently and rebuild ``self.bans``.

        Entries that cannot be stored (missing ``ip``/``reason``, not a
        mapping, or rejected by the table) are skipped with a warning, so a
        single bad entry cannot discard the rest of the bans.
        """
        coros = [self.fetch_filtered_bans(url, whitelist)
                 for url, whitelist in self.urls]
        log.info("fetching bans from bansubscribe urls")
        banlists = await asyncio.gather(*coros)

        new_bans = NetworkDict()
        for ban in chain.from_iterable(banlists):
            try:
                new_bans[ban['ip']] = ban['reason']
            except (KeyError, TypeError, ValueError) as e:
                # KeyError/TypeError: entry lacks the fields or is not a
                # mapping. ValueError: presumably what NetworkDict raises
                # for an unparsable address -- TODO confirm.
                log.warn("Skipping malformed ban entry {ban}: {err}",
                         ban=ban, err=e)

        # Publish the new table only once it is fully built.
        self.bans = new_bans
        log.info("successfully updated bans from bansubscribe urls")

    def get_ban(self, ip):
        """Look up ``ip`` in the current ban table.

        Args:
            ip: Key to look up in ``self.bans``.

        Returns:
            The stored ban reason, or None if no table has been loaded yet
            or the lookup raises KeyError.
        """
        if self.bans is None:
            return None
        try:
            return self.bans[ip]
        except KeyError:
            return None