Files
Chess_Probabilistic_Program…/lichess_bot/lib/model.py
2024-01-25 23:11:25 +01:00

285 lines
12 KiB
Python

"""Store information about a challenge, game or player in a class."""
import math
from urllib.parse import urljoin
import logging
import datetime
from enum import Enum
from lib.timer import Timer, msec, seconds, sec_str, to_msec, to_seconds, years
from lib.config import Configuration
from typing import Any
from collections import defaultdict
logger = logging.getLogger(__name__)
class Challenge:
"""Store information about a challenge."""
def __init__(self, challenge_info: dict[str, Any], user_profile: dict[str, Any]) -> None:
""":param user_profile: Information about our bot."""
self.id = challenge_info["id"]
self.rated = challenge_info["rated"]
self.variant = challenge_info["variant"]["key"]
self.perf_name = challenge_info["perf"]["name"]
self.speed = challenge_info["speed"]
self.increment: int = challenge_info.get("timeControl", {}).get("increment")
self.base: int = challenge_info.get("timeControl", {}).get("limit")
self.days: int = challenge_info.get("timeControl", {}).get("daysPerTurn")
self.challenger = Player(challenge_info.get("challenger") or {})
self.opponent = Player(challenge_info.get("destUser") or {})
self.from_self = self.challenger.name == user_profile["username"]
def is_supported_variant(self, challenge_cfg: Configuration) -> bool:
"""Check whether the variant is supported."""
return self.variant in challenge_cfg.variants
def is_supported_time_control(self, challenge_cfg: Configuration) -> bool:
"""Check whether the time control is supported."""
speeds = challenge_cfg.time_controls
increment_max: int = challenge_cfg.max_increment
increment_min: int = challenge_cfg.min_increment
base_max: int = challenge_cfg.max_base
base_min: int = challenge_cfg.min_base
days_max: int = challenge_cfg.max_days
days_min: int = challenge_cfg.min_days
if self.speed not in speeds:
return False
require_non_zero_increment = (self.challenger.is_bot
and self.speed == "bullet"
and challenge_cfg.bullet_requires_increment)
increment_min = max(increment_min, 1 if require_non_zero_increment else 0)
if self.base is not None and self.increment is not None:
# Normal clock game
return (increment_min <= self.increment <= increment_max
and base_min <= self.base <= base_max)
elif self.days is not None:
# Correspondence game
return days_min <= self.days <= days_max
else:
# Unlimited game
return days_max == math.inf
def is_supported_mode(self, challenge_cfg: Configuration) -> bool:
"""Check whether the mode is supported."""
return ("rated" if self.rated else "casual") in challenge_cfg.modes
def is_supported_recent(self, config: Configuration, recent_bot_challenges: defaultdict[str, list[Timer]]) -> bool:
"""Check whether we have played a lot of games with this opponent recently. Only used when the oppoennt is a BOT."""
# Filter out old challenges
recent_bot_challenges[self.challenger.name] = [timer for timer
in recent_bot_challenges[self.challenger.name]
if not timer.is_expired()]
max_recent_challenges = config.max_recent_bot_challenges
return (not self.challenger.is_bot
or max_recent_challenges is None
or len(recent_bot_challenges[self.challenger.name]) < max_recent_challenges)
def decline_due_to(self, requirement_met: bool, decline_reason: str) -> str:
"""
Get the reason lichess_bot declined an incoming challenge.
:param requirement_met: Whether a requirement is met.
:param decline_reason: The reason we declined the challenge if the requirement wasn't met.
:return: `decline_reason` if `requirement_met` is false else returns an empty string.
"""
return "" if requirement_met else decline_reason
def is_supported(self, config: Configuration,
recent_bot_challenges: defaultdict[str, list[Timer]]) -> tuple[bool, str]:
"""Whether the challenge is supported."""
try:
if self.from_self:
return True, ""
allowed_opponents: list[str] = list(filter(None, config.allow_list)) or [self.challenger.name]
decline_reason = (self.decline_due_to(config.accept_bot or not self.challenger.is_bot, "noBot")
or self.decline_due_to(not config.only_bot or self.challenger.is_bot, "onlyBot")
or self.decline_due_to(self.is_supported_time_control(config), "timeControl")
or self.decline_due_to(self.is_supported_variant(config), "variant")
or self.decline_due_to(self.is_supported_mode(config), "casual" if self.rated else "rated")
or self.decline_due_to(self.challenger.name not in config.block_list, "generic")
or self.decline_due_to(self.challenger.name in allowed_opponents, "generic")
or self.decline_due_to(self.is_supported_recent(config, recent_bot_challenges), "later"))
return not decline_reason, decline_reason
except Exception:
logger.exception(f"Error while checking challenge {self.id}:")
return False, "generic"
def score(self) -> int:
"""Give a rating estimate to the opponent."""
rated_bonus = 200 if self.rated else 0
challenger_master_title = self.challenger.title if not self.challenger.is_bot else None
titled_bonus = 200 if challenger_master_title else 0
challenger_rating_int = self.challenger.rating or 0
return challenger_rating_int + rated_bonus + titled_bonus
def mode(self) -> str:
"""Get the mode of the challenge (rated or casual)."""
return "rated" if self.rated else "casual"
def __str__(self) -> str:
"""Get a string representation of `Challenge`."""
return f"{self.perf_name} {self.mode()} challenge from {self.challenger} ({self.id})"
def __repr__(self) -> str:
"""Get a string representation of `Challenge`."""
return self.__str__()
class Termination(str, Enum):
"""The possible game terminations."""
MATE = "mate"
TIMEOUT = "outoftime"
RESIGN = "resign"
ABORT = "aborted"
DRAW = "draw"
class Game:
"""Store information about a game."""
def __init__(self, game_info: dict[str, Any], username: str, base_url: str, abort_time: datetime.timedelta) -> None:
""":param abort_time: How long to wait before aborting the game."""
self.username = username
self.id: str = game_info["id"]
self.speed = game_info.get("speed")
clock = game_info.get("clock") or {}
ten_years_in_ms = to_msec(years(10))
self.clock_initial = msec(clock.get("initial", ten_years_in_ms))
self.clock_increment = msec(clock.get("increment", 0))
self.perf_name = (game_info.get("perf") or {}).get("name", "{perf?}")
self.variant_name = game_info["variant"]["name"]
self.mode = "rated" if game_info.get("rated") else "casual"
self.white = Player(game_info["white"])
self.black = Player(game_info["black"])
self.initial_fen = game_info.get("initialFen")
self.state: dict[str, Any] = game_info["state"]
self.is_white = (self.white.name or "").lower() == username.lower()
self.my_color = "white" if self.is_white else "black"
self.opponent_color = "black" if self.is_white else "white"
self.me = self.white if self.is_white else self.black
self.opponent = self.black if self.is_white else self.white
self.base_url = base_url
self.game_start = datetime.datetime.fromtimestamp(to_seconds(msec(game_info["createdAt"])),
tz=datetime.timezone.utc)
self.abort_time = Timer(abort_time)
self.terminate_time = Timer(self.clock_initial + self.clock_increment + abort_time + seconds(60))
self.disconnect_time = Timer(seconds(0))
def url(self) -> str:
"""Get the url of the game."""
return f"{self.short_url()}/{self.my_color}"
def short_url(self) -> str:
"""Get the short url of the game."""
return urljoin(self.base_url, self.id)
def pgn_event(self) -> str:
"""Get the event to write in the PGN file."""
if self.variant_name in ["Standard", "From Position"]:
return f"{self.mode.title()} {self.perf_name.title()} game"
else:
return f"{self.mode.title()} {self.variant_name} game"
def time_control(self) -> str:
"""Get the time control of the game."""
return f"{sec_str(self.clock_initial)}+{sec_str(self.clock_increment)}"
def is_abortable(self) -> bool:
"""Whether the game can be aborted."""
# Moves are separated by spaces. A game is abortable when less
# than two moves (one from each player) have been played.
return " " not in self.state["moves"]
def ping(self, abort_in: datetime.timedelta, terminate_in: datetime.timedelta, disconnect_in: datetime.timedelta) -> None:
"""
Tell the bot when to abort, terminate, and disconnect from a game.
:param abort_in: How many seconds to wait before aborting.
:param terminate_in: How many seconds to wait before terminating.
:param disconnect_in: How many seconds to wait before disconnecting.
"""
if self.is_abortable():
self.abort_time = Timer(abort_in)
self.terminate_time = Timer(terminate_in)
self.disconnect_time = Timer(disconnect_in)
def should_abort_now(self) -> bool:
"""Whether we should abort the game."""
return self.is_abortable() and self.abort_time.is_expired()
def should_terminate_now(self) -> bool:
"""Whether we should terminate the game."""
return self.terminate_time.is_expired()
def should_disconnect_now(self) -> bool:
"""Whether we should disconnect form the game."""
return self.disconnect_time.is_expired()
def my_remaining_time(self) -> datetime.timedelta:
"""How many seconds we have left."""
wtime = msec(self.state["wtime"])
btime = msec(self.state["btime"])
return wtime if self.is_white else btime
def result(self) -> str:
"""Get the result of the game."""
class GameEnding(str, Enum):
WHITE_WINS = "1-0"
BLACK_WINS = "0-1"
DRAW = "1/2-1/2"
INCOMPLETE = "*"
winner = self.state.get("winner")
termination = self.state.get("status")
if winner == "white":
result = GameEnding.WHITE_WINS
elif winner == "black":
result = GameEnding.BLACK_WINS
elif termination in [Termination.DRAW, Termination.TIMEOUT]:
result = GameEnding.DRAW
else:
result = GameEnding.INCOMPLETE
return result.value
def __str__(self) -> str:
"""Get a string representation of `Game`."""
return f"{self.url()} {self.perf_name} vs {self.opponent} ({self.id})"
def __repr__(self) -> str:
"""Get a string representation of `Game`."""
return self.__str__()
class Player:
"""Store information about a player."""
def __init__(self, player_info: dict[str, Any]) -> None:
""":param player_info: Contains information about a player."""
self.title = player_info.get("title")
self.rating = player_info.get("rating")
self.provisional = player_info.get("provisional")
self.aiLevel = player_info.get("aiLevel")
self.is_bot = self.title == "BOT" or self.aiLevel is not None
self.name: str = f"AI level {self.aiLevel}" if self.aiLevel else player_info.get("name", "")
def __str__(self) -> str:
"""Get a string representation of `Player`."""
if self.aiLevel:
return self.name
else:
rating = f'{self.rating}{"?" if self.provisional else ""}'
return f'{self.title or ""} {self.name} ({rating})'.strip()
def __repr__(self) -> str:
"""Get a string representation of `Player`."""
return self.__str__()