1338 lines
60 KiB
Python
1338 lines
60 KiB
Python
"""Provides communication with the engine."""
|
|
from __future__ import annotations
|
|
import os
|
|
import chess.engine
|
|
import chess.polyglot
|
|
import chess.syzygy
|
|
import chess.gaviota
|
|
import chess
|
|
import subprocess
|
|
import logging
|
|
import datetime
|
|
import time
|
|
import random
|
|
import math
|
|
from collections import Counter
|
|
from collections.abc import Generator, Callable
|
|
from contextlib import contextmanager
|
|
from lib import config, model, lichess
|
|
from lib.config import Configuration
|
|
from lib.timer import Timer, msec, seconds, msec_str, sec_str, to_seconds
|
|
from typing import Any, Optional, Union, Literal
|
|
OPTIONS_TYPE = dict[str, Any]
|
|
MOVE_INFO_TYPE = dict[str, Any]
|
|
COMMANDS_TYPE = list[str]
|
|
LICHESS_EGTB_MOVE = dict[str, Any]
|
|
CHESSDB_EGTB_MOVE = dict[str, Any]
|
|
MOVE = Union[chess.engine.PlayResult, list[chess.Move]]
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
out_of_online_opening_book_moves: Counter[str] = Counter()
|
|
|
|
|
|
@contextmanager
|
|
def create_engine(engine_config: config.Configuration) -> Generator[EngineWrapper, None, None]:
|
|
"""
|
|
Create the engine.
|
|
|
|
Use in a with-block to automatically close the engine when exiting the game.
|
|
|
|
:param engine_config: The options for the engine.
|
|
:return: An engine. Either UCI, XBoard, or Homemade.
|
|
"""
|
|
cfg = engine_config.engine
|
|
engine_path = os.path.abspath(os.path.join(cfg.dir, cfg.name))
|
|
engine_type = cfg.protocol
|
|
commands = [engine_path]
|
|
if cfg.engine_options:
|
|
for k, v in cfg.engine_options.items():
|
|
commands.append(f"--{k}={v}" if v is not None else f"--{k}")
|
|
|
|
stderr = None if cfg.silence_stderr else subprocess.DEVNULL
|
|
|
|
Engine: Union[type[UCIEngine], type[XBoardEngine], type[MinimalEngine]]
|
|
if engine_type == "xboard":
|
|
Engine = XBoardEngine
|
|
elif engine_type == "uci":
|
|
Engine = UCIEngine
|
|
elif engine_type == "homemade":
|
|
Engine = getHomemadeEngine(cfg.name)
|
|
else:
|
|
raise ValueError(
|
|
f" Invalid engine type: {engine_type}. Expected xboard, uci, or homemade.")
|
|
options = remove_managed_options(cfg.lookup(f"{engine_type}_options") or config.Configuration({}))
|
|
logger.debug(f"Starting engine: {commands}")
|
|
engine = Engine(commands, options, stderr, cfg.draw_or_resign, cwd=cfg.working_dir)
|
|
try:
|
|
yield engine
|
|
finally:
|
|
engine.ping()
|
|
engine.quit()
|
|
|
|
|
|
def remove_managed_options(config: config.Configuration) -> OPTIONS_TYPE:
|
|
"""Remove the options managed by python-chess."""
|
|
def is_managed(key: str) -> bool:
|
|
return chess.engine.Option(key, "", None, None, None, None).is_managed()
|
|
|
|
return {name: value for (name, value) in config.items() if not is_managed(name)}
|
|
|
|
|
|
PONDERPV_CHARACTERS = 6 # The length of ", Pv: ".
|
|
|
|
|
|
class EngineWrapper:
|
|
"""A wrapper used by all engines (UCI, XBoard, Homemade)."""
|
|
|
|
def __init__(self, options: OPTIONS_TYPE, draw_or_resign: config.Configuration) -> None:
|
|
"""
|
|
Initialize the values of the wrapper used by all engines (UCI, XBoard, Homemade).
|
|
|
|
:param options: The options to send to the engine.
|
|
:param draw_or_resign: Options on whether the bot should resign or offer draws.
|
|
"""
|
|
self.engine: Union[chess.engine.SimpleEngine, FillerEngine]
|
|
self.scores: list[chess.engine.PovScore] = []
|
|
self.draw_or_resign = draw_or_resign
|
|
self.go_commands = config.Configuration(options.pop("go_commands", {}) or {})
|
|
self.move_commentary: list[MOVE_INFO_TYPE] = []
|
|
self.comment_start_index = -1
|
|
|
|
def play_move(self,
|
|
board: chess.Board,
|
|
game: model.Game,
|
|
li: lichess.Lichess,
|
|
setup_timer: Timer,
|
|
move_overhead: datetime.timedelta,
|
|
can_ponder: bool,
|
|
is_correspondence: bool,
|
|
correspondence_move_time: datetime.timedelta,
|
|
engine_cfg: config.Configuration,
|
|
min_time: datetime.timedelta) -> None:
|
|
"""
|
|
Play a move.
|
|
|
|
:param board: The current position.
|
|
:param game: The game that the bot is playing.
|
|
:param li: Provides communication with lichess.org.
|
|
:param start_time: The time that the bot received the move.
|
|
:param move_overhead: The time it takes to communicate between the engine and lichess.org.
|
|
:param can_ponder: Whether the engine is allowed to ponder.
|
|
:param is_correspondence: Whether this is a correspondence or unlimited game.
|
|
:param correspondence_move_time: The time the engine will think if `is_correspondence` is true.
|
|
:param engine_cfg: Options for external moves (e.g. from an opening book), and for engine resignation and draw offers.
|
|
:param min_time: Minimum time to spend, in seconds.
|
|
:return: The move to play.
|
|
"""
|
|
polyglot_cfg = engine_cfg.polyglot
|
|
online_moves_cfg = engine_cfg.online_moves
|
|
draw_or_resign_cfg = engine_cfg.draw_or_resign
|
|
lichess_bot_tbs = engine_cfg.lichess_bot_tbs
|
|
|
|
best_move: MOVE
|
|
best_move = get_book_move(board, game, polyglot_cfg)
|
|
|
|
if best_move.move is None:
|
|
best_move = get_egtb_move(board,
|
|
game,
|
|
lichess_bot_tbs,
|
|
draw_or_resign_cfg)
|
|
|
|
if not isinstance(best_move, list) and best_move.move is None:
|
|
best_move = get_online_move(li,
|
|
board,
|
|
game,
|
|
online_moves_cfg,
|
|
draw_or_resign_cfg)
|
|
|
|
if isinstance(best_move, list) or best_move.move is None:
|
|
draw_offered = check_for_draw_offer(game)
|
|
|
|
time_limit, can_ponder = move_time(board, game, can_ponder,
|
|
setup_timer, move_overhead,
|
|
is_correspondence, correspondence_move_time)
|
|
|
|
try:
|
|
best_move = self.search(board, time_limit, can_ponder, draw_offered, best_move)
|
|
except chess.engine.EngineError as error:
|
|
BadMove = (chess.IllegalMoveError, chess.InvalidMoveError)
|
|
if any(isinstance(e, BadMove) for e in error.args):
|
|
logger.error("Ending game due to bot attempting an illegal move.")
|
|
game_ender = li.abort if game.is_abortable() else li.resign
|
|
game_ender(game.id)
|
|
raise
|
|
|
|
# Heed min_time
|
|
elapsed = setup_timer.time_since_reset()
|
|
if elapsed < min_time:
|
|
time.sleep(to_seconds(min_time - elapsed))
|
|
|
|
self.add_comment(best_move, board)
|
|
self.print_stats()
|
|
if best_move.resigned and len(board.move_stack) >= 2:
|
|
li.resign(game.id)
|
|
else:
|
|
li.make_move(game.id, best_move)
|
|
|
|
def add_go_commands(self, time_limit: chess.engine.Limit) -> chess.engine.Limit:
|
|
"""Add extra commands to send to the engine. For example, to search for 1000 nodes or up to depth 10."""
|
|
movetime_cfg = self.go_commands.movetime
|
|
if movetime_cfg is not None:
|
|
movetime = msec(movetime_cfg)
|
|
if time_limit.time is None or seconds(time_limit.time) > movetime:
|
|
time_limit.time = to_seconds(movetime)
|
|
time_limit.depth = self.go_commands.depth
|
|
time_limit.nodes = self.go_commands.nodes
|
|
return time_limit
|
|
|
|
def offer_draw_or_resign(self, result: chess.engine.PlayResult, board: chess.Board) -> chess.engine.PlayResult:
|
|
"""Offer draw or resign depending on the score of the engine."""
|
|
def actual(score: chess.engine.PovScore) -> int:
|
|
return score.relative.score(mate_score=40000)
|
|
|
|
can_offer_draw = self.draw_or_resign.offer_draw_enabled
|
|
draw_offer_moves = self.draw_or_resign.offer_draw_moves
|
|
draw_score_range: int = self.draw_or_resign.offer_draw_score
|
|
draw_max_piece_count = self.draw_or_resign.offer_draw_pieces
|
|
pieces_on_board = chess.popcount(board.occupied)
|
|
enough_pieces_captured = pieces_on_board <= draw_max_piece_count
|
|
if can_offer_draw and len(self.scores) >= draw_offer_moves and enough_pieces_captured:
|
|
scores = self.scores[-draw_offer_moves:]
|
|
|
|
def score_near_draw(score: chess.engine.PovScore) -> bool:
|
|
return abs(actual(score)) <= draw_score_range
|
|
if len(scores) == len(list(filter(score_near_draw, scores))):
|
|
result.draw_offered = True
|
|
|
|
resign_enabled = self.draw_or_resign.resign_enabled
|
|
min_moves_for_resign = self.draw_or_resign.resign_moves
|
|
resign_score: int = self.draw_or_resign.resign_score
|
|
if resign_enabled and len(self.scores) >= min_moves_for_resign:
|
|
scores = self.scores[-min_moves_for_resign:]
|
|
|
|
def score_near_loss(score: chess.engine.PovScore) -> bool:
|
|
return actual(score) <= resign_score
|
|
if len(scores) == len(list(filter(score_near_loss, scores))):
|
|
result.resigned = True
|
|
return result
|
|
|
|
def search(self, board: chess.Board, time_limit: chess.engine.Limit, ponder: bool, draw_offered: bool,
|
|
root_moves: MOVE) -> chess.engine.PlayResult:
|
|
"""
|
|
Tell the engine to search.
|
|
|
|
:param board: The current position.
|
|
:param time_limit: Conditions for how long the engine can search (e.g. we have 10 seconds and search up to depth 10).
|
|
:param ponder: Whether the engine can ponder.
|
|
:param draw_offered: Whether the bot was offered a draw.
|
|
:param root_moves: If it is a list, the engine will only play a move that is in `root_moves`.
|
|
:return: The move to play.
|
|
"""
|
|
time_limit = self.add_go_commands(time_limit)
|
|
result = self.engine.play(board,
|
|
time_limit,
|
|
info=chess.engine.INFO_ALL,
|
|
ponder=ponder,
|
|
draw_offered=draw_offered,
|
|
root_moves=root_moves if isinstance(root_moves, list) else None)
|
|
# Use null_score to have no effect on draw/resign decisions
|
|
null_score = chess.engine.PovScore(chess.engine.Mate(1), board.turn)
|
|
self.scores.append(result.info.get("score", null_score))
|
|
result = self.offer_draw_or_resign(result, board)
|
|
return result
|
|
|
|
def comment_index(self, move_stack_index: int) -> int:
|
|
"""
|
|
Get the index of a move for use in `comment_for_board_index`.
|
|
|
|
:param move_stack_index: The move number.
|
|
:return: The index of the move in `self.move_commentary`.
|
|
"""
|
|
if self.comment_start_index < 0:
|
|
return -1
|
|
else:
|
|
return move_stack_index - self.comment_start_index
|
|
|
|
def comment_for_board_index(self, index: int) -> MOVE_INFO_TYPE:
|
|
"""
|
|
Get the engine comments for a specific move.
|
|
|
|
:param index: The move number.
|
|
:return: The move comments.
|
|
"""
|
|
no_info: MOVE_INFO_TYPE = {}
|
|
comment_index = self.comment_index(index)
|
|
if comment_index < 0 or comment_index % 2 != 0:
|
|
return no_info
|
|
|
|
try:
|
|
return self.move_commentary[comment_index // 2]
|
|
except IndexError:
|
|
return no_info
|
|
|
|
def add_comment(self, move: chess.engine.PlayResult, board: chess.Board) -> None:
|
|
"""
|
|
Store the move's comments.
|
|
|
|
:param move: The move. Contains the comments in `move.info`.
|
|
:param board: The current position.
|
|
"""
|
|
if self.comment_start_index < 0:
|
|
self.comment_start_index = len(board.move_stack)
|
|
move_info: MOVE_INFO_TYPE = dict(move.info.copy()) if move.info else {}
|
|
if "pv" in move_info:
|
|
move_info["ponderpv"] = board.variation_san(move.info["pv"])
|
|
if "refutation" in move_info:
|
|
move_info["refutation"] = board.variation_san(move.info["refutation"])
|
|
if "currmove" in move_info:
|
|
move_info["currmove"] = board.san(move.info["currmove"])
|
|
self.move_commentary.append(move_info)
|
|
|
|
def print_stats(self) -> None:
|
|
"""Print the engine stats."""
|
|
for line in self.get_stats():
|
|
logger.info(line)
|
|
|
|
def readable_score(self, relative_score: chess.engine.PovScore) -> str:
|
|
"""Convert the score to a more human-readable format."""
|
|
score = relative_score.relative
|
|
cp_score = score.score()
|
|
if cp_score is None:
|
|
str_score = f"#{score.mate()}"
|
|
else:
|
|
str_score = str(round(cp_score / 100, 2))
|
|
return str_score
|
|
|
|
def readable_wdl(self, wdl: chess.engine.PovWdl) -> str:
|
|
"""Convert the WDL score to a percentage, so it is more human-readable."""
|
|
wdl_percentage = round(wdl.relative.expectation() * 100, 1)
|
|
return f"{wdl_percentage}%"
|
|
|
|
def readable_time(self, number: int) -> str:
|
|
"""Convert time given as a number into minutes and seconds, so it is more human-readable. e.g. 123 -> 2m 3s."""
|
|
minutes, seconds = divmod(number, 60)
|
|
if minutes >= 1:
|
|
return f"{minutes:0.0f}m {seconds:0.1f}s"
|
|
else:
|
|
return f"{seconds:0.1f}s"
|
|
|
|
def readable_number(self, number: int) -> str:
|
|
"""Convert number to a more human-readable format. e.g. 123456789 -> 123M."""
|
|
if number >= 1e9:
|
|
return f"{round(number / 1e9, 1)}B"
|
|
elif number >= 1e6:
|
|
return f"{round(number / 1e6, 1)}M"
|
|
elif number >= 1e3:
|
|
return f"{round(number / 1e3, 1)}K"
|
|
return str(number)
|
|
|
|
def to_readable_value(self, stat: str, info: MOVE_INFO_TYPE) -> str:
|
|
"""Change a value to a more human-readable format."""
|
|
readable: dict[str, Callable[[Any], str]] = {"Evaluation": self.readable_score, "Winrate": self.readable_wdl,
|
|
"Hashfull": lambda x: f"{round(x / 10, 1)}%",
|
|
"Nodes": self.readable_number,
|
|
"Speed": lambda x: f"{self.readable_number(x)}nps",
|
|
"Tbhits": self.readable_number,
|
|
"Cpuload": lambda x: f"{round(x / 10, 1)}%",
|
|
"Movetime": self.readable_time}
|
|
|
|
def identity(x: Any) -> str:
|
|
return str(x)
|
|
|
|
return str(readable.get(stat, identity)(info[stat]))
|
|
|
|
def get_stats(self, for_chat: bool = False) -> list[str]:
|
|
"""
|
|
Get the stats returned by the engine.
|
|
|
|
:param for_chat: Whether the stats will be sent to the game chat, which has a 140 character limit.
|
|
"""
|
|
can_index = self.move_commentary and self.move_commentary[-1]
|
|
info: MOVE_INFO_TYPE = self.move_commentary[-1].copy() if can_index else {}
|
|
|
|
def to_readable_item(stat: str, value: Any) -> tuple[str, Any]:
|
|
readable = {"wdl": "winrate", "ponderpv": "PV", "nps": "speed", "score": "evaluation", "time": "movetime"}
|
|
stat = readable.get(stat, stat)
|
|
if stat == "string" and value.startswith("lichess_bot-source:"):
|
|
stat = "source"
|
|
value = value.split(":", 1)[1]
|
|
return stat.title(), value
|
|
|
|
info = dict(to_readable_item(key, value) for (key, value) in info.items())
|
|
if "Source" not in info:
|
|
info["Source"] = "Engine"
|
|
|
|
stats = ["Source", "Evaluation", "Winrate", "Depth", "Nodes", "Speed", "Pv"]
|
|
if for_chat and "Pv" in info:
|
|
bot_stats = [f"{stat}: {self.to_readable_value(stat, info)}"
|
|
for stat in stats if stat in info and stat != "Pv"]
|
|
len_bot_stats = len(", ".join(bot_stats)) + PONDERPV_CHARACTERS
|
|
ponder_pv = info["Pv"].split()
|
|
try:
|
|
while len(" ".join(ponder_pv)) + len_bot_stats > lichess.MAX_CHAT_MESSAGE_LEN:
|
|
ponder_pv.pop()
|
|
if ponder_pv[-1].endswith("."):
|
|
ponder_pv.pop()
|
|
info["Pv"] = " ".join(ponder_pv)
|
|
except IndexError:
|
|
pass
|
|
if not info["Pv"]:
|
|
info.pop("Pv")
|
|
return [f"{stat}: {self.to_readable_value(stat, info)}" for stat in stats if stat in info]
|
|
|
|
def get_opponent_info(self, game: model.Game) -> None:
|
|
"""Get the opponent's information and sends it to the engine."""
|
|
opponent = chess.engine.Opponent(name=game.opponent.name,
|
|
title=game.opponent.title,
|
|
rating=game.opponent.rating,
|
|
is_engine=game.opponent.is_bot)
|
|
self.engine.send_opponent_information(opponent=opponent, engine_rating=game.me.rating)
|
|
|
|
def name(self) -> str:
|
|
"""Get the name of the engine."""
|
|
engine_info: dict[str, str] = dict(self.engine.id)
|
|
name: str = engine_info["name"]
|
|
return name
|
|
|
|
def get_pid(self) -> str:
|
|
"""Get the pid of the engine."""
|
|
pid = "?"
|
|
if self.engine.transport is not None:
|
|
pid = str(self.engine.transport.get_pid())
|
|
return pid
|
|
|
|
def ping(self) -> None:
|
|
"""Ping the engine."""
|
|
self.engine.ping()
|
|
|
|
def send_game_result(self, game: model.Game, board: chess.Board) -> None:
|
|
"""
|
|
Inform engine of the game ending.
|
|
|
|
:param game: The final game state from lichess.
|
|
:param board: The final board state.
|
|
"""
|
|
termination = game.state.get("status")
|
|
winner = game.state.get("winner")
|
|
winning_color = chess.WHITE if winner == "white" else chess.BLACK
|
|
|
|
if termination == model.Termination.MATE:
|
|
self.engine.send_game_result(board)
|
|
elif termination == model.Termination.RESIGN:
|
|
resigner = "White" if winner == "black" else "Black"
|
|
self.engine.send_game_result(board, winning_color, f"{resigner} resigned")
|
|
elif termination == model.Termination.ABORT:
|
|
self.engine.send_game_result(board, None, "Game aborted", False)
|
|
elif termination == model.Termination.DRAW:
|
|
draw_reason = None if board.is_game_over(claim_draw=True) else "Draw by agreement"
|
|
self.engine.send_game_result(board, None, draw_reason)
|
|
elif termination == model.Termination.TIMEOUT:
|
|
if winner:
|
|
self.engine.send_game_result(board, winning_color, "Time forfeiture")
|
|
else:
|
|
self.engine.send_game_result(board, None, "Time out with insufficient material")
|
|
else:
|
|
self.engine.send_game_result(board, None, termination)
|
|
|
|
def quit(self) -> None:
|
|
"""Close the engine."""
|
|
self.engine.quit()
|
|
self.engine.close()
|
|
|
|
|
|
class UCIEngine(EngineWrapper):
|
|
"""The class used to communicate with UCI engines."""
|
|
|
|
def __init__(self, commands: COMMANDS_TYPE, options: OPTIONS_TYPE, stderr: Optional[int],
|
|
draw_or_resign: config.Configuration, **popen_args: str) -> None:
|
|
"""
|
|
Communicate with UCI engines.
|
|
|
|
:param commands: The engine path and commands to send to the engine. e.g. ["engines/engine.exe", "--option1=value1"]
|
|
:param options: The options to send to the engine.
|
|
:param stderr: Whether we should silence the stderr.
|
|
:param draw_or_resign: Options on whether the bot should resign or offer draws.
|
|
:param popen_args: The cwd of the engine.
|
|
"""
|
|
super().__init__(options, draw_or_resign)
|
|
self.engine = chess.engine.SimpleEngine.popen_uci(commands, timeout=10., debug=False, setpgrp=False, stderr=stderr,
|
|
**popen_args)
|
|
self.engine.configure(options)
|
|
|
|
|
|
class XBoardEngine(EngineWrapper):
|
|
"""The class used to communicate with XBoard engines."""
|
|
|
|
def __init__(self, commands: COMMANDS_TYPE, options: OPTIONS_TYPE, stderr: Optional[int],
|
|
draw_or_resign: config.Configuration, **popen_args: str) -> None:
|
|
"""
|
|
Communicate with XBoard engines.
|
|
|
|
:param commands: The engine path and commands to send to the engine. e.g. ["engines/engine.exe", "--option1=value1"]
|
|
:param options: The options to send to the engine.
|
|
:param stderr: Whether we should silence the stderr.
|
|
:param draw_or_resign: Options on whether the bot should resign or offer draws.
|
|
:param popen_args: The cwd of the engine.
|
|
"""
|
|
super().__init__(options, draw_or_resign)
|
|
self.engine = chess.engine.SimpleEngine.popen_xboard(commands, timeout=10., debug=False, setpgrp=False,
|
|
stderr=stderr, **popen_args)
|
|
egt_paths = options.pop("egtpath", {}) or {}
|
|
features = self.engine.protocol.features if isinstance(self.engine.protocol, chess.engine.XBoardProtocol) else {}
|
|
egt_features = features.get("egt", "")
|
|
if isinstance(egt_features, str):
|
|
egt_types_from_engine = egt_features.split(",")
|
|
egt_type: str
|
|
for egt_type in filter(None, egt_types_from_engine):
|
|
if egt_type in egt_paths:
|
|
options[f"egtpath {egt_type}"] = egt_paths[egt_type]
|
|
else:
|
|
logger.debug(f"No paths found for egt type: {egt_type}.")
|
|
self.engine.configure(options)
|
|
|
|
|
|
class MinimalEngine(EngineWrapper):
|
|
"""
|
|
Subclass this to prevent a few random errors.
|
|
|
|
Even though MinimalEngine extends EngineWrapper,
|
|
you don't have to actually wrap an engine.
|
|
|
|
At minimum, just implement `search`,
|
|
however you can also change other methods like
|
|
`notify`, etc.
|
|
"""
|
|
|
|
def __init__(self, commands: COMMANDS_TYPE, options: OPTIONS_TYPE, stderr: Optional[int],
|
|
draw_or_resign: Configuration, name: Optional[str] = None, **popen_args: str) -> None:
|
|
"""
|
|
Initialize the values of the engine that all homemade engines inherit.
|
|
|
|
:param options: The options to send to the engine.
|
|
:param draw_or_resign: Options on whether the bot should resign or offer draws.
|
|
"""
|
|
super().__init__(options, draw_or_resign)
|
|
|
|
self.engine_name = self.__class__.__name__ if name is None else name
|
|
|
|
self.engine = FillerEngine(self, name=self.engine_name)
|
|
|
|
def get_pid(self) -> str:
|
|
"""Homemade engines don't have a pid, so we return a question mark."""
|
|
return "?"
|
|
|
|
def search(self, board: chess.Board, time_limit: chess.engine.Limit, ponder: bool, draw_offered: bool,
|
|
root_moves: MOVE) -> chess.engine.PlayResult:
|
|
"""
|
|
Choose a move.
|
|
|
|
The method to be implemented in your homemade engine.
|
|
NOTE: This method must return an instance of "chess.engine.PlayResult"
|
|
"""
|
|
raise NotImplementedError("The search method is not implemented")
|
|
|
|
def notify(self, method_name: str, *args: Any, **kwargs: Any) -> None:
|
|
"""
|
|
Enable the use of `self.engine.option1`.
|
|
|
|
The EngineWrapper class sometimes calls methods on "self.engine".
|
|
|
|
"self.engine" is a filler property that notifies <self>
|
|
whenever an attribute is called.
|
|
|
|
Nothing happens unless the main engine does something.
|
|
|
|
Simply put, the following code is equivalent
|
|
self.engine.<method_name>(<*args>, <**kwargs>)
|
|
self.notify(<method_name>, <*args>, <**kwargs>)
|
|
"""
|
|
pass
|
|
|
|
|
|
class FillerEngine:
|
|
"""
|
|
Not meant to be an actual engine.
|
|
|
|
This is only used to provide the property "self.engine"
|
|
in "MinimalEngine" which extends "EngineWrapper"
|
|
"""
|
|
|
|
def __init__(self, main_engine: MinimalEngine, name: str = "") -> None:
|
|
""":param name: The name to send to the chat."""
|
|
self.id: dict[str, str] = {
|
|
"name": name
|
|
}
|
|
self.name = name
|
|
self.main_engine = main_engine
|
|
|
|
def __getattr__(self, method_name: str) -> Any:
|
|
"""Provide the property `self.engine`."""
|
|
main_engine = self.main_engine
|
|
|
|
def method(*args: Any, **kwargs: Any) -> Any:
|
|
nonlocal main_engine
|
|
nonlocal method_name
|
|
return main_engine.notify(method_name, *args, **kwargs)
|
|
|
|
return method
|
|
|
|
|
|
def getHomemadeEngine(name: str) -> type[MinimalEngine]:
|
|
"""
|
|
Get the homemade engine with name `name`. e.g. If `name` is `RandomMove` then we will return `strategies.RandomMove`.
|
|
|
|
:param name: The name of the homemade engine.
|
|
:return: The engine with this name.
|
|
"""
|
|
from lib import strategies
|
|
engine: type[MinimalEngine] = getattr(strategies, name)
|
|
return engine
|
|
|
|
|
|
def move_time(board: chess.Board,
|
|
game: model.Game,
|
|
can_ponder: bool,
|
|
setup_timer: Timer,
|
|
move_overhead: datetime.timedelta,
|
|
is_correspondence: bool,
|
|
correspondence_move_time: datetime.timedelta) -> tuple[chess.engine.Limit, bool]:
|
|
"""
|
|
Determine the game clock settings for the current move.
|
|
|
|
:param Board: The current position.
|
|
:param game: Information about the current game.
|
|
:param setup_timer: How much time has passed since receiving the opponent's move.
|
|
:param move_overhead: How much time it takes to communicate with lichess.
|
|
:param can_ponder: Whether the bot is allowed to ponder after choosing a move.
|
|
:param is_correspondence: Whether the current game is a correspondence game.
|
|
:param correspondence_move_time: How much time to use for this move it it is a correspondence game.
|
|
:return: The time to choose a move and whether the bot can ponder after the move.
|
|
"""
|
|
if len(board.move_stack) < 2:
|
|
return first_move_time(game), False # No pondering after the first move since a new clock starts afterwards.
|
|
elif is_correspondence:
|
|
return single_move_time(board, game, correspondence_move_time, setup_timer, move_overhead), can_ponder
|
|
else:
|
|
return game_clock_time(board, game, setup_timer, move_overhead), can_ponder
|
|
|
|
|
|
def single_move_time(board: chess.Board, game: model.Game, search_time: datetime.timedelta,
|
|
setup_timer: Timer, move_overhead: datetime.timedelta) -> chess.engine.Limit:
|
|
"""
|
|
Calculate time to search in correspondence games.
|
|
|
|
:param board: The current positions.
|
|
:param game: The game that the bot is playing.
|
|
:param search_time: How long the engine should search.
|
|
:param setup_timer: How much time has passed since receiving the opponent's move.
|
|
:param move_overhead: The time it takes to communicate between the engine and lichess_bot.
|
|
:return: The time to choose a move.
|
|
"""
|
|
pre_move_time = setup_timer.time_since_reset()
|
|
overhead = pre_move_time + move_overhead
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
clock_time = max(msec(0), msec(game.state[f"{wb}time"]) - overhead)
|
|
search_time = min(search_time, clock_time)
|
|
logger.info(f"Searching for time {sec_str(search_time)} seconds for game {game.id}")
|
|
return chess.engine.Limit(time=to_seconds(search_time), clock_id="correspondence")
|
|
|
|
|
|
def first_move_time(game: model.Game) -> chess.engine.Limit:
|
|
"""
|
|
Determine time limit for the first move in the game.
|
|
|
|
:param game: The game that the bot is playing.
|
|
:return: The time to choose the first move.
|
|
"""
|
|
# Need to hardcode first movetime since Lichess has 30 sec limit.
|
|
search_time = seconds(10)
|
|
logger.info(f"Searching for time {sec_str(search_time)} seconds for game {game.id}")
|
|
return chess.engine.Limit(time=to_seconds(search_time), clock_id="first move")
|
|
|
|
|
|
def game_clock_time(board: chess.Board,
|
|
game: model.Game,
|
|
setup_timer: Timer,
|
|
move_overhead: datetime.timedelta) -> chess.engine.Limit:
|
|
"""
|
|
Get the time to play by the engine in realtime games.
|
|
|
|
:param board: The current positions.
|
|
:param game: The game that the bot is playing.
|
|
:param setup_timer: How much time has passed since receiving the opponent's move.
|
|
:param move_overhead: The time it takes to communicate between the engine and lichess_bot.
|
|
:return: The time to play a move.
|
|
"""
|
|
pre_move_time = setup_timer.time_since_reset()
|
|
overhead = pre_move_time + move_overhead
|
|
times = {side: msec(game.state[side]) for side in ["wtime", "btime"]}
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
times[f"{wb}time"] = max(msec(0), times[f"{wb}time"] - overhead)
|
|
logger.info(f"Searching for wtime {msec_str(times['wtime'])} btime {msec_str(times['btime'])} for game {game.id}")
|
|
return chess.engine.Limit(white_clock=to_seconds(times["wtime"]),
|
|
black_clock=to_seconds(times["btime"]),
|
|
white_inc=to_seconds(msec(game.state["winc"])),
|
|
black_inc=to_seconds(msec(game.state["binc"])),
|
|
clock_id="real time")
|
|
|
|
|
|
def check_for_draw_offer(game: model.Game) -> bool:
|
|
"""Check if the bot was offered a draw."""
|
|
return bool(game.state.get(f"{game.opponent_color[0]}draw"))
|
|
|
|
|
|
def get_book_move(board: chess.Board, game: model.Game,
|
|
polyglot_cfg: config.Configuration) -> chess.engine.PlayResult:
|
|
"""Get a move from an opening book."""
|
|
no_book_move = chess.engine.PlayResult(None, None)
|
|
use_book = polyglot_cfg.enabled
|
|
max_game_length = polyglot_cfg.max_depth * 2 - 1
|
|
if not use_book or len(board.move_stack) > max_game_length:
|
|
return no_book_move
|
|
|
|
if board.chess960:
|
|
variant = "chess960"
|
|
else:
|
|
variant = "standard" if board.uci_variant == "chess" else str(board.uci_variant)
|
|
|
|
config.change_value_to_list(polyglot_cfg.config, "book", key=variant)
|
|
books = polyglot_cfg.book.lookup(variant)
|
|
|
|
for book in books:
|
|
with chess.polyglot.open_reader(book) as reader:
|
|
try:
|
|
selection = polyglot_cfg.selection
|
|
min_weight = polyglot_cfg.min_weight
|
|
if selection == "weighted_random":
|
|
move = reader.weighted_choice(board).move
|
|
elif selection == "uniform_random":
|
|
move = reader.choice(board, minimum_weight=min_weight).move
|
|
elif selection == "best_move":
|
|
move = reader.find(board, minimum_weight=min_weight).move
|
|
except IndexError:
|
|
# python-chess raises "IndexError" if no entries found.
|
|
move = None
|
|
|
|
if move is not None:
|
|
logger.info(f"Got move {move} from book {book} for game {game.id}")
|
|
return chess.engine.PlayResult(move, None, {"string": "lichess_bot-source:Opening Book"})
|
|
|
|
return no_book_move
|
|
|
|
|
|
def get_online_move(li: lichess.Lichess, board: chess.Board, game: model.Game, online_moves_cfg: config.Configuration,
|
|
draw_or_resign_cfg: config.Configuration) -> Union[chess.engine.PlayResult, list[chess.Move]]:
|
|
"""
|
|
Get a move from an online source.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
online_egtb_cfg = online_moves_cfg.online_egtb
|
|
best_move, wdl, comment = get_online_egtb_move(li, board, game, online_egtb_cfg)
|
|
if best_move is not None:
|
|
can_offer_draw = draw_or_resign_cfg.offer_draw_enabled
|
|
offer_draw_for_zero = draw_or_resign_cfg.offer_draw_for_egtb_zero
|
|
offer_draw = can_offer_draw and offer_draw_for_zero and wdl == 0
|
|
|
|
can_resign = draw_or_resign_cfg.resign_enabled
|
|
resign_on_egtb_loss = draw_or_resign_cfg.resign_for_egtb_minus_two
|
|
resign = can_resign and resign_on_egtb_loss and wdl == -2
|
|
|
|
wdl_to_score = {2: 9900, 1: 500, 0: 0, -1: -500, -2: -9900}
|
|
comment["score"] = chess.engine.PovScore(chess.engine.Cp(wdl_to_score[wdl]), board.turn)
|
|
if isinstance(best_move, str):
|
|
return chess.engine.PlayResult(chess.Move.from_uci(best_move),
|
|
None,
|
|
comment,
|
|
draw_offered=offer_draw,
|
|
resigned=resign)
|
|
return [chess.Move.from_uci(move) for move in best_move]
|
|
|
|
max_out_of_book_moves = online_moves_cfg.max_out_of_book_moves
|
|
max_opening_moves = online_moves_cfg.max_depth * 2 - 1
|
|
game_moves = len(board.move_stack)
|
|
if game_moves > max_opening_moves or out_of_online_opening_book_moves[game.id] >= max_out_of_book_moves:
|
|
return chess.engine.PlayResult(None, None)
|
|
|
|
chessdb_cfg = online_moves_cfg.chessdb_book
|
|
lichess_cloud_cfg = online_moves_cfg.lichess_cloud_analysis
|
|
opening_explorer_cfg = online_moves_cfg.lichess_opening_explorer
|
|
|
|
for online_source, cfg in ((get_chessdb_move, chessdb_cfg),
|
|
(get_lichess_cloud_move, lichess_cloud_cfg),
|
|
(get_opening_explorer_move, opening_explorer_cfg)):
|
|
best_move, comment = online_source(li, board, game, cfg)
|
|
if best_move:
|
|
return chess.engine.PlayResult(chess.Move.from_uci(best_move), None, comment)
|
|
|
|
out_of_online_opening_book_moves[game.id] += 1
|
|
used_opening_books = chessdb_cfg.enabled or lichess_cloud_cfg.enabled or opening_explorer_cfg.enabled
|
|
if out_of_online_opening_book_moves[game.id] == max_out_of_book_moves and used_opening_books:
|
|
logger.info(f"Will stop using online opening books for game {game.id}.")
|
|
return chess.engine.PlayResult(None, None)
|
|
|
|
|
|
def get_chessdb_move(li: lichess.Lichess, board: chess.Board, game: model.Game,
|
|
chessdb_cfg: config.Configuration) -> tuple[Optional[str], chess.engine.InfoDict]:
|
|
"""Get a move from chessdb.cn's opening book."""
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
use_chessdb = chessdb_cfg.enabled
|
|
time_left = msec(game.state[f"{wb}time"])
|
|
min_time = seconds(chessdb_cfg.min_time)
|
|
if not use_chessdb or time_left < min_time or board.uci_variant != "chess":
|
|
return None, {}
|
|
|
|
move = None
|
|
comment: chess.engine.InfoDict = {}
|
|
site = "https://www.chessdb.cn/cdb.php"
|
|
quality = chessdb_cfg.move_quality
|
|
action = {"best": "querypv",
|
|
"good": "querybest",
|
|
"all": "query"}
|
|
try:
|
|
params = {"action": action[quality],
|
|
"board": board.fen(),
|
|
"json": 1}
|
|
data = li.online_book_get(site, params=params)
|
|
if data["status"] == "ok":
|
|
if quality == "best":
|
|
depth = data["depth"]
|
|
if depth >= chessdb_cfg.min_depth:
|
|
score = data["score"]
|
|
move = data["pv"][0]
|
|
comment["score"] = chess.engine.PovScore(chess.engine.Cp(score), board.turn)
|
|
comment["depth"] = data["depth"]
|
|
comment["pv"] = list(map(chess.Move.from_uci, data["pv"]))
|
|
comment["string"] = "lichess_bot-source:ChessDB"
|
|
logger.info(f"Got move {move} from chessdb.cn (depth: {depth}, score: {score}) for game {game.id}")
|
|
else:
|
|
move = data["move"]
|
|
logger.info(f"Got move {move} from chessdb.cn for game {game.id}")
|
|
except Exception:
|
|
pass
|
|
|
|
return move, comment
|
|
|
|
|
|
def get_lichess_cloud_move(li: lichess.Lichess, board: chess.Board, game: model.Game,
|
|
lichess_cloud_cfg: config.Configuration) -> tuple[Optional[str], chess.engine.InfoDict]:
|
|
"""Get a move from the lichess's cloud analysis."""
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
time_left = msec(game.state[f"{wb}time"])
|
|
min_time = seconds(lichess_cloud_cfg.min_time)
|
|
use_lichess_cloud = lichess_cloud_cfg.enabled
|
|
if not use_lichess_cloud or time_left < min_time:
|
|
return None, {}
|
|
|
|
move = None
|
|
comment: chess.engine.InfoDict = {}
|
|
|
|
quality = lichess_cloud_cfg.move_quality
|
|
multipv = 1 if quality == "best" else 5
|
|
variant = "standard" if board.uci_variant == "chess" else board.uci_variant
|
|
|
|
try:
|
|
data = li.online_book_get("https://lichess.org/api/cloud-eval",
|
|
params={"fen": board.fen(),
|
|
"multiPv": multipv,
|
|
"variant": variant})
|
|
if "error" not in data:
|
|
depth = data["depth"]
|
|
knodes = data["knodes"]
|
|
min_depth = lichess_cloud_cfg.min_depth
|
|
min_knodes = lichess_cloud_cfg.min_knodes
|
|
if depth >= min_depth and knodes >= min_knodes:
|
|
if quality == "best":
|
|
pv = data["pvs"][0]
|
|
else:
|
|
best_eval = data["pvs"][0]["cp"]
|
|
pvs = data["pvs"]
|
|
max_difference = lichess_cloud_cfg.max_score_difference
|
|
if wb == "w":
|
|
pvs = list(filter(lambda pv: pv["cp"] >= best_eval - max_difference, pvs))
|
|
else:
|
|
pvs = list(filter(lambda pv: pv["cp"] <= best_eval + max_difference, pvs))
|
|
pv = random.choice(pvs)
|
|
move = pv["moves"].split()[0]
|
|
score = pv["cp"] if wb == "w" else -pv["cp"]
|
|
comment["score"] = chess.engine.PovScore(chess.engine.Cp(score), board.turn)
|
|
comment["depth"] = data["depth"]
|
|
comment["nodes"] = data["knodes"] * 1000
|
|
comment["pv"] = list(map(chess.Move.from_uci, pv["moves"].split()))
|
|
comment["string"] = "lichess_bot-source:Lichess Cloud Analysis"
|
|
logger.info(f"Got move {move} from lichess cloud analysis (depth: {depth}, score: {score}, knodes: {knodes})"
|
|
f" for game {game.id}")
|
|
except Exception:
|
|
pass
|
|
|
|
return move, comment
|
|
|
|
|
|
def get_opening_explorer_move(li: lichess.Lichess, board: chess.Board, game: model.Game,
|
|
opening_explorer_cfg: config.Configuration
|
|
) -> tuple[Optional[str], chess.engine.InfoDict]:
|
|
"""Get a move from lichess's opening explorer."""
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
time_left = msec(game.state[f"{wb}time"])
|
|
min_time = seconds(opening_explorer_cfg.min_time)
|
|
source = opening_explorer_cfg.source
|
|
if not opening_explorer_cfg.enabled or time_left < min_time or source == "master" and board.uci_variant != "chess":
|
|
return None, {}
|
|
|
|
move = None
|
|
comment: chess.engine.InfoDict = {}
|
|
variant = "standard" if board.uci_variant == "chess" else board.uci_variant
|
|
try:
|
|
if source == "masters":
|
|
params = {"fen": board.fen(), "moves": 100}
|
|
response = li.online_book_get("https://explorer.lichess.ovh/masters", params)
|
|
comment = {"string": "lichess_bot-source:Lichess Opening Explorer (Masters)"}
|
|
elif source == "player":
|
|
player = opening_explorer_cfg.player_name
|
|
if not player:
|
|
player = game.username
|
|
params = {"player": player, "fen": board.fen(), "moves": 100, "variant": variant,
|
|
"recentGames": 0, "color": "white" if wb == "w" else "black"}
|
|
response = li.online_book_get("https://explorer.lichess.ovh/player", params, True)
|
|
comment = {"string": "lichess_bot-source:Lichess Opening Explorer (Player)"}
|
|
else:
|
|
params = {"fen": board.fen(), "moves": 100, "variant": variant, "topGames": 0, "recentGames": 0}
|
|
response = li.online_book_get("https://explorer.lichess.ovh/lichess", params)
|
|
comment = {"string": "lichess_bot-source:Lichess Opening Explorer (Lichess)"}
|
|
moves = []
|
|
for possible_move in response["moves"]:
|
|
games_played = possible_move["white"] + possible_move["black"] + possible_move["draws"]
|
|
winrate = (possible_move["white"] + possible_move["draws"] * .5) / games_played
|
|
if games_played >= opening_explorer_cfg.min_games:
|
|
# We add both winrate and games_played to the tuple, so that if 2 moves are tied on the first metric,
|
|
# the second one will be used.
|
|
moves.append((winrate if opening_explorer_cfg.sort == "winrate" else games_played,
|
|
games_played if opening_explorer_cfg.sort == "winrate" else winrate, possible_move["uci"]))
|
|
moves.sort(reverse=True)
|
|
move = moves[0][2]
|
|
logger.info(f"Got move {move} from lichess opening explorer ({opening_explorer_cfg.sort}: {moves[0][0]})"
|
|
f" for game {game.id}")
|
|
except Exception:
|
|
pass
|
|
|
|
return move, comment
|
|
|
|
|
|
def get_online_egtb_move(li: lichess.Lichess, board: chess.Board, game: model.Game, online_egtb_cfg: config.Configuration
|
|
) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]:
|
|
"""
|
|
Get a move from an online egtb (either by lichess or chessdb).
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
use_online_egtb = online_egtb_cfg.enabled
|
|
wb = "w" if board.turn == chess.WHITE else "b"
|
|
pieces = chess.popcount(board.occupied)
|
|
source = online_egtb_cfg.source
|
|
minimum_time = seconds(online_egtb_cfg.min_time)
|
|
if (not use_online_egtb
|
|
or msec(game.state[f"{wb}time"]) < minimum_time
|
|
or board.uci_variant not in ["chess", "antichess", "atomic"]
|
|
and source == "lichess"
|
|
or board.uci_variant != "chess"
|
|
and source == "chessdb"
|
|
or pieces > online_egtb_cfg.max_pieces
|
|
or board.castling_rights):
|
|
|
|
return None, -3, {}
|
|
|
|
quality = online_egtb_cfg.move_quality
|
|
variant = "standard" if board.uci_variant == "chess" else str(board.uci_variant)
|
|
|
|
try:
|
|
if source == "lichess":
|
|
return get_lichess_egtb_move(li, game, board, quality, variant)
|
|
elif source == "chessdb":
|
|
return get_chessdb_egtb_move(li, game, board, quality)
|
|
except Exception:
|
|
pass
|
|
|
|
return None, -3, {}
|
|
|
|
|
|
def get_egtb_move(board: chess.Board, game: model.Game, lichess_bot_tbs: config.Configuration,
|
|
draw_or_resign_cfg: config.Configuration) -> Union[chess.engine.PlayResult, list[chess.Move]]:
|
|
"""
|
|
Get a move from a local egtb.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
best_move, wdl = get_syzygy(board, game, lichess_bot_tbs.syzygy)
|
|
source = "lichess_bot-source:Syzygy EGTB"
|
|
if best_move is None:
|
|
best_move, wdl = get_gaviota(board, game, lichess_bot_tbs.gaviota)
|
|
source = "lichess_bot-source:Gaviota EGTB"
|
|
if best_move:
|
|
can_offer_draw = draw_or_resign_cfg.offer_draw_enabled
|
|
offer_draw_for_zero = draw_or_resign_cfg.offer_draw_for_egtb_zero
|
|
offer_draw = bool(can_offer_draw and offer_draw_for_zero and wdl == 0)
|
|
|
|
can_resign = draw_or_resign_cfg.resign_enabled
|
|
resign_on_egtb_loss = draw_or_resign_cfg.resign_for_egtb_minus_two
|
|
resign = bool(can_resign and resign_on_egtb_loss and wdl == -2)
|
|
wdl_to_score = {2: 9900, 1: 500, 0: 0, -1: -500, -2: -9900}
|
|
comment: chess.engine.InfoDict = {"score": chess.engine.PovScore(chess.engine.Cp(wdl_to_score[wdl]), board.turn),
|
|
"string": source}
|
|
if isinstance(best_move, chess.Move):
|
|
return chess.engine.PlayResult(best_move, None, comment, draw_offered=offer_draw, resigned=resign)
|
|
return best_move
|
|
return chess.engine.PlayResult(None, None)
|
|
|
|
|
|
def get_lichess_egtb_move(li: lichess.Lichess, game: model.Game, board: chess.Board, quality: str,
|
|
variant: str) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]:
|
|
"""
|
|
Get a move from lichess's egtb.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
name_to_wld = {"loss": -2,
|
|
"maybe-loss": -1,
|
|
"blessed-loss": -1,
|
|
"draw": 0,
|
|
"cursed-win": 1,
|
|
"maybe-win": 1,
|
|
"win": 2}
|
|
pieces = chess.popcount(board.occupied)
|
|
max_pieces = 7 if board.uci_variant == "chess" else 6
|
|
if pieces <= max_pieces:
|
|
data = li.online_book_get(f"http://tablebase.lichess.ovh/{variant}",
|
|
params={"fen": board.fen()})
|
|
if quality == "best":
|
|
move = data["moves"][0]["uci"]
|
|
wdl = name_to_wld[data["moves"][0]["category"]] * -1
|
|
dtz = data["moves"][0]["dtz"] * -1
|
|
dtm = data["moves"][0]["dtm"]
|
|
if dtm:
|
|
dtm *= -1
|
|
logger.info(f"Got move {move} from tablebase.lichess.ovh (wdl: {wdl}, dtz: {dtz}, dtm: {dtm}) for game {game.id}")
|
|
else: # quality == "suggest":
|
|
best_wdl = name_to_wld[data["moves"][0]["category"]]
|
|
|
|
def good_enough(possible_move: LICHESS_EGTB_MOVE) -> bool:
|
|
return name_to_wld[possible_move["category"]] == best_wdl
|
|
|
|
possible_moves = list(filter(good_enough, data["moves"]))
|
|
if len(possible_moves) > 1:
|
|
move = [move["uci"] for move in possible_moves]
|
|
wdl = best_wdl * -1
|
|
logger.info(f"Suggesting moves from tablebase.lichess.ovh (wdl: {wdl}) for game {game.id}")
|
|
else:
|
|
best_move = possible_moves[0]
|
|
move = best_move["uci"]
|
|
wdl = name_to_wld[best_move["category"]] * -1
|
|
dtz = best_move["dtz"] * -1
|
|
dtm = best_move["dtm"]
|
|
if dtm:
|
|
dtm *= -1
|
|
logger.info(f"Got move {move} from tablebase.lichess.ovh (wdl: {wdl}, dtz: {dtz}, dtm: {dtm})"
|
|
f" for game {game.id}")
|
|
|
|
return move, wdl, {"string": "lichess_bot-source:Lichess EGTB"}
|
|
return None, -3, {}
|
|
|
|
|
|
def get_chessdb_egtb_move(li: lichess.Lichess, game: model.Game, board: chess.Board,
|
|
quality: str) -> tuple[Union[str, list[str], None], int, chess.engine.InfoDict]:
|
|
"""
|
|
Get a move from chessdb's egtb.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
def score_to_wdl(score: int) -> int:
|
|
return piecewise_function([(-20000, 'e', 2),
|
|
(0, 'e', -1),
|
|
(0, 'i', 0),
|
|
(20000, 'i', 1)], 2, score)
|
|
|
|
def score_to_dtz(score: int) -> int:
|
|
return piecewise_function([(-20000, 'e', -30000 - score),
|
|
(0, 'e', -20000 - score),
|
|
(0, 'i', 0),
|
|
(20000, 'i', 20000 - score)], 30000 - score, score)
|
|
|
|
action = "querypv" if quality == "best" else "queryall"
|
|
data = li.online_book_get("https://www.chessdb.cn/cdb.php",
|
|
params={"action": action, "board": board.fen(), "json": 1})
|
|
if data["status"] == "ok":
|
|
if quality == "best":
|
|
score = data["score"]
|
|
move = data["pv"][0]
|
|
wdl = score_to_wdl(score)
|
|
dtz = score_to_dtz(score)
|
|
logger.info(f"Got move {move} from chessdb.cn (wdl: {wdl}, dtz: {dtz}) for game {game.id}")
|
|
else: # quality == "suggest"
|
|
best_wdl = score_to_wdl(data["moves"][0]["score"])
|
|
|
|
def good_enough(move: CHESSDB_EGTB_MOVE) -> bool:
|
|
return score_to_wdl(move["score"]) == best_wdl
|
|
|
|
possible_moves = list(filter(good_enough, data["moves"]))
|
|
if len(possible_moves) > 1:
|
|
wdl = score_to_wdl(possible_moves[0]["score"])
|
|
move = [move["uci"] for move in possible_moves]
|
|
logger.info(f"Suggesting moves from from chessdb.cn (wdl: {wdl}) for game {game.id}")
|
|
else:
|
|
best_move = possible_moves[0]
|
|
score = best_move["score"]
|
|
move = best_move["uci"]
|
|
wdl = score_to_wdl(score)
|
|
dtz = score_to_dtz(score)
|
|
logger.info(f"Got move {move} from chessdb.cn (wdl: {wdl}, dtz: {dtz}) for game {game.id}")
|
|
|
|
return move, wdl, {"string": "lichess_bot-source:ChessDB EGTB"}
|
|
return None, -3, {}
|
|
|
|
|
|
def get_syzygy(board: chess.Board, game: model.Game,
|
|
syzygy_cfg: config.Configuration) -> tuple[Union[chess.Move, list[chess.Move], None], int]:
|
|
"""
|
|
Get a move from local syzygy egtbs.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
if (not syzygy_cfg.enabled
|
|
or chess.popcount(board.occupied) > syzygy_cfg.max_pieces
|
|
or board.uci_variant not in ["chess", "antichess", "atomic"]):
|
|
return None, -3
|
|
move: Union[chess.Move, list[chess.Move]]
|
|
move_quality = syzygy_cfg.move_quality
|
|
with chess.syzygy.open_tablebase(syzygy_cfg.paths[0]) as tablebase:
|
|
for path in syzygy_cfg.paths[1:]:
|
|
tablebase.add_directory(path)
|
|
|
|
try:
|
|
moves = score_syzygy_moves(board, dtz_scorer, tablebase)
|
|
|
|
best_wdl = max(map(dtz_to_wdl, moves.values()))
|
|
good_moves = [(move, dtz) for move, dtz in moves.items() if dtz_to_wdl(dtz) == best_wdl]
|
|
if move_quality == "suggest" and len(good_moves) > 1:
|
|
move = [chess_move for chess_move, dtz in good_moves]
|
|
logger.info(f"Suggesting moves from syzygy (wdl: {best_wdl}) for game {game.id}")
|
|
return move, best_wdl
|
|
else:
|
|
# There can be multiple moves with the same dtz.
|
|
best_dtz = min([dtz for chess_move, dtz in good_moves])
|
|
best_moves = [chess_move for chess_move, dtz in good_moves if dtz == best_dtz]
|
|
move = random.choice(best_moves)
|
|
logger.info(f"Got move {move.uci()} from syzygy (wdl: {best_wdl}, dtz: {best_dtz}) for game {game.id}")
|
|
return move, best_wdl
|
|
except KeyError:
|
|
# Attempt to only get the WDL score. It returns moves of quality="suggest", even if quality is set to "best".
|
|
try:
|
|
moves = score_syzygy_moves(board, lambda tablebase, b: -tablebase.probe_wdl(b), tablebase)
|
|
best_wdl = int(max(moves.values())) # int is there only for mypy.
|
|
good_chess_moves = [chess_move for chess_move, wdl in moves.items() if wdl == best_wdl]
|
|
logger.debug("Found moves using 'move_quality'='suggest'. We didn't find an '.rtbz' file for this endgame."
|
|
if move_quality == "best" else "")
|
|
if len(good_chess_moves) > 1:
|
|
move = good_chess_moves
|
|
logger.info(f"Suggesting moves from syzygy (wdl: {best_wdl}) for game {game.id}")
|
|
else:
|
|
move = good_chess_moves[0]
|
|
logger.info(f"Got move {move.uci()} from syzygy (wdl: {best_wdl}) for game {game.id}")
|
|
return move, best_wdl
|
|
except KeyError:
|
|
return None, -3
|
|
|
|
|
|
def dtz_scorer(tablebase: chess.syzygy.Tablebase, board: chess.Board) -> Union[int, float]:
|
|
"""
|
|
Score a position based on a syzygy DTZ egtb.
|
|
|
|
For a zeroing move (capture or pawn move), a DTZ of +/-0.5 is returned.
|
|
"""
|
|
dtz: Union[int, float] = -tablebase.probe_dtz(board)
|
|
dtz = dtz if board.halfmove_clock else math.copysign(.5, dtz)
|
|
return dtz + (math.copysign(board.halfmove_clock, dtz) if dtz else 0)
|
|
|
|
|
|
def dtz_to_wdl(dtz: Union[int, float]) -> int:
|
|
"""Convert DTZ scores to syzygy WDL scores.
|
|
|
|
A DTZ of +/-100 returns a draw score of +/-1 instead of a win/loss score of +/-2 because
|
|
a 50-move draw can be forced before checkmate can be forced.
|
|
"""
|
|
return piecewise_function([(-100, 'i', -1), (0, 'e', -2), (0, 'i', 0), (100, 'e', 2)], 1, dtz)
|
|
|
|
|
|
def get_gaviota(board: chess.Board, game: model.Game,
|
|
gaviota_cfg: config.Configuration) -> tuple[Union[chess.Move, list[chess.Move], None], int]:
|
|
"""
|
|
Get a move from local gaviota egtbs.
|
|
|
|
If `move_quality` is `suggest`, then it will return a list of moves for the engine to choose from.
|
|
"""
|
|
if (not gaviota_cfg.enabled
|
|
or chess.popcount(board.occupied) > gaviota_cfg.max_pieces
|
|
or board.uci_variant != "chess"):
|
|
return None, -3
|
|
move: Union[chess.Move, list[chess.Move]]
|
|
move_quality = gaviota_cfg.move_quality
|
|
# Since gaviota TBs use dtm and not dtz, we have to put a limit where after it the position are considered to have
|
|
# a syzygy wdl=1/-1, so the positions are draws under the 50 move rule. We use min_dtm_to_consider_as_wdl_1 as a
|
|
# second limit, because if a position has 5 pieces and dtm=110 it may take 98 half-moves, to go down to 4 pieces and
|
|
# another 12 to mate, so this position has a syzygy wdl=2/-2. To be safe, the first limit is 100 moves, which
|
|
# guarantees that all moves have a syzygy wdl=2/-2. Setting min_dtm_to_consider_as_wdl_1 to 100 will disable it
|
|
# because dtm >= dtz, so if abs(dtm) < 100 => abs(dtz) < 100, so wdl=2/-2.
|
|
min_dtm_to_consider_as_wdl_1 = gaviota_cfg.min_dtm_to_consider_as_wdl_1
|
|
with chess.gaviota.open_tablebase(gaviota_cfg.paths[0]) as tablebase:
|
|
for path in gaviota_cfg.paths[1:]:
|
|
tablebase.add_directory(path)
|
|
|
|
try:
|
|
moves = score_gaviota_moves(board, dtm_scorer, tablebase)
|
|
|
|
best_wdl = max(map(dtm_to_gaviota_wdl, moves.values()))
|
|
good_moves = [(move, dtm) for move, dtm in moves.items() if dtm_to_gaviota_wdl(dtm) == best_wdl]
|
|
best_dtm = min([dtm for move, dtm in good_moves])
|
|
|
|
pseudo_wdl = dtm_to_wdl(best_dtm, min_dtm_to_consider_as_wdl_1)
|
|
if move_quality == "suggest":
|
|
best_moves = good_enough_gaviota_moves(good_moves, best_dtm, min_dtm_to_consider_as_wdl_1)
|
|
if len(best_moves) > 1:
|
|
move = [chess_move for chess_move, dtm in best_moves]
|
|
logger.info(f"Suggesting moves from gaviota (pseudo wdl: {pseudo_wdl}) for game {game.id}")
|
|
else:
|
|
move, dtm = random.choice(best_moves)
|
|
logger.info(f"Got move {move.uci()} from gaviota (pseudo wdl: {pseudo_wdl}, dtm: {dtm})"
|
|
f" for game {game.id}")
|
|
else:
|
|
# There can be multiple moves with the same dtm.
|
|
best_moves = [(move, dtm) for move, dtm in good_moves if dtm == best_dtm]
|
|
move, dtm = random.choice(best_moves)
|
|
logger.info(f"Got move {move.uci()} from gaviota (pseudo wdl: {pseudo_wdl}, dtm: {dtm}) for game {game.id}")
|
|
return move, pseudo_wdl
|
|
except KeyError:
|
|
return None, -3
|
|
|
|
|
|
def dtm_scorer(tablebase: Union[chess.gaviota.NativeTablebase, chess.gaviota.PythonTablebase], board: chess.Board) -> int:
|
|
"""Score a position based on a gaviota DTM egtb."""
|
|
dtm = -tablebase.probe_dtm(board)
|
|
return dtm + int(math.copysign(board.halfmove_clock, dtm) if dtm else 0)
|
|
|
|
|
|
def dtm_to_gaviota_wdl(dtm: int) -> int:
|
|
"""Convert DTM scores to gaviota WDL scores."""
|
|
return piecewise_function([(-1, 'i', -1), (0, 'i', 0)], 1, dtm)
|
|
|
|
|
|
def dtm_to_wdl(dtm: int, min_dtm_to_consider_as_wdl_1: int) -> int:
|
|
"""Convert DTM scores to syzygy WDL scores."""
|
|
# We use 100 and not min_dtm_to_consider_as_wdl_1, because we want to play it safe and not resign in a
|
|
# position where dtz=-102 (only if resign_for_egtb_minus_two is enabled).
|
|
return piecewise_function([(-100, 'i', -1), (-1, 'i', -2), (0, 'i', 0), (min_dtm_to_consider_as_wdl_1, 'e', 2)], 1, dtm)
|
|
|
|
|
|
def good_enough_gaviota_moves(good_moves: list[tuple[chess.Move, int]], best_dtm: int,
|
|
min_dtm_to_consider_as_wdl_1: int) -> list[tuple[chess.Move, int]]:
|
|
"""
|
|
Get the moves that are good enough to consider.
|
|
|
|
:param good_moves: All the moves to choose from.
|
|
:param best_dtm: The best DTM score of a move.
|
|
:param min_dtm_to_consider_as_wdl_1: The minimum DTM score to consider as WDL=1.
|
|
:return: A list of the moves that are good enough to consider.
|
|
"""
|
|
if best_dtm < 100:
|
|
# If a move had wdl=2 and dtz=98, but halfmove_clock is 4 then the real wdl=1 and dtz=102, so we
|
|
# want to avoid these positions, if there is a move where even when we add the halfmove_clock the
|
|
# dtz is still <100.
|
|
return [(move, dtm) for move, dtm in good_moves if dtm < 100]
|
|
elif best_dtm < min_dtm_to_consider_as_wdl_1:
|
|
# If a move had wdl=2 and dtz=98, but halfmove_clock is 4 then the real wdl=1 and dtz=102, so we
|
|
# want to avoid these positions, if there is a move where even when we add the halfmove_clock the
|
|
# dtz is still <100.
|
|
return [(move, dtm) for move, dtm in good_moves if dtm < min_dtm_to_consider_as_wdl_1]
|
|
elif best_dtm <= -min_dtm_to_consider_as_wdl_1:
|
|
# If a move had wdl=-2 and dtz=-98, but halfmove_clock is 4 then the real wdl=-1 and dtz=-102, so we
|
|
# want to only choose between the moves where the real wdl=-1.
|
|
return [(move, dtm) for move, dtm in good_moves if dtm <= -min_dtm_to_consider_as_wdl_1]
|
|
elif best_dtm <= -100:
|
|
# If a move had wdl=-2 and dtz=-98, but halfmove_clock is 4 then the real wdl=-1 and dtz=-102, so we
|
|
# want to only choose between the moves where the real wdl=-1.
|
|
return [(move, dtm) for move, dtm in good_moves if dtm <= -100]
|
|
else:
|
|
return good_moves
|
|
|
|
|
|
def piecewise_function(range_definitions: list[tuple[Union[int, float], Literal['e', 'i'], int]], last_value: int,
|
|
position: Union[int, float]) -> int:
|
|
"""
|
|
Return a value according to a position argument.
|
|
|
|
This function is meant to replace if-elif-else blocks that turn ranges into discrete values.
|
|
Each tuple in the list has three parts: an upper limit, and inclusive/exclusive indicator, and
|
|
a value. For example,
|
|
`piecewise_function([(-20000, 'e', 2), (0, 'e' -1), (0, 'i', 0), (20000, 'i', 1)], 2, score)` is equivalent to:
|
|
|
|
if score < -20000:
|
|
return -2
|
|
elif score < 0:
|
|
return -1
|
|
elif score <= 0:
|
|
return 0
|
|
elif score <= 20000:
|
|
return 1
|
|
else:
|
|
return 2
|
|
|
|
Arguments:
|
|
range_definitions:
|
|
A list of tuples with the first element being the inclusive right border of region and the second
|
|
element being the associated value. An element of this list (a, 'i', b) corresponds to an
|
|
inclusive limit and is equivalent to
|
|
if x <= a:
|
|
return b
|
|
where x is the value of the position argument. An element of the form (a, 'e', b) corresponds to
|
|
an exclusive limit and is equivalent to
|
|
if x < a:
|
|
return b
|
|
For correct operation, this argument should be sorted by the first element. If two ranges have the
|
|
same border, one with 'e' and the other with 'i', the 'e' element should be first.
|
|
last_value:
|
|
If the position argument does not fall in any of the ranges in the range_definition argument,
|
|
return this value.
|
|
position:
|
|
The value that will be compared to the first element of the range_definitions tuples.
|
|
|
|
"""
|
|
for border, inc_exc, value in range_definitions:
|
|
if position < border or (inc_exc == 'i' and position == border):
|
|
return value
|
|
return last_value
|
|
|
|
|
|
def score_syzygy_moves(board: chess.Board,
|
|
scorer: Union[Callable[[chess.syzygy.Tablebase, chess.Board], int],
|
|
Callable[[chess.syzygy.Tablebase, chess.Board], Union[int, float]]],
|
|
tablebase: chess.syzygy.Tablebase) -> dict[chess.Move, Union[int, float]]:
|
|
"""Score all the moves using syzygy egtbs."""
|
|
moves = {}
|
|
for move in board.legal_moves:
|
|
board_copy = board.copy()
|
|
board_copy.push(move)
|
|
moves[move] = scorer(tablebase, board_copy)
|
|
return moves
|
|
|
|
|
|
def score_gaviota_moves(board: chess.Board,
|
|
scorer: Callable[[Union[chess.gaviota.NativeTablebase, chess.gaviota.PythonTablebase],
|
|
chess.Board], int],
|
|
tablebase: Union[chess.gaviota.NativeTablebase, chess.gaviota.PythonTablebase]
|
|
) -> dict[chess.Move, int]:
|
|
"""Score all the moves using gaviota egtbs."""
|
|
moves = {}
|
|
for move in board.legal_moves:
|
|
board_copy = board.copy()
|
|
board_copy.push(move)
|
|
moves[move] = scorer(tablebase, board_copy)
|
|
return moves
|