Change package /src/chesspp to just /chesspp

This commit is contained in:
2024-01-29 00:56:24 +01:00
parent 31a219ea87
commit 8fcedbbc8c
27 changed files with 124 additions and 124 deletions

0
chesspp/__init__.py Normal file
View File

172
chesspp/baysian_mcts.py Normal file
View File

@@ -0,0 +1,172 @@
import chess
from chesspp.i_mcts import *
from chesspp.i_strategy import IStrategy
from chesspp.util_gaussian import gaussian_ucb1, max_gaussian, min_gaussian
from chesspp.eval import score_manual
import numpy as np
import math
class BayesianMctsNode(IMctsNode):
def __init__(self, board: chess.Board, strategy: IStrategy, color: chess.Color, parent: Self | None, move: chess.Move | None,
random_state: random.Random, inherit_result: int | None = None, depth: int = 0):
super().__init__(board, strategy, parent, move, random_state)
self.color = color # Color of the player whose turn it is
self.visits = 0
self.result = inherit_result if inherit_result is not None else 0
self._set_mu_sigma()
self.depth = depth
def _create_child(self, move: chess.Move) -> IMctsNode:
copied_board = self.board.copy()
copied_board.push(move)
return BayesianMctsNode(copied_board, self.strategy, not self.color, self, move, self.random_state, self.result, self.depth+1)
def _set_mu_sigma(self) -> None:
self.mu = self.result
self.sigma = 1
def _is_new_ucb1_better(self, current, new) -> bool:
if self.color == chess.WHITE:
# maximize ucb1
return new > current
else:
# minimize ubc1
return new < current
def _select_best_child(self) -> IMctsNode:
"""
Returns the child with the *best* ucb1 score.
It chooses the child with maximum ucb1 for WHITE, and with minimum ucb1 for BLACK.
"""
if self.board.is_game_over():
return self
best_child = self.random_state.choice(self.children)
best_ucb1 = gaussian_ucb1(best_child.mu, best_child.sigma, self.visits)
for child in self.children:
# if child has no visits, prioritize this child.
if child.visits == 0:
best_child = child
break
# save child if it has a *better* score, than our previous best child.
ucb1 = gaussian_ucb1(child.mu, child.sigma, self.visits)
if self._is_new_ucb1_better(best_ucb1, ucb1):
best_ucb1 = ucb1
best_child = child
return best_child
def select(self) -> IMctsNode:
if len(self.children) == 0:
return self
else:
return self._select_best_child().select()
def expand(self) -> IMctsNode:
if self.visits == 0:
return self
for move in self.legal_moves:
self.children.append(self._create_child(move))
return self._select_best_child()
def rollout(self, rollout_depth: int = 20) -> int:
copied_board = self.board.copy()
steps = self.depth
for i in range(rollout_depth):
if copied_board.is_game_over():
break
m = self.strategy.pick_next_move(copied_board)
if m is None:
break
copied_board.push(m)
steps += 1
score = score_manual(copied_board) // steps
self.result = score
return score
def _combine_gaussians(self, mu1: float, sigma1: float, mu2: float, sigma2: float) -> tuple[float, float]:
if self.color == chess.WHITE:
return max_gaussian(mu1, sigma1, mu2, sigma2)
else:
return min_gaussian(mu1, sigma1, mu2, sigma2)
def backpropagate(self, score: int | None = None) -> None:
self.visits += 1
if score is not None:
self.result = score
if len(self.children) == 0:
# leaf node
self._set_mu_sigma()
else:
# interior node
shuffled_children = self.random_state.sample(self.children, len(self.children))
mu = shuffled_children[0].mu
sigma = shuffled_children[0].sigma
for c in shuffled_children[1:]:
mu, sigma = self._combine_gaussians(mu, sigma, c.mu, c.sigma)
# if max_sigma == 0:
# max_sigma = 0.001
self.mu = mu
self.sigma = sigma
if self.parent:
self.parent.backpropagate()
def print(self, indent=0):
print("\t"*indent + f"move={self.move}, visits={self.visits}, mu={self.mu}, sigma={self.sigma}")
for c in self.children:
c.print(indent+1)
class BayesianMcts(IMcts):
def __init__(self, board: chess.Board, strategy: IStrategy, color: chess.Color, seed: int | None = None):
super().__init__(board, strategy, seed)
self.root = BayesianMctsNode(board, strategy, color,None, None, self.random_state)
self.root.visits += 1
self.color = color
def sample(self, runs: int = 1000) -> None:
for i in range(runs):
#print(f"sample {i}")
leaf_node = self.root.select().expand()
_ = leaf_node.rollout()
leaf_node.backpropagate()
def apply_move(self, move: chess.Move) -> None:
self.board.push(move)
self.color = not self.color
# if a child node contains the move, set this child as new root
for child in self.get_children():
if child.move == move:
self.root = child
self.root.parent = None
return
# if no child node contains the move, initialize a new tree.
self.root = BayesianMctsNode(self.board, self.root.strategy, self.color, None, None, self.random_state)
def get_children(self) -> list[IMctsNode]:
return self.root.children
def get_moves(self) -> Dict[chess.Move, int]:
res = {}
for c in self.root.children:
res[c.move] = c.mu
return res
def print(self):
print("================================")
self.root.print()

111
chesspp/classic_mcts.py Normal file
View File

@@ -0,0 +1,111 @@
import chess
import random
import numpy as np
from chesspp import eval
from chesspp import util
class ClassicMcts:
def __init__(self, board: chess.Board, color: chess.Color, parent=None, move: chess.Move | None = None,
random_state: int | None = None):
self.random = random.Random(random_state)
self.board = board
self.color = color
self.parent = parent
self.move = move
self.children = []
self.visits = 0
self.legal_moves = list(board.legal_moves)
self.untried_actions = self.legal_moves
self.score = 0
def _expand(self) -> 'ClassicMcts':
"""
Expands the node, i.e., choose an action and apply it to the board
:return:
"""
move = self.random.choice(self.untried_actions)
self.untried_actions.remove(move)
next_board = self.board.copy()
next_board.push(move)
child_node = ClassicMcts(next_board, color=self.color, parent=self, move=move)
self.children.append(child_node)
return child_node
def _rollout(self, rollout_depth: int = 20) -> int:
"""
Rolls out the node by simulating a game for a given depth.
Sometimes this step is called 'simulation' or 'playout'.
:return: the score of the rolled out game
"""
copied_board = self.board.copy()
steps = 1
for i in range(rollout_depth):
if copied_board.is_game_over():
break
m = util.pick_move(copied_board)
copied_board.push(m)
steps += 1
return eval.score_manual(copied_board) // steps
def _backpropagate(self, score: float) -> None:
"""
Backpropagates the results of the rollout
:param score:
:return:
"""
self.visits += 1
# TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves)
self.score += score
if self.parent:
self.parent._backpropagate(score)
def is_fully_expanded(self) -> bool:
return len(self.untried_actions) == 0
def _best_child(self) -> 'ClassicMcts':
"""
Picks the best child according to our policy
:return: the best child
"""
# NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf]
choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits))
for c in self.children]
best_child_index = np.argmax(choices_weights) if self.color == chess.WHITE else np.argmin(choices_weights)
return self.children[best_child_index]
def _select_leaf(self) -> 'ClassicMcts':
"""
Selects a leaf node.
If the node is not expanded is will be expanded.
:return: Leaf node
"""
current_node = self
while not current_node.board.is_game_over():
if not current_node.is_fully_expanded():
return current_node._expand()
else:
current_node = current_node._best_child()
return current_node
def build_tree(self, samples: int = 1000) -> 'ClassicMcts':
"""
Runs the MCTS with the given number of samples
:param samples: number of simulations
:return: best node containing the best move
"""
for i in range(samples):
# selection & expansion
# rollout
# backpropagate score
node = self._select_leaf()
score = node._rollout()
node._backpropagate(score)
return self._best_child()

120
chesspp/engine.py Normal file
View File

@@ -0,0 +1,120 @@
from abc import ABC, abstractmethod
import chess
import chess.engine
import random
import time
from chesspp.classic_mcts import ClassicMcts
from chesspp.baysian_mcts import BayesianMcts
from chesspp.random_strategy import RandomStrategy
class Limit:
""" Class to determine when to stop searching for moves """
time: float|None
""" Search for `time` seconds """
nodes: int|None
""" Search for a limited number of `nodes`"""
def __init__(self, time: float|None = None, nodes: int|None = None):
self.time = time
self.nodes = nodes
def run(self, func, *args, **kwargs):
"""
Run `func` until the limit condition is reached
:param func: the func that performs one search iteration
:param *args: are passed to `func`
:param **kwargs: are passed to `func`
"""
if self.nodes:
self._run_nodes(func, *args, **kwargs)
elif self.time:
self._run_time(func, *args, **kwargs)
def _run_nodes(self, func, *args, **kwargs):
for _ in range(self.nodes):
func(*args, **kwargs)
def _run_time(self, func, *args, **kwargs):
start = time.perf_counter_ns()
while (time.perf_counter_ns()-start)/1e9 < self.time:
func(*args, **kwargs)
class Engine(ABC):
color: chess.Color
"""The side the engine plays (``chess.WHITE`` or ``chess.BLACK``)."""
def __init__(self, color: chess.Color):
self.color = color
@abstractmethod
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
"""
Return the next action the engine chooses based on the given board
:param board: the chess board
:param limit: a limit specifying when to stop searching
:return: the engine's PlayResult
"""
pass
@staticmethod
@abstractmethod
def get_name() -> str:
"""
Return the engine's name
:return: the engine's name
"""
pass
class BayesMctsEngine(Engine):
def __init__(self, color: chess.Color):
super().__init__(color)
@staticmethod
def get_name() -> str:
return "BayesMctsEngine"
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
strategy = RandomStrategy(random.Random())
bayes_mcts = BayesianMcts(board, strategy, self.color)
bayes_mcts.sample(1000)
# limit.run(lambda: mcts_root.build_tree())
best_move = max(bayes_mcts.get_moves().items(), key=lambda x: x[1])[0] if board.turn == chess.WHITE else (
min(bayes_mcts.get_moves().items(), key=lambda x: x[1])[0])
print(best_move)
return chess.engine.PlayResult(move=best_move, ponder=None)
class ClassicMctsEngine(Engine):
def __init__(self, color: chess.Color):
super().__init__(color)
@staticmethod
def get_name() -> str:
return "ClassicMctsEngine"
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
mcts_root = ClassicMcts(board, self.color)
mcts_root.build_tree()
# limit.run(lambda: mcts_root.build_tree())
best_move = max(mcts_root.children, key=lambda x: x.score).move if board.turn == chess.WHITE else (
min(mcts_root.children, key=lambda x: x.score).move)
return chess.engine.PlayResult(move=best_move, ponder=None)
class RandomEngine(Engine):
def __init__(self, color: chess.Color):
super().__init__(color)
@staticmethod
def get_name() -> str:
return "Random"
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
move = random.choice(list(board.legal_moves))
return chess.engine.PlayResult(move=move, ponder=None)

197
chesspp/eval.py Normal file
View File

@@ -0,0 +1,197 @@
import chess
import chess.engine
import sys
# Eval constants for scoring chess boards
# Evaluation metric inspired by Tomasz Michniewski: https://www.chessprogramming.org/Simplified_Evaluation_Function
PIECE_VALUES = {
chess.PAWN: 100,
chess.KNIGHT: 320,
chess.BISHOP: 330,
chess.ROOK: 500,
chess.QUEEN: 900,
chess.KING: 20000
}
pawn_eval = [
0, 0, 0, 0, 0, 0, 0, 0,
5, 10, 10, -20, -20, 10, 10, 5,
5, -5, -10, 0, 0, -10, -5, 5,
0, 0, 0, 20, 20, 0, 0, 0,
5, 5, 10, 25, 25, 10, 5, 5,
10, 10, 20, 30, 30, 20, 10, 10,
50, 50, 50, 50, 50, 50, 50, 50,
0, 0, 0, 0, 0, 0, 0, 0
]
knight_eval = [
-50, -40, -30, -30, -30, -30, -40, -50,
-40, -20, 0, 0, 0, 0, -20, -40,
-30, 0, 10, 15, 15, 10, 0, -30,
-30, 5, 15, 20, 20, 15, 5, -30,
-30, 0, 15, 20, 20, 15, 0, -30,
-30, 5, 10, 15, 15, 10, 5, -30,
-40, -20, 0, 5, 5, 0, -20, -40,
-50, -40, -30, -30, -30, -30, -40, -50
]
bishop_eval = [
-20, -10, -10, -10, -10, -10, -10, -20,
-10, 5, 0, 0, 0, 0, 5, -10,
-10, 10, 10, 10, 10, 10, 10, -10,
-10, 0, 10, 10, 10, 10, 0, -10,
-10, 5, 5, 10, 10, 5, 5, -10,
-10, 0, 5, 10, 10, 5, 0, -10,
-10, 0, 0, 0, 0, 0, 0, -10,
-20, -10, -10, -10, -10, -10, -10, -20
]
rook_eval = [
0, 0, 0, 5, 5, 0, 0, 0,
-5, 0, 0, 0, 0, 0, 0, -5,
-5, 0, 0, 0, 0, 0, 0, -5,
-5, 0, 0, 0, 0, 0, 0, -5,
-5, 0, 0, 0, 0, 0, 0, -5,
-5, 0, 0, 0, 0, 0, 0, -5,
5, 10, 10, 10, 10, 10, 10, 5,
0, 0, 0, 0, 0, 0, 0, 0
]
queen_eval = [
-20, -10, -10, -5, -5, -10, -10, -20,
-10, 0, 0, 0, 0, 0, 0, -10,
-10, 0, 5, 5, 5, 5, 0, -10,
-5, 0, 5, 5, 5, 5, 0, -5,
0, 0, 5, 5, 5, 5, 0, -5,
-10, 5, 5, 5, 5, 5, 0, -10,
-10, 0, 5, 0, 0, 0, 0, -10,
-20, -10, -10, -5, -5, -10, -10, -20
]
king_eval = [
20, 30, 10, 0, 0, 10, 30, 20,
20, 20, 0, 0, 0, 0, 20, 20,
-10, -20, -20, -20, -20, -20, -20, -10,
20, -30, -30, -40, -40, -30, -30, -20,
-30, -40, -40, -50, -50, -40, -40, -30,
-30, -40, -40, -50, -50, -40, -40, -30,
-30, -40, -40, -50, -50, -40, -40, -30,
-30, -40, -40, -50, -50, -40, -40, -30
]
king_endgame_eval = [
50, -30, -30, -30, -30, -30, -30, -50,
-30, -30, 0, 0, 0, 0, -30, -30,
-30, -10, 20, 30, 30, 20, -10, -30,
-30, -10, 30, 40, 40, 30, -10, -30,
-30, -10, 30, 40, 40, 30, -10, -30,
-30, -10, 20, 30, 30, 20, -10, -30,
-30, -20, -10, 0, 0, -10, -20, -30,
-50, -40, -30, -20, -20, -30, -40, -50
]
PIECE_TABLES = {
chess.WHITE: {
chess.PAWN: pawn_eval,
chess.KNIGHT: knight_eval,
chess.BISHOP: bishop_eval,
chess.ROOK: rook_eval,
chess.QUEEN: queen_eval,
chess.KING: king_eval,
'end_game_king': king_endgame_eval
},
chess.BLACK: {
chess.PAWN: list(reversed(pawn_eval)),
chess.KNIGHT: list(reversed(knight_eval)),
chess.BISHOP: list(reversed(bishop_eval)),
chess.ROOK: list(reversed(rook_eval)),
chess.QUEEN: list(reversed(queen_eval)),
chess.KING: list(reversed(king_eval)),
'end_game_king': list(reversed(king_endgame_eval))
}
}
def check_endgame(board: chess.Board) -> bool:
"""
Endgame according to Tomasz Michniewski:
1. Both sides have no queens or
2. Every side which has a queen has additionally no other pieces or one minorpiece maximum.
"""
queens_white = 0
minors_white = 0
queens_black = 0
minors_black = 0
for s in chess.SQUARES:
piece = board.piece_at(s)
if piece is None:
continue
if piece.piece_type == chess.QUEEN:
if piece.color == chess.WHITE:
queens_white += 1
else:
queens_black += 1
if piece.piece_type == chess.BISHOP or piece.piece_type == chess.KNIGHT:
if piece.color == chess.WHITE:
minors_white += 1
else:
minors_black += 1
return (queens_black == 0 and queens_white == 0) or ((queens_black >= 1 >= minors_black) or (
queens_white >= 1 >= minors_white))
def score_manual(board: chess.Board) -> int:
"""
Calculate the score of a given board.
Positive scores indicate an advantage for WHITE, negative scores indicate and advantage for BLACK.
The range of scores is from approx. -1.100.000 to 1.100.000
:param board: the chess board
:return: score
"""
outcome = board.outcome()
if outcome is not None:
if outcome.termination == chess.Termination.CHECKMATE:
return 1_100_000 if outcome.winner == chess.WHITE else -1_100_000
else: # draw
return 0
score = 0
for s in chess.SQUARES:
piece = board.piece_at(s)
if piece is None:
continue
if piece.color == chess.WHITE:
if piece.piece_type == chess.KING and check_endgame(board):
score += PIECE_VALUES[piece.piece_type] * PIECE_TABLES[chess.WHITE]['end_game_king'][s]
else:
score += PIECE_VALUES[piece.piece_type] * PIECE_TABLES[chess.WHITE][piece.piece_type][s]
else:
if piece.piece_type == chess.KING and check_endgame(board):
score -= PIECE_VALUES[piece.piece_type] * PIECE_TABLES[chess.BLACK]['end_game_king'][s]
else:
score -= PIECE_VALUES[piece.piece_type] * PIECE_TABLES[chess.BLACK][piece.piece_type][s]
return score
def score_stockfish(board: chess.Board) -> chess.engine.PovScore:
"""
Calculate the score of the given board using stockfish
:param board:
:return:
"""
engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/stockfish/stockfish-ubuntu-x86-64-avx2")
info = engine.analyse(board, chess.engine.Limit(depth=0))
engine.quit()
return info["score"]
def score_lc0(board: chess.Board) -> chess.engine.PovScore:
"""
Calculate the score of the given board using lc0
:param board:
:return:
"""
engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/lc0/lc0")
info = engine.analyse(board, chess.engine.Limit(depth=4))
engine.quit()
return info["score"]

99
chesspp/i_mcts.py Normal file
View File

@@ -0,0 +1,99 @@
import chess
import random
from abc import ABC, abstractmethod
from typing import Dict, Self
from chesspp.i_strategy import IStrategy
class IMctsNode(ABC):
def __init__(self, board: chess.Board, strategy: IStrategy, parent: Self | None, move: chess.Move | None,
random_state: random.Random):
self.board = board
self.strategy = strategy
self.parent = parent
self.children = []
self.move = move
self.legal_moves = list(board.legal_moves)
self.random_state = random_state
@abstractmethod
def select(self) -> Self:
"""
Selects the next node leaf node in the tree
:return:
"""
pass
@abstractmethod
def expand(self) -> Self:
"""
Expands this node creating X child leaf nodes, i.e., choose an action and apply it to the board
:return:
"""
pass
@abstractmethod
def rollout(self, rollout_depth: int = 20) -> int:
"""
Rolls out the node by simulating a game for a given depth.
Sometimes this step is called 'simulation' or 'playout'.
:return: the score of the rolled out game
"""
pass
@abstractmethod
def backpropagate(self, score: float) -> None:
"""
Backpropagates the results of the rollout
:param score:
:return:
"""
pass
class IMcts(ABC):
def __init__(self, board: chess.Board, strategy: IStrategy, seed: int | None):
self.board = board
self.strategy = strategy
self.random_state = random.Random(seed)
@abstractmethod
def sample(self, runs: int = 1000) -> None:
"""
Run the MCTS simulation
:param runs: number of runs
:return:
"""
pass
@abstractmethod
def apply_move(self, move: chess.Move) -> None:
"""
Apply the move to the chess board
:param move: move to apply
:return:
"""
pass
@abstractmethod
def get_children(self) -> list[IMctsNode]:
"""
Return the immediate children of the root node
:return: list of immediate children of mcts root
"""
pass
@abstractmethod
def get_moves(self) -> Dict[chess.Move, int]:
"""
Return all legal moves from this node with respective scores
:return: dictionary with moves as key and scores as values
"""
pass
"""
TODO: add score class:
how many moves until the end of the game?
score ranges?
perspective of white/black
"""

11
chesspp/i_strategy.py Normal file
View File

@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod
import chess
# TODO extend class
class IStrategy(ABC):
@abstractmethod
def pick_next_move(self, board: chess.Board) -> chess.Move:
pass

21
chesspp/lichess-engine.py Normal file
View File

@@ -0,0 +1,21 @@
from lichess_bot.lib.engine_wrapper import MinimalEngine, MOVE
import chess.engine
from chesspp import engine
class ProbStockfish(MinimalEngine):
def search(self, board: chess.Board, time_limit: chess.engine.Limit, ponder: bool, draw_offered: bool,
root_moves: MOVE) -> chess.engine.PlayResult:
moves = {}
untried_moves = list(board.legal_moves)
for move in untried_moves:
mean, std = engine.simulate_game(board, move, 10)
moves[move] = (mean, std)
return self.get_best_move(moves)
def get_best_move(self, moves: dict) -> chess.engine.PlayResult:
best_avg = max(moves.items(), key=lambda m: m[1][0])
next_move = best_avg[0]
return chess.engine.PlayResult(next_move, None)

View File

@@ -0,0 +1,13 @@
import chess
import random
from chesspp.i_strategy import IStrategy
class RandomStrategy(IStrategy):
def __init__(self, random_state: random.Random):
self.random_state = random_state
def pick_next_move(self, board: chess.Board) -> chess.Move | None:
if len(list(board.legal_moves)) == 0:
return None
return self.random_state.choice(list(board.legal_moves))

73
chesspp/simulation.py Normal file
View File

@@ -0,0 +1,73 @@
import multiprocessing as mp
import random
import chess
import chess.pgn
from typing import Tuple, List
from enum import Enum
from dataclasses import dataclass
from chesspp.engine import Engine, Limit
class Winner(Enum):
Engine_A = 0
Engine_B = 1
Draw = 2
@dataclass
class EvaluationResult:
winner: Winner
game: chess.pgn.Game
def simulate_game(white: Engine, black: Engine, limit: Limit) -> chess.pgn.Game:
board = chess.Board()
is_white_playing = True
while not board.is_game_over():
play_result = white.play(board, limit) if is_white_playing else black.play(board, limit)
board.push(play_result.move)
is_white_playing = not is_white_playing
game = chess.pgn.Game.from_board(board)
game.headers['White'] = white.get_name()
game.headers['Black'] = black.get_name()
return game
class Evaluation:
def __init__(self, engine_a: Engine.__class__, engine_b: Engine.__class__, limit: Limit):
self.engine_a = engine_a
self.engine_b = engine_b
self.limit = limit
def run(self, n_games=100) -> List[EvaluationResult]:
with mp.Pool(mp.cpu_count()) as pool:
args = [(self.engine_a, self.engine_b, self.limit) for i in range(n_games)]
return pool.map(Evaluation._test_simulate, args)
@staticmethod
def _test_simulate(arg: Tuple[Engine.__class__, Engine.__class__, Limit]) -> EvaluationResult:
engine_a, engine_b, limit = arg
flip_engines = bool(random.getrandbits(1))
if flip_engines:
black, white = engine_a(chess.BLACK), engine_b(chess.WHITE)
else:
white, black = engine_a(chess.WHITE), engine_b(chess.BLACK)
game = simulate_game(white, black, limit)
winner = game.end().board().outcome().winner
result = Winner.Draw
match (winner, flip_engines):
case (chess.WHITE, True):
result = Winner.Engine_B
case (chess.BLACK, True):
result = Winner.Engine_A
case (chess.WHITE, False):
result = Winner.Engine_A
case (chess.BLACK, False):
result = Winner.Engine_B
return EvaluationResult(result, game)

BIN
chesspp/static_data/bB.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

BIN
chesspp/static_data/bK.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB

BIN
chesspp/static_data/bN.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 KiB

BIN
chesspp/static_data/bP.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 777 B

BIN
chesspp/static_data/bQ.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 KiB

BIN
chesspp/static_data/bR.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 748 B

View File

@@ -0,0 +1,40 @@
<!DOCTYPE html>
<html>
<head>
<title>ChessPP</title>
<link rel="stylesheet"
href="https://unpkg.com/@chrisoakman/chessboardjs@1.0.0/dist/chessboard-1.0.0.min.css"
integrity="sha384-q94+BZtLrkL1/ohfjR8c6L+A6qzNH9R2hBLwyoAfu3i/WCvQjzL2RQJ3uNHDISdU"
crossorigin="anonymous">
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
<script src="https://unpkg.com/@chrisoakman/chessboardjs@1.0.0/dist/chessboard-1.0.0.min.js"
integrity="sha384-8Vi8VHwn3vjQ9eUHUxex3JSN/NFqUg3QbPyX8kWyb93+8AC/pPWTzj+nHtbC5bxD"
crossorigin="anonymous"></script>
</head>
<body>
<div id="board1" style="width: 400px"></div>
<script>
var board1 = Chessboard('board1', 'start')
const socket = new WebSocket("ws://localhost:8080/ws");
socket.addEventListener("open", (event) => {
socket.send("Hello Server!");
});
socket.addEventListener("message", (event) => {
board1.position(event.data)
console.log("Message from server ", event.data);
});
</script>
</body>
</html>

BIN
chesspp/static_data/wB.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

BIN
chesspp/static_data/wK.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB

BIN
chesspp/static_data/wN.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

BIN
chesspp/static_data/wP.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 KiB

BIN
chesspp/static_data/wQ.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 KiB

BIN
chesspp/static_data/wR.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

79
chesspp/util.py Normal file
View File

@@ -0,0 +1,79 @@
import chess
import chess.engine
from stockfish import Stockfish
import numpy as np
import random
def pick_move(board: chess.Board) -> chess.Move | None:
"""
Pick a random move
:param board: chess board
:return: a valid move or None if no valid move available
"""
if len(list(board.legal_moves)) == 0:
return None
return random.choice(list(board.legal_moves))
def simulate_game(board: chess.Board, move: chess.Move, depth: int):
"""
Simulate a game starting with the given move
:param board: chess board
:param move: chosen move
:param depth: number of moves that should be simulated after playing the chosen move
:return: the score for the simulated game
"""
engine = chess.engine.SimpleEngine.popen_uci("./stockfish/stockfish-ubuntu-x86-64-avx2")
board.push(move)
for i in range(depth):
if board.is_game_over():
engine.quit()
return
r = engine.play(board, chess.engine.Limit(depth=2))
board.push(r.move)
engine.quit()
def simulate_stockfish_prob(board: chess.Board, move: chess.Move, games: int = 10, depth: int = 10) -> (float, float):
"""
Simulate a game using
:param board: chess board
:param move: chosen move
:param games: number of games that should be simulated after playing the move
:param depth: simulation depth per game
:return:
"""
board.push(move)
copied_board = board.copy()
scores = []
stockfish = Stockfish("./stockfish/stockfish-ubuntu-x86-64-avx2", depth=2, parameters={"Threads": 8, "Hash": 2048})
stockfish.set_elo_rating(1200)
stockfish.set_fen_position(board.fen())
def reset_game():
nonlocal scores, copied_board, board
score = eval.score_stockfish(copied_board).white().score(mate_score=100_000)
scores.append(score)
copied_board = board.copy()
stockfish.set_fen_position(board.fen())
for _ in range(games):
for d in range(depth):
if copied_board.is_game_over() or d == depth - 1:
reset_game()
break
if d == depth - 1:
reset_game()
top_moves = stockfish.get_top_moves(3)
chosen_move = random.choice(top_moves)['Move']
stockfish.make_moves_from_current_position([chosen_move])
copied_board.push(chess.Move.from_uci(chosen_move))
print(scores)
# TODO: return distribution here?
return np.array(scores).mean(), np.array(scores).std()

107
chesspp/util_gaussian.py Normal file
View File

@@ -0,0 +1,107 @@
import math
import torch
import torch.distributions as dist
from torch import exp
F1: dict[float, float] = {}
F2: dict[float, float] = {}
CDF: dict[float, float] = {}
lookup_count = 0
def get_lookup_count():
global lookup_count
return lookup_count
def max_gaussian(mu1, sigma1, mu2, sigma2) -> tuple[float, float]:
global lookup_count
global F1
global F2
global CDF
"""
Returns the combined max gaussian of two Gaussians represented by mu1, sigma1, mu2, simga2
:param mu1: mu of the first Gaussian
:param sigma1: sigma of the first Gaussian
:param mu2: mu of the second Gaussian
:param sigma2: sigma of the second Gaussian
:return: mu and sigma maximized
"""
# we assume independence of the two gaussians
#print(mu1, sigma1, mu2, sigma2)
normal = dist.Normal(0, 1)
sigma_m = math.sqrt(sigma1 ** 2 + sigma2 ** 2)
alpha = (mu1 - mu2) / sigma_m
if alpha in CDF:
cdf_alpha = CDF[alpha]
lookup_count += 1
else:
cdf_alpha = normal.cdf(torch.tensor(alpha)).item()
CDF[alpha] = cdf_alpha
pdf_alpha = exp(normal.log_prob(torch.tensor(alpha))).item()
if alpha in F1:
f1_alpha = F1[alpha]
lookup_count += 1
else:
f1_alpha = alpha * cdf_alpha + pdf_alpha
F1[alpha] = f1_alpha
if alpha in F2:
f2_alpha = F2[alpha]
lookup_count += 1
else:
f2_alpha = alpha ** 2 * cdf_alpha * (1 - cdf_alpha) + (
1 - 2 * cdf_alpha) * alpha * pdf_alpha - pdf_alpha ** 2
F2[alpha] = f2_alpha
mu = mu2 + sigma_m * f1_alpha
sigma = math.sqrt(sigma2 ** 2 + (sigma1 ** 2 - sigma2 ** 2) * cdf_alpha + sigma_m ** 2 * f2_alpha)
#sigma = math.sqrt((mu1**2 + sigma1**2) * cdf_alpha + (mu2**2 + sigma2**2) * (1 - cdf_alpha) + (mu1 + mu2) * sigma_m * pdf_alpha - mu**2)
return mu, sigma
def min_gaussian(mu1, sigma1, mu2, sigma2) -> tuple[float, float]:
"""
Returns the combined min gaussian of two Gaussians represented by mu1, sigma1, mu2, simga2
:param mu1: mu of the first Gaussian
:param sigma1: sigma of the first Gaussian
:param mu2: mu of the second Gaussian
:param sigma2: sigma of the second Gaussian
:return: mu and sigma minimized
"""
try:
normal = dist.Normal(0, 1)
sigma_m = math.sqrt(sigma1 ** 2 + sigma2 ** 2)
alpha = (mu1 - mu2) / sigma_m
cdf_alpha = normal.cdf(torch.tensor(alpha)).item()
pdf_alpha = exp(normal.log_prob(torch.tensor(alpha))).item()
pdf_alpha_neg = exp(normal.log_prob(torch.tensor(-alpha))).item()
mu = mu1 * (1 - cdf_alpha) + mu2 * cdf_alpha - pdf_alpha_neg * sigma_m
sigma = math.sqrt((mu1**2 + sigma1**2) * (1 - cdf_alpha) + (mu2**2 + sigma2**2) * cdf_alpha - (mu1 + mu2) * sigma_m * pdf_alpha - mu**2)
return mu, sigma
except ValueError:
print(mu1, sigma1, mu2, sigma2)
def beta_mean(alpha, beta):
return alpha / (alpha + beta)
def beta_std(alpha, beta):
try:
return math.sqrt((alpha * beta) / ((alpha * beta)**2 * (alpha + beta + 1)))
except ZeroDivisionError:
print(alpha, beta)
def gaussian_ucb1(mu, sigma, N) -> float:
return mu + math.sqrt(2 * math.log(N) * sigma)

103
chesspp/web.py Normal file
View File

@@ -0,0 +1,103 @@
import os
import asyncio
import aiohttp
from aiohttp import web
import chess
from chesspp import engine
_DIR = os.path.abspath(os.path.dirname(__file__))
_DATA_DIR = os.path.abspath(os.path.join(_DIR, "static_data"))
_INDEX = os.path.join(_DATA_DIR, "index.html")
def load_index() -> str:
"""
Load and return the chessboard html file from disk
"""
with open(_INDEX, 'r') as fp:
return fp.read()
class Simulate:
""" Run a simulation of two engines"""
def __init__(self, engine_white=None, engine_black=None):
if engine_white is None:
engine_white = engine.ClassicMctsEngine(chess.WHITE)
if engine_black is None:
engine_black = engine.ClassicMctsEngine(chess.BLACK)
self.white = engine_white
self.black = engine_black
def run(self, limit: engine.Limit):
board = chess.Board()
is_white_playing = True
while not board.is_game_over():
play_result = self.white.play(board, limit) if is_white_playing else self.black.play(board, limit)
board.push(play_result.move)
yield board
is_white_playing = not is_white_playing
class WebInterface:
def __init__(self, white_engine: engine.Engine.__class__, black_engine: engine.Engine.__class__, limit: engine.Limit):
self.white = white_engine
self.black = black_engine
self.limit = limit
async def handle_index(self, request) -> web.Response:
""" Entry point of webpage, returns the index html"""
return web.Response(text=load_index(), content_type='text/html')
async def handle_websocket(self, request):
""" Handles a websocket connection to the frontend"""
ws = web.WebSocketResponse()
await ws.prepare(request)
async def wait_msg():
""" Handles messages from client """
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f'ws connection closed with exception {ws.exception()}')
async def turns():
""" Simulates the game and sends the response to the client """
runner = Simulate(self.white(chess.WHITE), self.black(chess.BLACK)).run(limit)
def sim():
return next(runner, None)
board = await asyncio.to_thread(sim)
while board is not None:
await ws.send_str(board.fen())
board = await asyncio.to_thread(sim)
async with asyncio.TaskGroup() as tg:
tg.create_task(wait_msg())
tg.create_task(turns())
print('websocket connection closed')
return ws
def run_app(self):
app = web.Application()
app.add_routes([
web.get('/', self.handle_index),
web.get('/ws', self.handle_websocket),
web.static('/img/chesspieces/wikipedia/', _DATA_DIR),
])
web.run_app(app)
if __name__ == '__main__':
limit = engine.Limit(time=0.5)
WebInterface(engine.ClassicMctsEngine, engine.ClassicMctsEngine, limit).run_app()