import re
from typing import List, Tuple, Optional, Union
from twisted.internet import reactor
from twisted.logger import Logger
from piqueserver import commands
from piqueserver.release import format_release
from pyspades.constants import (ERROR_BANNED, DESTROY_BLOCK, SPADE_DESTROY,
GRENADE_DESTROY, ERROR_KICKED)
from pyspades.server import ServerConnection
from pyspades.common import escape_control_codes, prettify_timespan
from pyspades.types import AttributeSet, RateLimiter
# TODO: move these where they belong
from pyspades.team import Team
from pyspades.world import Grenade
CHAT_WINDOW_SIZE = 5
CHAT_PER_SECOND = 0.5
HookValue = Optional[bool]
log = Logger()
[docs]class FeatureConnection(ServerConnection):
def __init__(self, *args, **kwargs):
self.printable_name = None
self.admin = False
self.last_switch = None
self.mute = False
self.deaf = False
self.login_retries = None
self.god = False
self.god_build = False
self.fly = False
self.invisible = False
self.building = True
self.killing = True
self.streak = 0
self.best_streak = 0
self.chat_limiter = RateLimiter(
CHAT_WINDOW_SIZE, CHAT_WINDOW_SIZE / CHAT_PER_SECOND)
self.user_types = None
self.rights = None
self.can_complete_line_build = True
self.current_send_lines_types = []
super().__init__(*args, **kwargs)
[docs] def on_connect(self) -> None:
protocol = self.protocol
client_ip = self.address[0]
if client_ip in self.protocol.bans:
name, reason, timestamp = self.protocol.bans[client_ip]
if timestamp is not None and reactor.seconds() >= timestamp:
protocol.remove_ban(client_ip)
protocol.save_bans()
else:
log.info('banned user {} ({}) attempted to join'
.format(name, client_ip))
self.disconnect(ERROR_BANNED)
return
manager = self.protocol.ban_manager
if manager is not None:
reason = manager.get_ban(client_ip)
if reason is not None:
log.info(('federated banned user (%s) attempted to join, '
'banned for %r') % (client_ip, reason))
self.disconnect(ERROR_BANNED)
return
ServerConnection.on_connect(self)
[docs] def on_join(self) -> None:
if self.protocol.motd is not None:
self.send_lines(self.protocol.motd, 'motd')
[docs] def on_login(self, name: str) -> None:
self.printable_name = escape_control_codes(name)
if len(self.printable_name) > 15:
self.kick(silent=True)
log.info('{name} (IP {ip}, ID {pid}) entered the game!',
name=self.printable_name,
ip=self.address[0], pid=self.player_id)
self.protocol.irc_say('* %s (IP %s, ID %s) entered the game!' %
(self.name, self.address[0], self.player_id))
if self.user_types is None:
self.user_types = AttributeSet()
self.rights = AttributeSet()
if self.protocol.everyone_is_admin:
self.on_user_login('admin', False)
[docs] def get_spawn_location(self) -> Tuple[int, int, int]:
get_location = self.protocol.map_info.get_spawn_location
if get_location is not None:
result = get_location(self)
if result is not None:
return result
return ServerConnection.get_spawn_location(self)
[docs] def on_disconnect(self) -> None:
if self.name is not None:
log.info('{name} disconnected!', name=self.printable_name)
self.protocol.irc_say('* %s (IP %s) disconnected' %
(self.name, self.address[0]))
self.protocol.player_memory.append((self.name, self.address[0]))
else:
log.info('{ip} disconnected', ip=self.address[0])
ServerConnection.on_disconnect(self)
[docs] def on_command(self, command: str, parameters: List[str]) -> None:
result = commands.handle_command(self, command, parameters)
if result:
for i in reversed(result.split("\n")):
self.send_chat(i)
def _can_build(self) -> bool:
if not self.building:
return False
if not self.god and not self.protocol.building:
return False
return True
[docs] def on_block_build_attempt(self, x: int, y: int, z: int) -> bool:
return self._can_build()
[docs] def on_line_build_attempt(self, points) -> bool:
return self._can_build()
[docs] def on_line_build(self, points) -> None:
if self.god:
self.refill()
if self.god_build:
if self.protocol.god_blocks is None:
self.protocol.god_blocks = set()
self.protocol.god_blocks.update(points)
elif self.protocol.user_blocks is not None:
self.protocol.user_blocks.update(points)
[docs] def on_block_build(self, x: int, y: int, z: int) -> None:
if self.god:
self.refill()
if self.god_build:
if self.protocol.god_blocks is None:
self.protocol.god_blocks = set()
self.protocol.god_blocks.add((x, y, z))
elif self.protocol.user_blocks is not None:
self.protocol.user_blocks.add((x, y, z))
[docs] def on_block_destroy(self, x: int, y: int, z: int, mode: int) -> bool:
map_on_block_destroy = self.protocol.map_info.on_block_destroy
if map_on_block_destroy is not None:
result = map_on_block_destroy(self, x, y, z, mode)
if not result:
return result
if not self.building:
return False
if not self.god:
if not self.protocol.building:
return False
is_indestructable = self.protocol.is_indestructable
if mode == DESTROY_BLOCK:
if is_indestructable(x, y, z):
return False
elif mode == SPADE_DESTROY:
if (is_indestructable(x, y, z) or
is_indestructable(x, y, z + 1) or
is_indestructable(x, y, z - 1)):
return False
elif mode == GRENADE_DESTROY:
for nade_x in range(x - 1, x + 2):
for nade_y in range(y - 1, y + 2):
for nade_z in range(z - 1, z + 2):
if is_indestructable(nade_x, nade_y, nade_z):
return False
[docs] def on_block_removed(self, x: int, y: int, z: int) -> None:
if self.protocol.user_blocks is not None:
self.protocol.user_blocks.discard((x, y, z))
if self.protocol.god_blocks is not None:
self.protocol.god_blocks.discard((x, y, z))
[docs] def on_hit(self, hit_amount: float, player: 'FeatureConnection',
_type: int, grenade: Grenade) -> HookValue:
if not self.protocol.killing:
self.send_chat(
"You can't kill anyone right now! Damage is turned OFF")
return False
if not self.killing:
self.send_chat("%s. You can't kill anyone." % player.name)
return False
elif player.god:
if not player.invisible:
self.send_chat("You can't hurt %s! That player is in "
"*god mode*" % player.name)
return False
if self.god:
self.protocol.broadcast_chat(
'%s, killing in god mode is forbidden!' % self.name, irc=True)
self.protocol.broadcast_chat(
'%s returned to being a mere human.' % self.name, irc=True)
self.god = False
self.god_build = False
[docs] def on_kill(self, killer: Optional['FeatureConnection'], _type: int,
grenade: None) -> None:
self.streak = 0
if killer is None or self.team is killer.team:
return
if not grenade or grenade.name == 'grenade':
# doesn't give streak kills on airstrikes (or other types of
# explosions)
killer.streak += 1
killer.best_streak = max(killer.streak, killer.best_streak)
killer.team.kills += 1
[docs] def on_reset(self) -> None:
self.streak = 0
self.best_streak = 0
[docs] def on_animation_update(self, jump: bool, crouch: bool, sneak: bool,
sprint: bool) -> Tuple[bool, bool, bool, bool]:
if self.fly and crouch and self.world_object.velocity.z != 0.0:
jump = True
return jump, crouch, sneak, sprint
[docs] def on_fall(self, damage: int) -> HookValue:
if self.god:
return False
if not self.protocol.fall_damage:
return False
[docs] def on_grenade(self, time_left: float) -> None:
if self.god:
self.refill()
[docs] def on_team_join(self, team: Team) -> HookValue:
if self.team is not None:
if self.protocol.teamswitch_interval:
teamswitch_interval = self.protocol.teamswitch_interval
teamswitch_allowed = self.protocol.teamswitch_allowed
if not teamswitch_allowed:
self.send_chat('Switching teams is not allowed')
return False
if (self.last_switch is not None and
reactor.seconds() - self.last_switch < teamswitch_interval):
self.send_chat(
'You must wait before switching teams again')
return False
if team.locked:
self.send_chat('Team is locked')
if not team.spectator and not team.other.locked:
return team.other
return False
balanced_teams = self.protocol.balanced_teams
if balanced_teams and not team.spectator:
other_team = team.other
if other_team.count() < team.count() + 1 - balanced_teams:
if other_team.locked:
return False
self.send_chat('Team is full, moved to %s' % other_team.name)
return other_team
self.last_switch = reactor.seconds()
[docs] def on_chat(self, value: str, global_message: bool) -> Union[str, bool]:
"""
notifies when the server receives a chat message
return False to block sending the message
"""
message = '<{}> {}'.format(self.name, value)
if self.mute:
message = '(MUTED) {}'.format(message)
self.send_chat('(Chat not sent - you are muted)')
return False
if global_message:
if self.protocol.global_chat:
# forward message to IRC
self.protocol.irc_say(message)
else:
self.send_chat('(Chat not sent - global chat disabled)')
return False
# antispam:
current_time = reactor.seconds()
self.chat_limiter.record_event(current_time)
if self.chat_limiter.above_limit():
self.mute = True
self.protocol.broadcast_chat(
'%s has been muted for excessive spam' % (
self.name),
irc=True)
log.info("<{name}> {message}", name=escape_control_codes(
self.name), message=escape_control_codes(value))
return value
[docs] def kick(self, reason=None, silent=False):
if not silent:
if reason is not None:
message = '{} was kicked: {}'.format(self.name, reason)
else:
message = '%s was kicked' % self.name
self.protocol.broadcast_chat(message, irc=True)
log.info(message)
# FIXME: Client should handle disconnect events the same way in both
# main and initial loading network loops
self.disconnect(ERROR_KICKED)
[docs] def ban(self, reason=None, duration=None):
reason = ': ' + reason if reason is not None else ''
duration = duration or None
if duration is None:
message = '{} permabanned{}'.format(self.name, reason)
else:
message = '{} banned for {}{}'.format(self.name,
prettify_timespan(duration), reason)
if self.protocol.on_ban_attempt(self, reason, duration):
self.protocol.broadcast_chat(message, irc=True)
self.protocol.on_ban(self, reason, duration)
if self.address[0] == "127.0.0.1":
self.protocol.broadcast_chat("Ban ignored: localhost")
else:
self.protocol.add_ban(self.address[0], reason, duration,
self.name)
[docs] def send_lines(self, lines: List[str], key: str = 'unknown') -> None:
"""
Send a list of lines to the player.
'key' is a unique identifier for the lines being sent - for example,
a message saying '3 medkits are ready!' could use the key 'medkits.ready'.
The key is used to avoid sending two messages of the same variety at once,
to protect the server against a vulnerability which exploits this function.
The key should always be specified when calling this function. The default
value of 'unknown' exists simply for backwards compatibility.
"""
# Detect if the send_lines key is already being sent to the player.
# If the caller of this function forgot to specify a key (thus,
# 'unknown' is used as per the default), we'll skip this detection.
if key != 'unknown':
if key in self.current_send_lines_types:
log.info(
"Skipped sending lines to '{}': already being sent key "
"'{}'".format(self.printable_name, key))
return
self.current_send_lines_types.append(key)
current_time = 0
for line in lines:
reactor.callLater(current_time, self.send_chat, line)
current_time += 2
reactor.callLater(current_time, self._completed_send_lines, key)
def _completed_send_lines(self, type: str) -> None:
self.current_send_lines_types.remove(type)
[docs] def on_hack_attempt(self, reason):
log.warn('Hack attempt detected from {}: {}'
.format(self.printable_name, reason))
self.kick(reason)
[docs] def on_user_login(self, user_type, verbose=True):
log.info("'{username}' logged in as {user_type}", username=self.name,
user_type=user_type)
if user_type == 'admin':
self.admin = True
self.speedhack_detect = False
# notify of new release to admin on /login
new_release = self.protocol.new_release
if user_type == 'admin' and new_release:
self.send_chat("!" * 30)
self.send_chat(format_release(new_release))
self.send_chat("!" * 30)
self.user_types.add(user_type)
rights = set(commands.get_rights(user_type))
self.rights.update(rights)
if verbose:
message = ' logged in as %s' % (user_type)
self.send_chat('You' + message)
self.protocol.irc_say("* " + self.name + message)
[docs] def timed_out(self):
if self.name is not None:
log.info('%s timed out' % self.printable_name)
ServerConnection.timed_out(self)