diff --git a/.gitignore b/.gitignore index 85c7f25..58660d9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /stockfish/ -.idea \ No newline at end of file +.idea +.venv \ No newline at end of file diff --git a/__pycache__/engine.cpython-310.pyc b/__pycache__/engine.cpython-310.pyc new file mode 100644 index 0000000..3088e8e Binary files /dev/null and b/__pycache__/engine.cpython-310.pyc differ diff --git a/__pycache__/eval.cpython-310.pyc b/__pycache__/eval.cpython-310.pyc new file mode 100644 index 0000000..4371a34 Binary files /dev/null and b/__pycache__/eval.cpython-310.pyc differ diff --git a/__pycache__/mcts.cpython-310.pyc b/__pycache__/mcts.cpython-310.pyc new file mode 100644 index 0000000..61d75e9 Binary files /dev/null and b/__pycache__/mcts.cpython-310.pyc differ diff --git a/engine.py b/engine.py index 01413ed..0051260 100644 --- a/engine.py +++ b/engine.py @@ -2,30 +2,8 @@ import chess import chess.engine import random import eval - - -def main(): - fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2" - board = chess.Board(fools_mate) - print(board, '\n') - moves = {} - for i in range(10): - move = pick_move(board) - if move is None: - break - - simulate_game(board, move, 100) - moves[move] = board - board = chess.Board(fools_mate) - - analyze_results(moves) - - -def analyze_results(moves: dict): - for m, b in moves.items(): - manual_score = eval.score_game(b) - engine_score = eval.analyze_with_stockfish(b) - print(f"score for move {m}: manual_score={manual_score}, engine_score={engine_score}") +import numpy as np +from stockfish import Stockfish def pick_move(board: chess.Board) -> chess.Move | None: @@ -49,19 +27,54 @@ def simulate_game(board: chess.Board, move: chess.Move, depth: int): """ engine = chess.engine.SimpleEngine.popen_uci("./stockfish/stockfish-ubuntu-x86-64-avx2") board.push(move) - print(move) - print(board, '\n') for i in range(depth): if board.is_game_over(): engine.quit() return r = 
def simulate_stockfish_prob(board: chess.Board, move: chess.Move, games: int = 10,
                            depth: int = 10) -> tuple[float, float]:
    """
    Estimate the value of a move by playing several short, randomised Stockfish games after it.

    The move is pushed onto ``board``, i.e. the caller's board is modified. Every simulated game
    starts from the resulting position. For each ply one of Stockfish's top three moves is picked
    at random. A game stops after ``depth`` plies or as soon as the game is over; the final position
    is then scored with ``eval.score_stockfish`` from White's point of view (mate scores are mapped
    to values around +/-100000).

    :param board: chess board; ``move`` is pushed onto it
    :param move: chosen move
    :param games: number of games that should be simulated after playing the move
    :param depth: maximum number of plies simulated per game
    :return: mean and standard deviation of the final scores, ``(nan, nan)`` if no game was simulated
    """
    board.push(move)
    start_fen = board.fen()

    stockfish = Stockfish("./stockfish/stockfish-ubuntu-x86-64-avx2", depth=2,
                          parameters={"Threads": 8, "Hash": 2048})
    stockfish.set_elo_rating(1200)

    scores = []
    for _ in range(games):
        # every game restarts from the position right after `move`
        copied_board = board.copy()
        stockfish.set_fen_position(start_fen)

        for _ in range(depth):
            if copied_board.is_game_over():
                break

            top_moves = stockfish.get_top_moves(3)
            if not top_moves:
                # nothing to choose from; random.choice would raise on an empty list
                break

            chosen_move = random.choice(top_moves)['Move']
            stockfish.make_moves_from_current_position([chosen_move])
            copied_board.push(chess.Move.from_uci(chosen_move))

        # exactly one score per simulated game
        scores.append(eval.score_stockfish(copied_board).white().score(mate_score=100_000))

    print(scores)
    # TODO: return distribution here?
    if not scores:
        # mean/std of an empty sample are undefined; avoid numpy's empty-slice warning
        return float("nan"), float("nan")

    samples = np.array(scores)
    return samples.mean(), samples.std()
chess.Board(fools_mate) + mcts_root = MCTSNode(board) + mcts_root.build_tree() + sorted_moves = sorted(mcts_root.children, key=lambda x: x.move.uci()) + for c in sorted_moves: + print("move (mcts):", c.move, " with score:", c.score) + + +def test_stockfish(): + fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2" + board = chess.Board(fools_mate) + moves = {} + untried_moves = list(board.legal_moves) + for move in untried_moves: + engine.simulate_game(board, move, 100) + moves[move] = board + board = chess.Board(fools_mate) + + sorted_moves = dict(sorted(moves.items(), key=lambda x: x[0].uci())) + analyze_results(sorted_moves) + + +def test_stockfish_prob(): + fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2" + board = chess.Board(fools_mate) + moves = {} + untried_moves = list(board.legal_moves) + for move in untried_moves: + mean, std = engine.simulate_stockfish_prob(board, move, 10, 4) + moves[move] = (mean, std) + board = chess.Board(fools_mate) + + sorted_moves = dict(sorted(moves.items(), key=lambda x: x[0].uci())) + for m, s in sorted_moves.items(): + print(f"move '{m.uci()}' (prob_stockfish): mean={s[0]}, std={s[1]}") + + +def analyze_results(moves: dict): + for m, b in moves.items(): + manual_score = eval.score_manual(b) + engine_score = eval.score_stockfish(b).white() + print(f"score for move {m}: manual_score={manual_score}, engine_score={engine_score}") + + +def main(): + test_mcts() + test_stockfish() + test_stockfish_prob() + + +if __name__ == '__main__': + main() diff --git a/mcts.py b/mcts.py new file mode 100644 index 0000000..df551d6 --- /dev/null +++ b/mcts.py @@ -0,0 +1,105 @@ +import chess +import random +import eval +import engine +import numpy as np + + +class MCTSNode: + def __init__(self, board: chess.Board, parent = None, move: chess.Move | None = None, random_state: int | None = None): + self.random = random.Random(random_state) + self.board = board + self.parent = parent + self.move = 
move + self.children = [] + self.visits = 0 + self.legal_moves = list(board.legal_moves) + self.untried_actions = self.legal_moves + self.score = 0 + + def _expand(self) -> 'MCTSNode': + """ + Expands the node, i.e., choose an action and apply it to the board + :return: + """ + move = self.random.choice(self.untried_actions) + self.untried_actions.remove(move) + next_board = self.board.copy() + next_board.push(move) + child_node = MCTSNode(next_board, parent=self, move=move) + self.children.append(child_node) + return child_node + + def _rollout(self, rollout_depth: int = 20) -> int: + """ + Rolls out the node by simulating a game for a given depth. + Sometimes this step is called 'simulation' or 'playout'. + :return: the score of the rolled out game + """ + copied_board = self.board.copy() + steps = 1 + for i in range(rollout_depth): + if copied_board.is_game_over(): + break + + m = engine.pick_move(copied_board) + copied_board.push(m) + steps += 1 + + return eval.score_manual(copied_board) // steps + + def _backpropagate(self, score: float) -> None: + """ + Backpropagates the results of the rollout + :param score: + :return: + """ + self.visits += 1 + # TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves) + self.score += score + if self.parent: + self.parent._backpropagate(score) + + def is_fully_expanded(self) -> bool: + return len(self.untried_actions) == 0 + + def _best_child(self) -> 'MCTSNode': + """ + Picks the best child according to our policy + :return: the best child + """ + # NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf] + choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits)) + for c in self.children] + return self.children[np.argmax(choices_weights)] + + def _select_leaf(self) -> 'MCTSNode': + """ + Selects a leaf node. + If the node is not expanded is will be expanded. 
+ :return: Leaf node + """ + current_node = self + while not current_node.board.is_game_over(): + if not current_node.is_fully_expanded(): + return current_node._expand() + else: + current_node = current_node._best_child() + + return current_node + + def build_tree(self, samples: int = 1000) -> 'MCTSNode': + """ + Runs the MCTS with the given number of samples + :param samples: number of simulations + :return: best node containing the best move + """ + for i in range(samples): + # selection & expansion + # rollout + # backpropagate score + node = self._select_leaf() + score = node._rollout() + node._backpropagate(score) + + return self._best_child() diff --git a/requirements.txt b/requirements.txt index a4adb10..b83defa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -chess==1.10.0 \ No newline at end of file +chess==1.10.0 +numpy==1.26.3 +stockfish==3.28.0 \ No newline at end of file