Added basic bayes-mcts using beta distribution

This commit is contained in:
Theo Haslinger
2024-01-28 15:18:13 +01:00
parent c667a263a7
commit 2662dbf53a
10 changed files with 428 additions and 119 deletions

15
main.py
View File

@@ -1,7 +1,10 @@
import random
import chess import chess
import chess.engine import chess.engine
import chess.pgn import chess.pgn
from src.chesspp.classic_mcts import ClassicMcts from src.chesspp.classic_mcts import ClassicMcts
from src.chesspp.baysian_mcts import BayesianMcts
from src.chesspp.random_strategy import RandomStrategy
from src.chesspp import engine from src.chesspp import engine
from src.chesspp import util from src.chesspp import util
from src.chesspp import simulation, eval from src.chesspp import simulation, eval
@@ -24,6 +27,18 @@ def test_mcts():
print("move (mcts):", c.move, " with score:", c.score) print("move (mcts):", c.move, " with score:", c.score)
def test_bayes_mcts():
    """Run a seeded Bayesian MCTS on a fool's-mate position and print the root moves.

    The FEN has Black to move. After sampling with the default number of runs,
    every immediate child of the root is printed together with its mean ``mu``.
    """
    fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
    board = chess.Board(fools_mate)
    seed = 1
    # The rollout strategy and the tree are both seeded with the same value so
    # that a run is reproducible.
    strategy = RandomStrategy(random.Random(seed))
    mcts = BayesianMcts(board, strategy, seed)
    mcts.sample()
    for c in mcts.get_children():
        print("move (mcts):", c.move, " with score:", c.mu)
def test_stockfish(): def test_stockfish():
fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2" fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
board = chess.Board(fools_mate) board = chess.Board(fools_mate)

View File

@@ -1,4 +1,6 @@
chess==1.10.0 chess==1.10.0
numpy==1.26.3 numpy==1.26.3
stockfish==3.28.0 stockfish==3.28.0
torch==2.1.2
pytest
aiohttp aiohttp

145
src/chesspp/baysian_mcts.py Normal file
View File

@@ -0,0 +1,145 @@
import chess
from src.chesspp.i_mcts import *
from src.chesspp.i_strategy import IStrategy
from src.chesspp.util_gaussian import gaussian_ucb1, max_gaussian, beta_std, beta_mean
from src.chesspp.eval import *
import numpy as np
import math
class BayesianMctsNode(IMctsNode):
    """Search-tree node whose value estimate is a mean/standard-deviation pair.

    ``results`` holds two counts used as the (alpha, beta) parameters of a Beta
    distribution: ``results[0]`` is alpha and ``results[1]`` is beta. A leaf
    derives ``mu``/``sigma`` from those counts; an interior node derives them by
    folding ``max_gaussian`` over its children.
    """

    def __init__(self, board: chess.Board, strategy: IStrategy, parent: Self | None, move: chess.Move | None,
                 random_state: random.Random, inherit_results: list[int] | None = None):
        """
        Creates a node for the given board.
        :param board: position represented by this node
        :param strategy: move picker used during rollouts
        :param parent: parent node, or None for a root
        :param move: move that led from the parent to this node, or None for a root
        :param random_state: random generator used for the initial child pick and for shuffling
        :param inherit_results: counts to start from (copied); defaults to [1, 1]
        """
        super().__init__(board, strategy, parent, move, random_state)
        self.visits = 0
        # Copy so that updating this node's counts never mutates the list passed in.
        self.results = inherit_results.copy() if inherit_results is not None else [1, 1]
        self._set_mu_sigma()

    def _create_child(self, move: chess.Move):
        """
        Builds the child reached by playing ``move`` on a copy of this node's board.
        The child starts from a copy of this node's ``results``.
        """
        copied_board = self.board.copy()
        copied_board.push(move)
        return BayesianMctsNode(copied_board, self.strategy, self, move, self.random_state,
                                inherit_results=self.results)

    def _set_mu_sigma(self):
        """Recomputes ``mu`` and ``sigma`` from the first two entries of ``results``."""
        alpha = self.results[0]
        beta = self.results[1]

        self.mu = beta_mean(alpha, beta)
        self.sigma = beta_std(alpha, beta)

    def _select_child(self) -> IMctsNode:
        """
        Selects a child by modified UCB1 (``gaussian_ucb1`` with this node's visit count).
        :return: this node if the game is over, otherwise the chosen child
        """
        if self.board.is_game_over():
            return self

        # Start from a random child; it is only replaced by a strictly better one,
        # so ties with the initial pick are resolved by the random choice.
        best_child = self.random_state.choice(self.children)
        best_val = gaussian_ucb1(best_child.mu, best_child.sigma, self.visits)
        for c in self.children:
            g = gaussian_ucb1(c.mu, c.sigma, self.visits)
            if g > best_val:
                best_val = g
                best_child = c
        return best_child

    def select(self) -> IMctsNode:
        """
        Descends through ``_select_child`` until a node without children is reached.
        :return: the node without children that was reached
        """
        if len(self.children) == 0:
            return self
        else:
            return self._select_child().select()

    def expand(self) -> IMctsNode:
        """
        Expands this node by creating one child per legal move.
        :return: this node if it has not been visited yet, otherwise the result of
                 ``_select_child`` after the children were created
        """
        if self.visits == 0:
            return self

        for move in self.legal_moves:
            self.children.append(self._create_child(move))

        return self._select_child()

    def rollout(self, rollout_depth: int = 20) -> int:
        """
        Rolls out the node by letting the strategy play on a copy of the board
        for at most ``rollout_depth`` moves, then updates ``results`` from the score.
        :param rollout_depth: maximum number of moves to simulate
        :return: the manual evaluation of the final position, floor-divided by
                 the number of steps taken (starting at 1)
        """
        # Imported by name here: the module-level ``from src.chesspp.eval import *``
        # does not bind the name ``eval``, so ``eval.score_manual`` would resolve to
        # the builtin ``eval`` function and fail with AttributeError.
        from src.chesspp.eval import score_manual

        copied_board = self.board.copy()
        steps = 1
        for _ in range(rollout_depth):
            if copied_board.is_game_over():
                break

            m = self.strategy.pick_next_move(copied_board)
            if m is None:
                break

            copied_board.push(m)
            steps += 1

        score = score_manual(copied_board) // steps
        # NOTE(review): a positive score increments beta (results[1]), which lowers
        # mu = alpha / (alpha + beta) - confirm this orientation is intended.
        if score > 0:
            self.results[1] += 1
        else:
            # Adds nothing unless |score| is at least 50_000.
            self.results[0] += abs(score) // 50_000
        return score

    def backpropagate(self, score: int | None = None) -> None:
        """
        Updates this node's statistics and then propagates to the parent (without a score).
        :param score: optional score; when given it is appended to ``results``
        :return:
        """
        self.visits += 1

        if score is not None:
            # NOTE(review): only results[0] and results[1] are read in this class,
            # so an appended score is stored but not used for mu/sigma here.
            self.results.append(score)

        if len(self.children) == 0:
            # leaf node
            self._set_mu_sigma()
        else:
            # interior node: fold the children's Gaussians into one max-Gaussian,
            # visiting them in a random order
            shuffled_children = self.random_state.sample(self.children, len(self.children))
            max_mu = shuffled_children[0].mu
            max_sigma = shuffled_children[0].sigma
            for c in shuffled_children[1:]:
                max_mu, max_sigma = max_gaussian(max_mu, max_sigma, c.mu, c.sigma)

            # keep sigma strictly positive
            if max_sigma == 0:
                max_sigma = 0.001

            self.mu = max_mu
            self.sigma = max_sigma

        if self.parent:
            self.parent.backpropagate()

    def print(self, indent=0):
        """Prints this node and, recursively, its children, one tab per tree level."""
        print("\t"*indent + f"visits={self.visits}, mu={self.mu}, sigma={self.sigma}")
        for c in self.children:
            c.print(indent+1)
class BayesianMcts(IMcts):
    """Search driver that owns the root ``BayesianMctsNode`` of the tree."""

    def __init__(self, board: chess.Board, strategy: IStrategy, seed: int | None = None):
        """
        Creates the search with a fresh root node for ``board``.
        :param board: position to search from
        :param strategy: move picker handed to the nodes for rollouts
        :param seed: seed passed on to the base class
        """
        super().__init__(board, strategy, seed)
        root = BayesianMctsNode(board, strategy, None, None, self.random_state)
        # The root starts with one visit so that the first expand() creates children.
        root.visits += 1
        self.root = root

    def sample(self, runs: int = 1000) -> None:
        """
        Performs ``runs`` iterations of select, expand, rollout and backpropagate.
        :param runs: number of iterations
        """
        for _ in range(runs):
            node = self.root.select().expand()
            node.rollout()
            node.backpropagate()

    def apply_move(self, move: chess.Move) -> None:
        """
        Plays ``move`` on the board and re-roots the tree.
        :param move: the move that was played
        """
        self.board.push(move)

        # Reuse the subtree below the first child that was reached by this move.
        reused = next((node for node in self.get_children() if node.move == move), None)
        if reused is not None:
            reused.parent = None
            self.root = reused
            return

        # No child matches the move: start over with a new tree.
        self.root = BayesianMctsNode(self.board, self.root.strategy, None, None, self.random_state)

    def get_children(self) -> list[IMctsNode]:
        """
        Return the immediate children of the root node
        :return: list of immediate children of mcts root
        """
        return self.root.children

    def print(self):
        """Prints a separator line followed by the whole tree."""
        print("================================")
        self.root.print()

View File

@@ -1,111 +1,111 @@
import chess import chess
import random import random
import numpy as np import numpy as np
from chesspp import eval from src.chesspp import eval
from chesspp import util from src.chesspp import util
class ClassicMcts: class ClassicMcts:
def __init__(self, board: chess.Board, color: chess.Color, parent=None, move: chess.Move | None = None, def __init__(self, board: chess.Board, color: chess.Color, parent=None, move: chess.Move | None = None,
random_state: int | None = None): random_state: int | None = None):
self.random = random.Random(random_state) self.random = random.Random(random_state)
self.board = board self.board = board
self.color = color self.color = color
self.parent = parent self.parent = parent
self.move = move self.move = move
self.children = [] self.children = []
self.visits = 0 self.visits = 0
self.legal_moves = list(board.legal_moves) self.legal_moves = list(board.legal_moves)
self.untried_actions = self.legal_moves self.untried_actions = self.legal_moves
self.score = 0 self.score = 0
def _expand(self) -> 'ClassicMcts': def _expand(self) -> 'ClassicMcts':
""" """
Expands the node, i.e., choose an action and apply it to the board Expands the node, i.e., choose an action and apply it to the board
:return: :return:
""" """
move = self.random.choice(self.untried_actions) move = self.random.choice(self.untried_actions)
self.untried_actions.remove(move) self.untried_actions.remove(move)
next_board = self.board.copy() next_board = self.board.copy()
next_board.push(move) next_board.push(move)
child_node = ClassicMcts(next_board, color=self.color, parent=self, move=move) child_node = ClassicMcts(next_board, color=self.color, parent=self, move=move)
self.children.append(child_node) self.children.append(child_node)
return child_node return child_node
def _rollout(self, rollout_depth: int = 20) -> int: def _rollout(self, rollout_depth: int = 20) -> int:
""" """
Rolls out the node by simulating a game for a given depth. Rolls out the node by simulating a game for a given depth.
Sometimes this step is called 'simulation' or 'playout'. Sometimes this step is called 'simulation' or 'playout'.
:return: the score of the rolled out game :return: the score of the rolled out game
""" """
copied_board = self.board.copy() copied_board = self.board.copy()
steps = 1 steps = 1
for i in range(rollout_depth): for i in range(rollout_depth):
if copied_board.is_game_over(): if copied_board.is_game_over():
break break
m = util.pick_move(copied_board) m = util.pick_move(copied_board)
copied_board.push(m) copied_board.push(m)
steps += 1 steps += 1
return eval.score_manual(copied_board) // steps return eval.score_manual(copied_board) // steps
def _backpropagate(self, score: float) -> None: def _backpropagate(self, score: float) -> None:
""" """
Backpropagates the results of the rollout Backpropagates the results of the rollout
:param score: :param score:
:return: :return:
""" """
self.visits += 1 self.visits += 1
# TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves) # TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves)
self.score += score self.score += score
if self.parent: if self.parent:
self.parent._backpropagate(score) self.parent._backpropagate(score)
def is_fully_expanded(self) -> bool: def is_fully_expanded(self) -> bool:
return len(self.untried_actions) == 0 return len(self.untried_actions) == 0
def _best_child(self) -> 'ClassicMcts': def _best_child(self) -> 'ClassicMcts':
""" """
Picks the best child according to our policy Picks the best child according to our policy
:return: the best child :return: the best child
""" """
# NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf] # NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf]
choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits)) choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits))
for c in self.children] for c in self.children]
best_child_index = np.argmax(choices_weights) if self.color == chess.WHITE else np.argmin(choices_weights) best_child_index = np.argmax(choices_weights) if self.color == chess.WHITE else np.argmin(choices_weights)
return self.children[best_child_index] return self.children[best_child_index]
def _select_leaf(self) -> 'ClassicMcts': def _select_leaf(self) -> 'ClassicMcts':
""" """
Selects a leaf node. Selects a leaf node.
If the node is not expanded is will be expanded. If the node is not expanded is will be expanded.
:return: Leaf node :return: Leaf node
""" """
current_node = self current_node = self
while not current_node.board.is_game_over(): while not current_node.board.is_game_over():
if not current_node.is_fully_expanded(): if not current_node.is_fully_expanded():
return current_node._expand() return current_node._expand()
else: else:
current_node = current_node._best_child() current_node = current_node._best_child()
return current_node return current_node
def build_tree(self, samples: int = 1000) -> 'ClassicMcts': def build_tree(self, samples: int = 1000) -> 'ClassicMcts':
""" """
Runs the MCTS with the given number of samples Runs the MCTS with the given number of samples
:param samples: number of simulations :param samples: number of simulations
:return: best node containing the best move :return: best node containing the best move
""" """
for i in range(samples): for i in range(samples):
# selection & expansion # selection & expansion
# rollout # rollout
# backpropagate score # backpropagate score
node = self._select_leaf() node = self._select_leaf()
score = node._rollout() score = node._rollout()
node._backpropagate(score) node._backpropagate(score)
return self._best_child() return self._best_child()

View File

@@ -3,7 +3,7 @@ import chess
import chess.engine import chess.engine
import random import random
import time import time
from chesspp.classic_mcts import ClassicMcts from src.chesspp.classic_mcts import ClassicMcts
class Limit: class Limit:
""" Class to determine when to stop searching for moves """ """ Class to determine when to stop searching for moves """

View File

@@ -1,13 +1,61 @@
import chess import chess
import random
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Dict from typing import Dict, Self
from chesspp.i_strategy import IStrategy from src.chesspp.i_strategy import IStrategy
class IMctsNode(ABC):
    """Abstract base class for a single node of a Monte-Carlo search tree."""

    def __init__(self, board: chess.Board, strategy: IStrategy, parent: Self | None, move: chess.Move | None,
                 random_state: random.Random):
        """
        Stores the node's position and its links within the tree.
        :param board: position represented by this node
        :param strategy: strategy object kept on the node for use by subclasses
        :param parent: parent node, or None for a root
        :param move: move that led to this node, or None for a root
        :param random_state: random generator kept on the node for use by subclasses
        """
        self.board = board
        self.strategy = strategy
        self.parent = parent
        self.children: list[Self] = []
        self.move = move
        # Materialised once at construction time from the board's legal moves.
        self.legal_moves = list(board.legal_moves)
        self.random_state = random_state

    @abstractmethod
    def select(self) -> Self:
        """
        Selects the next leaf node in the tree
        :return:
        """
        pass

    @abstractmethod
    def expand(self) -> Self:
        """
        Expands this node creating X child leaf nodes, i.e., choose an action and apply it to the board
        :return:
        """
        pass

    @abstractmethod
    def rollout(self, rollout_depth: int = 20) -> int:
        """
        Rolls out the node by simulating a game for a given depth.
        Sometimes this step is called 'simulation' or 'playout'.
        :return: the score of the rolled out game
        """
        pass

    @abstractmethod
    def backpropagate(self, score: float | None = None) -> None:
        """
        Backpropagates the results of the rollout
        :param score: score of the rollout; optional so that implementations
                      which keep their own statistics can be called without one
        :return:
        """
        pass
class IMcts(ABC): class IMcts(ABC):
def __init__(self, board: chess.Board, strategy: IStrategy, seed: int | None):
def __init__(self, board: chess.Board, strategy: IStrategy):
self.board = board self.board = board
self.strategy = strategy
self.random_state = random.Random(seed)
@abstractmethod @abstractmethod
def sample(self, runs: int = 1000) -> None: def sample(self, runs: int = 1000) -> None:
@@ -28,7 +76,7 @@ class IMcts(ABC):
pass pass
@abstractmethod @abstractmethod
def get_children(self) -> list['IMcts']: def get_children(self) -> list[IMctsNode]:
""" """
Return the immediate children of the root node Return the immediate children of the root node
:return: list of immediate children of mcts root :return: list of immediate children of mcts root

View File

@@ -1,8 +1,11 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import chess
# TODO extend class # TODO extend class
class IStrategy(ABC): class IStrategy(ABC):
@abstractmethod @abstractmethod
def pick_next_move(self, ): def pick_next_move(self, board: chess.Board) -> chess.Move:
pass pass

View File

@@ -0,0 +1,13 @@
import chess
import random
from src.chesspp.i_strategy import IStrategy
class RandomStrategy(IStrategy):
    """Strategy that picks one of the legal moves at random."""

    def __init__(self, random_state: random.Random):
        """
        :param random_state: random generator used for every pick
        """
        self.random_state = random_state

    def pick_next_move(self, board: chess.Board) -> chess.Move | None:
        """
        Picks a random legal move for the given board.
        :param board: position to pick a move for
        :return: the chosen move, or None if there is no legal move
        """
        # Generate the legal moves once instead of once for the emptiness
        # check and a second time for the choice.
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return None
        return self.random_state.choice(legal_moves)

View File

@@ -6,7 +6,7 @@ from typing import Tuple, List
from enum import Enum from enum import Enum
from dataclasses import dataclass from dataclasses import dataclass
from chesspp.engine import Engine, Limit from src.chesspp.engine import Engine, Limit
class Winner(Enum): class Winner(Enum):

View File

@@ -0,0 +1,83 @@
import math
import torch
import torch.distributions as dist
from torch import exp
# Memoization tables filled by max_gaussian, each keyed by the standardized
# difference alpha = (mu1 - mu2) / sqrt(sigma1^2 + sigma2^2).
F1: dict[float, float] = {}
F2: dict[float, float] = {}
CDF: dict[float, float] = {}
# Number of times max_gaussian found its key in one of the tables above.
lookup_count = 0
def max_gaussian_numeric(mu1, sigma1, mu2, sigma2) -> (float, float):
    """
    Placeholder without an implementation: the arguments are ignored and
    None is returned.
    """
    return None
def max_gaussian(mu1, sigma1, mu2, sigma2) -> tuple[float, float]:
    """
    Returns the combined max gaussian of two Gaussians represented by mu1, sigma1, mu2, sigma2,
    i.e. the mean and standard deviation of max(X1, X2) obtained by moment matching.
    The two Gaussians are assumed to be independent.
    :param mu1: mu of the first Gaussian
    :param sigma1: sigma of the first Gaussian
    :param mu2: mu of the second Gaussian
    :param sigma2: sigma of the second Gaussian
    :return: (mu, sigma) of the combined Gaussian
    :raises ValueError: if the inputs cannot be evaluated; the message lists them
    """
    # Only lookup_count is rebound here; F1, F2 and CDF are mutated in place.
    global lookup_count

    sigma_m = math.sqrt(sigma1 ** 2 + sigma2 ** 2)
    if sigma_m == 0:
        # Both inputs are point masses, so the maximum is the larger mean with
        # no spread. Without this guard alpha would divide by zero.
        return float(max(mu1, mu2)), 0.0

    try:
        normal = dist.Normal(0, 1)
        alpha = (mu1 - mu2) / sigma_m
        alpha_t = torch.tensor(alpha)

        if alpha in CDF:
            cdf_alpha = CDF[alpha]
            lookup_count += 1
        else:
            cdf_alpha = normal.cdf(alpha_t).item()
            CDF[alpha] = cdf_alpha
        pdf_alpha = exp(normal.log_prob(alpha_t)).item()

        if alpha in F1:
            f1_alpha = F1[alpha]
            lookup_count += 1
        else:
            f1_alpha = alpha * cdf_alpha + pdf_alpha
            F1[alpha] = f1_alpha

        # F2 does not enter the result below; the table is still maintained so
        # that its contents and lookup_count keep evolving as before.
        if alpha in F2:
            lookup_count += 1
        else:
            F2[alpha] = alpha ** 2 * cdf_alpha * (1 - cdf_alpha) + (
                    1 - 2 * cdf_alpha) * alpha * pdf_alpha - pdf_alpha ** 2

        mu = mu2 + sigma_m * f1_alpha
        # Variance as second moment minus squared mean.
        variance = ((mu1 ** 2 + sigma1 ** 2) * cdf_alpha
                    + (mu2 ** 2 + sigma2 ** 2) * (1 - cdf_alpha)
                    + (mu1 + mu2) * sigma_m * pdf_alpha
                    - mu ** 2)
        # Round-off can push a tiny variance just below zero; clamp it so the
        # square root does not fail on an otherwise valid input.
        sigma = math.sqrt(max(variance, 0.0))
        return mu, sigma
    except ValueError as err:
        raise ValueError(
            f"max_gaussian failed for mu1={mu1}, sigma1={sigma1}, mu2={mu2}, sigma2={sigma2}"
        ) from err
def beta_mean(alpha, beta):
    """
    Returns the mean alpha / (alpha + beta) of a Beta distribution.
    :param alpha: first shape parameter
    :param beta: second shape parameter
    """
    total = alpha + beta
    return alpha / total
def beta_std(alpha, beta):
    """
    Returns the standard deviation of a Beta distribution, i.e. the square root of
    alpha * beta / ((alpha + beta)^2 * (alpha + beta + 1)).
    :param alpha: first shape parameter
    :param beta: second shape parameter
    :raises ValueError: if the parameters make the denominator zero
    """
    total = alpha + beta
    try:
        # The denominator uses the SUM of the parameters, not their product.
        return math.sqrt((alpha * beta) / (total ** 2 * (total + 1)))
    except ZeroDivisionError as err:
        # Fail loudly instead of printing and handing None back to the caller.
        raise ValueError(f"beta_std is undefined for alpha={alpha}, beta={beta}") from err
def gaussian_ucb1(mu, sigma, N) -> float:
    """
    Returns the upper confidence bound mu + sqrt(2 * ln(N)) * sigma.
    :param mu: mean of the value estimate
    :param sigma: standard deviation of the value estimate
    :param N: visit count of the parent; values below 1 are treated as 1
    """
    # sigma scales the exploration term linearly; it does not belong under the root.
    # Clamping N keeps the logarithm non-negative, so N <= 1 yields no bonus.
    exploration = math.sqrt(2 * math.log(max(N, 1)))
    return mu + exploration * sigma