class BayesianMctsNode(IMctsNode):
    """Search-tree node that summarises its value as a Gaussian (``mu``, ``sigma``).

    A leaf derives the Gaussian from the two Beta parameters kept in
    ``results`` (``[alpha, beta]``); an interior node folds the Gaussians of
    all its children together with ``max_gaussian``.
    """

    def __init__(self, board: chess.Board, strategy: IStrategy, parent: Self | None, move: chess.Move | None,
                 random_state: random.Random, inherit_results: list[int] | None = None):
        """
        :param board: position represented by this node
        :param strategy: move-picking policy used during rollouts
        :param parent: parent node, ``None`` for the root
        :param move: move that led from the parent to this node, ``None`` for the root
        :param random_state: shared random generator (tie breaking, shuffling)
        :param inherit_results: ``[alpha, beta]`` of the parent used as prior; defaults to ``[1, 1]``
        """
        super().__init__(board, strategy, parent, move, random_state)
        self.visits = 0
        # Copy the prior so parent and child never mutate the same list.
        self.results = inherit_results.copy() if inherit_results is not None else [1, 1]

        self._set_mu_sigma()

    def _create_child(self, move: chess.Move):
        """Build the child reached by playing ``move`` on a copy of this node's board."""
        copied_board = self.board.copy()
        copied_board.push(move)
        return BayesianMctsNode(copied_board, self.strategy, self, move, self.random_state,
                                inherit_results=self.results)

    def _set_mu_sigma(self):
        """Recompute ``mu`` and ``sigma`` from the Beta parameters in ``results``."""
        alpha = self.results[0]
        beta = self.results[1]

        self.mu = beta_mean(alpha, beta)
        self.sigma = beta_std(alpha, beta)

    def _select_child(self) -> IMctsNode:
        """
        Pick the child with the highest ``gaussian_ucb1`` value.

        A randomly drawn child is the initial candidate and is only replaced
        by a strictly better one, so ties are broken in its favour.
        :return: the chosen child, or this node itself when the game is over
        """
        if self.board.is_game_over():
            return self

        best_child = self.random_state.choice(self.children)
        best_val = gaussian_ucb1(best_child.mu, best_child.sigma, self.visits)
        for c in self.children:
            g = gaussian_ucb1(c.mu, c.sigma, self.visits)

            if g > best_val:
                best_val = g
                best_child = c
        return best_child

    def select(self) -> IMctsNode:
        """
        Walk down the tree until a node without children or a finished game is reached.

        Implemented as a loop: `_select_child` returns the node itself for a
        finished game, which made the former recursive version call itself
        forever whenever such a node already had children.
        :return: the node to expand / roll out next
        """
        node = self
        while node.children:
            chosen = node._select_child()
            if chosen is node:
                # finished game: nothing below this node is worth visiting
                break
            node = chosen
        return node

    def expand(self) -> IMctsNode:
        """
        Create one child per legal move and return the most promising one.

        A node that has never been visited is returned unexpanded so that it
        is rolled out first. Finished games are never expanded, even if the
        position still has legal moves (``is_game_over`` is also true for
        some draws).
        :return: the node to roll out
        """
        if self.visits == 0:
            return self

        if self.board.is_game_over():
            return self

        self.children.extend(self._create_child(move) for move in self.legal_moves)

        return self._select_child()

    def rollout(self, rollout_depth: int = 20) -> int:
        """
        Play up to ``rollout_depth`` moves chosen by the strategy and score the final position.

        The score is divided (floor) by the number of steps taken. A positive
        score increments ``results[1]``; any other score adds
        ``abs(score) // 50_000`` to ``results[0]``.
        NOTE(review): with mu = alpha / (alpha + beta) a positive score lowers
        mu - presumably the score is seen from the opponent's side; confirm.
        :param rollout_depth: maximum number of simulated moves
        :return: the step-discounted score of the simulated game
        """
        # Imported here under an explicit alias: the module only star-imports
        # from src.chesspp.eval, which is not guaranteed to bind the name
        # ``eval`` (it would otherwise resolve to the builtin function).
        from src.chesspp import eval as position_eval

        copied_board = self.board.copy()
        steps = 1
        for _ in range(rollout_depth):
            if copied_board.is_game_over():
                break

            m = self.strategy.pick_next_move(copied_board)
            if m is None:
                break

            copied_board.push(m)
            steps += 1

        score = position_eval.score_manual(copied_board) // steps
        if score > 0:
            self.results[1] += 1
        else:
            self.results[0] += abs(score) // 50_000
        return score

    def backpropagate(self, score: int | None = None) -> None:
        """
        Update visit counts and Gaussians from this node up to the root.

        :param score: optional score; when given it is appended to ``results``.
            NOTE(review): ``_set_mu_sigma`` only reads the first two entries of
            ``results`` and no visible caller passes a score - confirm intent.
        """
        self.visits += 1

        if score is not None:
            self.results.append(score)

        if len(self.children) == 0:
            # leaf node
            self._set_mu_sigma()
        else:
            # interior node: fold the children's Gaussians in random order
            shuffled_children = self.random_state.sample(self.children, len(self.children))
            max_mu = shuffled_children[0].mu
            max_sigma = shuffled_children[0].sigma
            for c in shuffled_children[1:]:
                max_mu, max_sigma = max_gaussian(max_mu, max_sigma, c.mu, c.sigma)

            # keep sigma strictly positive for later computations
            if max_sigma == 0:
                max_sigma = 0.001
            self.mu = max_mu
            self.sigma = max_sigma

        if self.parent:
            self.parent.backpropagate()

    def print(self, indent=0):
        """Print this subtree, one node per line, indented by depth."""
        print("\t"*indent + f"visits={self.visits}, mu={self.mu}, sigma={self.sigma}")
        for c in self.children:
            c.print(indent+1)


class BayesianMcts(IMcts):
    """Monte-Carlo tree search driver built on `BayesianMctsNode`."""

    def __init__(self, board: chess.Board, strategy: IStrategy, seed: int | None = None):
        """
        :param board: root position
        :param strategy: move-picking policy used during rollouts
        :param seed: seed for the shared random generator
        """
        super().__init__(board, strategy, seed)
        self.root = BayesianMctsNode(board, strategy, None, None, self.random_state)
        # count the root as visited so the first sample expands it right away
        self.root.visits += 1

    def sample(self, runs: int = 1000) -> None:
        """
        Run ``runs`` iterations of select -> expand -> rollout -> backpropagate.
        :param runs: number of iterations
        """
        for _ in range(runs):
            leaf_node = self.root.select().expand()
            _ = leaf_node.rollout()
            leaf_node.backpropagate()

    def apply_move(self, move: chess.Move) -> None:
        """
        Play ``move`` on the board and re-root the tree accordingly.
        :param move: the move that was played
        """
        self.board.push(move)

        # if a child node contains the move, set this child as new root
        for child in self.get_children():
            if child.move == move:
                self.root = child
                self.root.parent = None
                return

        # if no child node contains the move, initialize a new tree.
        # NOTE(review): unlike in __init__, this root starts with visits == 0,
        # so the first sample only rolls it out instead of expanding it.
        self.root = BayesianMctsNode(self.board, self.root.strategy, None, None, self.random_state)

    def get_children(self) -> list[IMctsNode]:
        """Return the immediate children of the root node."""
        return self.root.children

    def print(self):
        """Print a separator line followed by the whole tree."""
        print("================================")
        self.root.print()
- :return: the score of the rolled out game - """ - copied_board = self.board.copy() - steps = 1 - for i in range(rollout_depth): - if copied_board.is_game_over(): - break - - m = util.pick_move(copied_board) - copied_board.push(m) - steps += 1 - - return eval.score_manual(copied_board) // steps - - def _backpropagate(self, score: float) -> None: - """ - Backpropagates the results of the rollout - :param score: - :return: - """ - self.visits += 1 - # TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves) - self.score += score - if self.parent: - self.parent._backpropagate(score) - - def is_fully_expanded(self) -> bool: - return len(self.untried_actions) == 0 - - def _best_child(self) -> 'ClassicMcts': - """ - Picks the best child according to our policy - :return: the best child - """ - # NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf] - choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits)) - for c in self.children] - best_child_index = np.argmax(choices_weights) if self.color == chess.WHITE else np.argmin(choices_weights) - return self.children[best_child_index] - - def _select_leaf(self) -> 'ClassicMcts': - """ - Selects a leaf node. - If the node is not expanded is will be expanded. 
class ClassicMcts:
    """Monte-Carlo tree search node that selects children with UCB1."""

    def __init__(self, board: chess.Board, color: chess.Color, parent=None, move: chess.Move | None = None,
                 random_state: int | None = None):
        """
        :param board: position represented by this node
        :param color: side the search is run for
        :param parent: parent node, ``None`` for the root
        :param move: move that led from the parent to this node
        :param random_state: seed for this node's random generator
        """
        self.random = random.Random(random_state)
        self.board = board
        self.color = color
        self.parent = parent
        self.move = move
        self.children = []
        self.visits = 0
        self.legal_moves = list(board.legal_moves)
        # Independent copy: _expand() removes entries from untried_actions
        # and must not empty legal_moves along the way.
        self.untried_actions = list(self.legal_moves)
        self.score = 0

    def _expand(self) -> 'ClassicMcts':
        """
        Expands the node, i.e., choose an untried action and apply it to a copy of the board
        :return: the newly created child node
        """
        move = self.random.choice(self.untried_actions)
        self.untried_actions.remove(move)
        next_board = self.board.copy()
        next_board.push(move)
        # Derive the child's seed from this node's generator so that a seeded
        # root yields a reproducible tree (the seed used to be dropped here).
        child_node = ClassicMcts(next_board, color=self.color, parent=self, move=move,
                                 random_state=self.random.getrandbits(64))
        self.children.append(child_node)
        return child_node

    def _rollout(self, rollout_depth: int = 20) -> int:
        """
        Rolls out the node by simulating a game for a given depth.
        Sometimes this step is called 'simulation' or 'playout'.
        :param rollout_depth: maximum number of simulated moves
        :return: the score of the rolled out game, floor-divided by the number of steps
        """
        copied_board = self.board.copy()
        steps = 1
        for _ in range(rollout_depth):
            if copied_board.is_game_over():
                break

            m = util.pick_move(copied_board)
            copied_board.push(m)
            steps += 1

        return eval.score_manual(copied_board) // steps

    def _backpropagate(self, score: float) -> None:
        """
        Backpropagates the results of the rollout up to the root
        :param score: score of the rollout
        :return:
        """
        self.visits += 1
        # TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves)
        self.score += score
        if self.parent:
            self.parent._backpropagate(score)

    def is_fully_expanded(self) -> bool:
        """Return True once every legal move of this node has a child."""
        return len(self.untried_actions) == 0

    def _best_child(self) -> 'ClassicMcts':
        """
        Picks the best child according to UCB1.

        White maximises the mean score, Black minimises it. The exploration
        bonus always has to favour rarely visited children, so for Black the
        mean is negated before the bonus is added (argmin(mean + bonus) would
        have penalised rarely visited children instead).
        :return: the best child
        :raises ValueError: if the node has no children
        """
        # NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf]
        sign = 1 if self.color == chess.WHITE else -1
        log_visits = np.log(self.visits)
        choices_weights = [sign * (c.score / c.visits) + np.sqrt((2 * log_visits) / c.visits)
                           for c in self.children]
        return self.children[int(np.argmax(choices_weights))]

    def _select_leaf(self) -> 'ClassicMcts':
        """
        Selects a leaf node.
        If a node on the way is not fully expanded, it is expanded and the new child is returned.
        :return: Leaf node
        """
        current_node = self
        while not current_node.board.is_game_over():
            if not current_node.is_fully_expanded():
                return current_node._expand()
            current_node = current_node._best_child()

        return current_node

    def build_tree(self, samples: int = 1000) -> 'ClassicMcts':
        """
        Runs the MCTS with the given number of samples
        :param samples: number of simulations
        :return: best node containing the best move
        """
        for _ in range(samples):
            # selection & expansion
            node = self._select_leaf()
            # rollout
            score = node._rollout()
            # backpropagate score
            node._backpropagate(score)

        return self._best_child()
# TODO extend class
class IStrategy(ABC):
    """Interface for policies that pick the next move during a rollout."""

    @abstractmethod
    def pick_next_move(self, board: chess.Board) -> chess.Move | None:
        """
        Choose the move to play on ``board``.
        :param board: the current position
        :return: the chosen move, or ``None`` if no move can be picked
            (implementations return ``None`` for positions without legal
            moves and rollouts stop on ``None``)
        """
        pass
self.random_state.choice(list(board.legal_moves)) diff --git a/src/chesspp/simulation.py b/src/chesspp/simulation.py index a04fca6..f0ad838 100644 --- a/src/chesspp/simulation.py +++ b/src/chesspp/simulation.py @@ -6,7 +6,7 @@ from typing import Tuple, List from enum import Enum from dataclasses import dataclass -from chesspp.engine import Engine, Limit +from src.chesspp.engine import Engine, Limit class Winner(Enum): diff --git a/src/chesspp/util_gaussian.py b/src/chesspp/util_gaussian.py new file mode 100644 index 0000000..41c15de --- /dev/null +++ b/src/chesspp/util_gaussian.py @@ -0,0 +1,83 @@ +import math + +import torch +import torch.distributions as dist +from torch import exp + +F1: dict[float, float] = {} +F2: dict[float, float] = {} +CDF: dict[float, float] = {} +lookup_count = 0 + + +def max_gaussian_numeric(mu1, sigma1, mu2, sigma2) -> (float, float): + pass + + +def max_gaussian(mu1, sigma1, mu2, sigma2) -> (float, float): + global lookup_count + global F1 + global F2 + global CDF + + """ + Returns the combined max gaussian of two Gaussians represented by mu1, sigma1, mu2, simga2 + :param mu1: mu of the first Gaussian + :param sigma1: sigma of the first Gaussian + :param mu2: mu of the second Gaussian + :param sigma2: sigma of the second Gaussian + """ + # we assume independence of the two gaussians + try: + #print(mu1, sigma1, mu2, sigma2) + normal = dist.Normal(0, 1) + sigma_m = math.sqrt(sigma1 ** 2 + sigma2 ** 2) + alpha = (mu1 - mu2) / sigma_m + + if alpha in CDF: + cdf_alpha = CDF[alpha] + lookup_count += 1 + else: + cdf_alpha = normal.cdf(torch.tensor(alpha)).item() + CDF[alpha] = cdf_alpha + + pdf_alpha = exp(normal.log_prob(torch.tensor(alpha))).item() + + if alpha in F1: + f1_alpha = F1[alpha] + lookup_count += 1 + else: + f1_alpha = alpha * cdf_alpha + pdf_alpha + F1[alpha] = f1_alpha + + if alpha in F2: + f2_alpha = F2[alpha] + lookup_count += 1 + else: + f2_alpha = alpha ** 2 * cdf_alpha * (1 - cdf_alpha) + ( + 1 - 2 * cdf_alpha) * alpha * 
pdf_alpha - pdf_alpha ** 2 + F2[alpha] = f2_alpha + + mu = mu2 + sigma_m * f1_alpha + #sigma_old = sigma2 ** 2 + (sigma1 ** 2 - sigma2 ** 2) * cdf_alpha + sigma_m ** 2 * f2_alpha + sigma = math.sqrt((mu1**2 + sigma1**2) * cdf_alpha + (mu2**2 + sigma2**2) * (1 - cdf_alpha) + (mu1 + mu2) * sigma_m * pdf_alpha - mu**2) + + return mu, sigma + except ValueError: + print(mu1, sigma1, mu2, sigma2) + exit(1) + + +def beta_mean(alpha, beta): + return alpha / (alpha + beta) + + +def beta_std(alpha, beta): + try: + return math.sqrt((alpha * beta) / ((alpha * beta)**2 * (alpha + beta + 1))) + except ZeroDivisionError: + print(alpha, beta) + + +def gaussian_ucb1(mu, sigma, N) -> float: + return mu + math.sqrt(2 * math.log(N) * sigma)