From b2ce73f06bcf893529c4012f23b7a47765d45c67 Mon Sep 17 00:00:00 2001 From: luk3k Date: Tue, 30 Jan 2024 19:44:09 +0100 Subject: [PATCH] added engine factory and adapted command line parsing, added lc0_strategy.py, added stockfish and lc0 standalone engines --- chesspp/engine.py | 37 ++++++++++++++--- chesspp/engine_factory.py | 76 +++++++++++++++++++++++++++++++++++ chesspp/eval.py | 34 ++++++++++------ chesspp/i_strategy.py | 5 ++- chesspp/lc0_strategy.py | 39 ++++++++++++++++++ chesspp/random_strategy.py | 3 +- chesspp/simulation.py | 27 ++++++++----- chesspp/stockfish_strategy.py | 12 +++--- main.py | 37 ++++++++--------- 9 files changed, 214 insertions(+), 56 deletions(-) create mode 100644 chesspp/engine_factory.py create mode 100644 chesspp/lc0_strategy.py diff --git a/chesspp/engine.py b/chesspp/engine.py index da08b64..0c40a3f 100644 --- a/chesspp/engine.py +++ b/chesspp/engine.py @@ -1,5 +1,6 @@ import random import time +import os from abc import ABC, abstractmethod from torch import distributions as dist @@ -15,13 +16,13 @@ from typing import Dict class Limit: """ Class to determine when to stop searching for moves """ - time: float|None + time: float | None """ Search for `time` seconds """ - nodes: int|None + nodes: int | None """ Search for a limited number of `nodes`""" - def __init__(self, time: float|None = None, nodes: int|None = None): + def __init__(self, time: float | None = None, nodes: int | None = None): self.time = time self.nodes = nodes @@ -44,7 +45,7 @@ class Limit: def _run_time(self, func, *args, **kwargs): start = time.perf_counter_ns() - while (time.perf_counter_ns()-start)/1e9 < self.time: + while (time.perf_counter_ns() - start) / 1e9 < self.time: func(*args, **kwargs) @@ -56,7 +57,7 @@ class Engine(ABC): strategy: IStrategy """The strategy used to pick moves when simulating games.""" - def __init__(self, board: chess.Board, color: chess.Color, strategy: IStrategy): + def __init__(self, board: chess.Board, color: chess.Color, strategy: IStrategy | None): self.board = board self.color = color self.strategy = strategy @@ -138,3 +139,29 @@ class RandomEngine(Engine): def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult: move = random.choice(list(board.legal_moves)) return chess.engine.PlayResult(move=move, ponder=None) + + +class StockFishEngine(Engine): + def __init__(self, board: chess.Board, color: chess, path="../stockfish/stockfish-ubuntu-x86-64-avx2"): + super().__init__(board, color, None) + self.stockfish = chess.engine.SimpleEngine.popen_uci(path) + + def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult: + return self.stockfish.play(board, limit) + + @staticmethod + def get_name() -> str: + return "Stockfish" + + +class Lc0Engine(Engine): + def __init__(self, board: chess.Board, color: chess, path="../lc0/lc0"): + super().__init__(board, color, None) + self.lc0 = chess.engine.SimpleEngine.popen_uci(path) + + def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult: + return self.lc0.play(board, limit) + + @staticmethod + def get_name() -> str: + return "Lc0" diff --git a/chesspp/engine_factory.py b/chesspp/engine_factory.py new file mode 100644 index 0000000..c257ba5 --- /dev/null +++ b/chesspp/engine_factory.py @@ -0,0 +1,76 @@ +from enum import Enum + +from chesspp.engine import * +from chesspp.lc0_strategy import Lc0Strategy +from chesspp.random_strategy import RandomStrategy +from chesspp.stockfish_strategy import StockFishStrategy +from chesspp.i_strategy import IStrategy +import chess + + +class EngineEnum(Enum): + ClassicMcts = 0 + BayesianMcts = 1 + Stockfish = 2 + Lc0 = 3 + Random = 4 + + +class StrategyEnum(Enum): + Stockfish = 0 + Lc0 = 1 + Random = 2 + + +class EngineFactory: + + @staticmethod + def create_engine(engine_name: EngineEnum, strategy_name: StrategyEnum, color: chess.Color, stockfish_path: str, lc0_path: str, rollout_depth: int = 4) -> Engine: + match strategy_name: + case StrategyEnum.Stockfish: + strategy = EngineFactory._get_stockfish_strategy(stockfish_path, rollout_depth) + case StrategyEnum.Lc0: + strategy = EngineFactory._get_lc0_strategy(lc0_path, rollout_depth) + case StrategyEnum.Random: + strategy = EngineFactory._get_random_strategy(rollout_depth) + + match engine_name: + case EngineEnum.ClassicMcts: + return EngineFactory.classic_mcts(color, strategy) + + case EngineEnum.BayesianMcts: + return EngineFactory.bayesian_mcts(color, strategy) + + case EngineEnum.Stockfish: + return EngineFactory.stockfish_engine(color, stockfish_path) + + case EngineEnum.Lc0: + return EngineFactory.lc0_engine(color, lc0_path) + + @staticmethod + def stockfish_engine(color: chess.Color, engine_path: str, board: chess.Board | None = chess.Board()) -> Engine: + return StockFishEngine(board, color, engine_path) + + @staticmethod + def lc0_engine(color: chess.Color, engine_path: str, board: chess.Board | None = chess.Board()) -> Engine: + return Lc0Engine(board, color, engine_path) + + @staticmethod + def bayesian_mcts(color: chess.Color, strategy: IStrategy, board: chess.Board | None = chess.Board()) -> Engine: + return BayesMctsEngine(board, color, strategy) + + @staticmethod + def classic_mcts(color: chess.Color, strategy: IStrategy, board: chess.Board | None = chess.Board()) -> Engine: + return ClassicMctsEngine(board, color, strategy) + + @staticmethod + def _get_random_strategy(rollout_depth: int) -> IStrategy: + return RandomStrategy(random.Random(), rollout_depth) + + @staticmethod + def _get_stockfish_strategy(engine_path: str, rollout_depth: int) -> IStrategy: + return StockFishStrategy(engine_path, rollout_depth) + + @staticmethod + def _get_lc0_strategy(engine_path: str, rollout_depth: int) -> IStrategy: + return Lc0Strategy(engine_path, rollout_depth) diff --git a/chesspp/eval.py b/chesspp/eval.py index 375b1f5..ab171a7 100644 --- a/chesspp/eval.py +++ b/chesspp/eval.py @@ -76,12 +76,12 @@ king_eval = [ ] king_endgame_eval = [ 50, -30, -30, -30, -30, -30, -30, -50, - -30, -30, 0, 0, 0, 0, -30, -30, + -30, -30, 0, 0, 0, 0, -30, -30, -30, -10, 20, 30, 30, 20, -10, -30, -30, -10, 30, 40, 40, 30, -10, -30, -30, -10, 30, 40, 40, 30, -10, -30, -30, -10, 20, 30, 30, 20, -10, -30, - -30, -20, -10, 0, 0, -10, -20, -30, + -30, -20, -10, 0, 0, -10, -20, -30, -50, -40, -30, -20, -20, -30, -40, -50 ] @@ -124,18 +124,18 @@ def check_endgame(board: chess.Board) -> bool: if piece.piece_type == chess.QUEEN: if piece.color == chess.WHITE: - queens_white += 1 + queens_white += 1 else: queens_black += 1 if piece.piece_type == chess.BISHOP or piece.piece_type == chess.KNIGHT: if piece.color == chess.WHITE: - minors_white += 1 + minors_white += 1 else: minors_black += 1 return (queens_black == 0 and queens_white == 0) or ((queens_black >= 1 >= minors_black) or ( - queens_white >= 1 >= minors_white)) + queens_white >= 1 >= minors_white)) def score_manual(board: chess.Board) -> int: @@ -173,30 +173,38 @@ def score_manual(board: chess.Board) -> int: return score -def score_stockfish(board: chess.Board, stockfish: chess.engine.SimpleEngine | None = None) -> int: +def score_stockfish(board: chess.Board, stockfish: chess.engine.SimpleEngine | None = None, + limit: chess.engine.Limit = chess.engine.Limit(depth=0)) -> int: """ Calculate the score of the given board using stockfish :param board: + :param stockfish: + :param limit: :return: """ if stockfish is None: engine = chess.engine.SimpleEngine.popen_uci( "/home/luke/projects/pp-project/chess-engine-pp/stockfish/stockfish-ubuntu-x86-64-avx2") - info = engine.analyse(board, chess.engine.Limit(depth=0)) + info = engine.analyse(board, limit) engine.quit() return info['score'].white().score(mate_score=100_000) else: - info = stockfish.analyse(board, chess.engine.Limit(depth=0)) + info = stockfish.analyse(board, limit) return info['score'].white().score(mate_score=100_000) -def score_lc0(board: chess.Board) -> chess.engine.PovScore: +def score_lc0(board: chess.Board, lc0: chess.engine.SimpleEngine | None = None, + limit: chess.engine.Limit= chess.engine.Limit(depth=0)) -> int: """ Calculate the score of the given board using lc0 :param board: :return: """ - engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/lc0/lc0") - info = engine.analyse(board, chess.engine.Limit(depth=4)) - engine.quit() - return info["score"] + if lc0 is None: + engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/lc0/lc0") + info = engine.analyse(board, limit) + engine.quit() + return info["score"] + else: + info = lc0.analyse(board, limit) + return info['score'].white().score(mate_score=100_000) diff --git a/chesspp/i_strategy.py b/chesspp/i_strategy.py index d985b79..b37b0cb 100644 --- a/chesspp/i_strategy.py +++ b/chesspp/i_strategy.py @@ -3,8 +3,11 @@ from abc import ABC, abstractmethod import chess -# TODO extend class class IStrategy(ABC): + rollout_depth: int + + def __init__(self, rollout_depth: int = 4): + self.rollout_depth = rollout_depth @abstractmethod def pick_next_move(self, board: chess.Board) -> chess.Move: diff --git a/chesspp/lc0_strategy.py b/chesspp/lc0_strategy.py new file mode 100644 index 0000000..cb1fe11 --- /dev/null +++ b/chesspp/lc0_strategy.py @@ -0,0 +1,39 @@ +import chess +import chess.engine +import os + +from chesspp.i_strategy import IStrategy +from chesspp.eval import score_lc0 + +_DIR = os.path.abspath(os.path.dirname(__file__)) + + +class Lc0Strategy(IStrategy): + def __init__(self, path="../lc0/lc0", rollout_depth: int = 4, + limit: chess.engine.Limit = chess.engine.Limit(depth=4)): + super().__init__(rollout_depth) + self._lc0 = None + self.path = path + self.limit = limit + + def __del__(self): + if self._lc0 is not None: + self._lc0.quit() + + @property + def lc0(self) -> chess.engine.SimpleEngine: + if self._lc0 is None: + self._lc0 = self.lc0 = chess.engine.SimpleEngine.popen_uci(self.path) + return self._lc0 + + @lc0.setter + def lc0(self, value): + self._lc0 = value + + def pick_next_move(self, board: chess.Board) -> chess.Move | None: + return self.lc0.play(board, self.limit).move + + def analyze_board(self, board: chess.Board) -> int: + score = score_lc0(board, self.lc0) + print("lc0 score", score) + return score diff --git a/chesspp/random_strategy.py b/chesspp/random_strategy.py index 74193d7..28c1bb4 100644 --- a/chesspp/random_strategy.py +++ b/chesspp/random_strategy.py @@ -5,7 +5,8 @@ from chesspp.eval import score_manual class RandomStrategy(IStrategy): - def __init__(self, random_state: random.Random): + def __init__(self, random_state: random.Random, rollout_depth: int = 4): + super().__init__(rollout_depth) self.random_state = random_state def pick_next_move(self, board: chess.Board) -> chess.Move | None: diff --git a/chesspp/simulation.py b/chesspp/simulation.py index 30ae071..98cf94c 100644 --- a/chesspp/simulation.py +++ b/chesspp/simulation.py @@ -5,8 +5,8 @@ import chess.pgn from typing import Tuple, List from enum import Enum from dataclasses import dataclass -from chesspp.i_strategy import IStrategy +from chesspp.engine_factory import StrategyEnum, EngineFactory, EngineEnum from chesspp.engine import Engine, Limit @@ -36,32 +36,39 @@ def simulate_game(white: Engine, black: Engine, limit: Limit, board: chess.Board class Evaluation: - def __init__(self, engine_a: Engine.__class__, strategy_a, engine_b: Engine.__class__, strategy_b, limit: Limit): + def __init__(self, engine_a: EngineEnum, strategy_a, engine_b: EngineEnum, strategy_b, limit: Limit, + stockfish_path: str, lc0_path: str): self.engine_a = engine_a self.strategy_a = strategy_a self.engine_b = engine_b self.strategy_b = strategy_b + self.stockfish_path = stockfish_path + self.lc0_path = lc0_path self.limit = limit def run(self, n_games=100, proc=mp.cpu_count()) -> List[EvaluationResult]: proc = min(proc, mp.cpu_count()) with mp.Pool(proc) as pool: - args = [(self.engine_a, self.strategy_a, self.engine_b, self.strategy_b, self.limit) for i in range(n_games)] + args = [(self.engine_a, self.strategy_a, self.engine_b, self.strategy_b, self.limit, self.stockfish_path, self.lc0_path) for i + in + range(n_games)] return pool.map(Evaluation._test_simulate, args) @staticmethod - def _test_simulate(arg: Tuple[Engine.__class__, IStrategy, Engine.__class__, IStrategy, Limit]) -> EvaluationResult: - engine_a, strategy_a, engine_b, strategy_b, limit = arg + def _test_simulate(arg: Tuple[EngineEnum, StrategyEnum, EngineEnum, StrategyEnum, Limit, str, str]) -> EvaluationResult: + engine_a, strategy_a, engine_b, strategy_b, limit, stockfish_path, lc0_path = arg flip_engines = bool(random.getrandbits(1)) - board = chess.Board() - if flip_engines: - black, white = engine_a(board.copy(), chess.BLACK, strategy_a), engine_b(board.copy(), chess.WHITE, strategy_b) + black, white = EngineFactory.create_engine(engine_a, strategy_a, chess.BLACK, + stockfish_path, lc0_path), EngineFactory.create_engine( + engine_b, strategy_b, chess.WHITE, stockfish_path, lc0_path) else: - white, black = engine_a(board.copy(), chess.WHITE, strategy_a), engine_b(board.copy(), chess.BLACK, strategy_b) + white, black = EngineFactory.create_engine(engine_a, strategy_a, chess.WHITE, + stockfish_path, lc0_path), EngineFactory.create_engine( + engine_b, strategy_b, chess.BLACK, stockfish_path, lc0_path) - game = simulate_game(white, black, limit, board) + game = simulate_game(white, black, limit, chess.Board()) winner = game.end().board().outcome().winner result = Winner.Draw diff --git a/chesspp/stockfish_strategy.py b/chesspp/stockfish_strategy.py index 4ae3024..0a089f2 100644 --- a/chesspp/stockfish_strategy.py +++ b/chesspp/stockfish_strategy.py @@ -4,14 +4,15 @@ from chesspp.i_strategy import IStrategy from chesspp.eval import score_stockfish import chess.engine -_DIR = os.path.abspath(os.path.dirname(__file__)) - class StockFishStrategy(IStrategy): - def __init__(self, path="../stockfish/stockfish-windows-x86-64-avx2"): + def __init__(self, path="../stockfish/stockfish-windows-x86-64-avx2", rollout_depth: int = 4, + limit: chess.engine.Limit = chess.engine.Limit(depth=4)): + super().__init__(rollout_depth) self._stockfish = None self.path = path + self.limit = limit def __del__(self): if self._stockfish is not None: @@ -20,8 +21,7 @@ class StockFishStrategy(IStrategy): @property def stockfish(self) -> chess.engine.SimpleEngine: if self._stockfish is None: - self._stockfish = self.stockfish = chess.engine.SimpleEngine.popen_uci( - os.path.join(_DIR, self.path)) + self._stockfish = self.stockfish = chess.engine.SimpleEngine.popen_uci(self.path) return self._stockfish @stockfish.setter @@ -29,7 +29,7 @@ class StockFishStrategy(IStrategy): self._stockfish = stockfish def pick_next_move(self, board: chess.Board) -> chess.Move | None: - return self.stockfish.play(board, chess.engine.Limit(depth=4)).move + return self.stockfish.play(board, self.limit).move def analyze_board(self, board: chess.Board) -> int: return score_stockfish(board, self.stockfish) diff --git a/main.py b/main.py index 549ae13..80d1330 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,11 @@ import random import time - import chess import chess.engine import chess.pgn from chesspp.classic_mcts import ClassicMcts from chesspp.baysian_mcts import BayesianMcts +from chesspp.engine_factory import EngineEnum, StrategyEnum from chesspp.random_strategy import RandomStrategy from chesspp.stockfish_strategy import StockFishStrategy from chesspp import engine @@ -87,26 +87,17 @@ def analyze_results(moves: dict): def test_evaluation(): - a, b, s1, s2, n, limit, stockfish_path, proc = read_arguments() + a, b, s1, s2, n, limit, stockfish_path, lc0_path, proc = read_arguments() limit = engine.Limit(time=limit) - if s1 == StockFishStrategy: - strat1 = StockFishStrategy(stockfish_path) - else: - strat1 = s1() - if s2 == StockFishStrategy: - strat2 = StockFishStrategy(stockfish_path) - else: - strat2 = s1() - - evaluator = simulation.Evaluation(a, strat1, b, strat2, limit) + evaluator = simulation.Evaluation(a, s1, b, s2, limit, stockfish_path, lc0_path) results = evaluator.run(n, proc) a_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_A, results))) / len(results) * 100 b_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_B, results))) / len(results) * 100 draws = len(list(filter(lambda x: x.winner == simulation.Winner.Draw, results))) / len(results) * 100 - print(f"Engine {a.get_name()} won {a_results}% of games") - print(f"Engine {b.get_name()} won {b_results}% of games") + print(f"Engine {a} won {a_results}% of games") + print(f"Engine {b} won {b_results}% of games") print(f"{draws}% of games resulted in a draw") @@ -116,19 +107,24 @@ def read_arguments(): description='Compare two engines by playing multiple games against each other' ) - engines = {"ClassicMCTS": engine.ClassicMctsEngine, "BayesianMCTS": engine.BayesMctsEngine, "Random": engine.RandomEngine} - strategies = {"Random": RandomStrategy, "Stockfish": StockFishStrategy} + engines = {"ClassicMCTS": EngineEnum.ClassicMcts, "BayesianMCTS": EngineEnum.BayesianMcts, + "Random": EngineEnum.Random, "Stockfish": EngineEnum.Stockfish, "Lc0": EngineEnum.Lc0} + strategies = {"Random": StrategyEnum.Random, "Stockfish": StrategyEnum.Stockfish, "Lc0": StrategyEnum.Lc0} if os.name == 'nt': stockfish_default = "../stockfish/stockfish-windows-x86-64-avx2" + lc0_default = "../lc0/lc0.exe" else: stockfish_default = "../stockfish/stockfish-ubuntu-x86-64-avx2" + lc0_default = "../lc0/lc0" parser.add_argument("--proc", default=2, help="Number of processors to use for simulation, default=1") parser.add_argument("--time", default=0.5, help="Time limit for each simulation step, default=0.5") parser.add_argument("-n", default=100, help="Number of games to simulate, default=100") - parser.add_argument("--stockfish", default=stockfish_default, - help=f"Path for stockfish executable, default='{stockfish_default}'") + parser.add_argument("--stockfish_path", default=stockfish_default, + help=f"Path for engine executable, default='{stockfish_default}'") + parser.add_argument("--lc0_path", default=lc0_default, + help=f"Path for engine executable, default='{stockfish_default}'") parser.add_argument("--engine1", "--e1", help="Engine A for the simulation", choices=engines.keys(), required=True) parser.add_argument("--engine2", "--e2", help="Engine B for the simulation", choices=engines.keys(), required=True) parser.add_argument("--strategy1", "--s1", default=list(strategies.keys())[0], @@ -136,7 +132,7 @@ def read_arguments(): choices=strategies.keys()) parser.add_argument("--strategy2", "--s2", default=list(strategies.keys())[0], help="Strategy for engine B for the rollout", - choices=strategies) + choices=strategies.keys()) args = parser.parse_args() engine1 = engines[args.engine1] @@ -144,7 +140,8 @@ def read_arguments(): strategy1 = strategies[args.strategy1] strategy2 = strategies[args.strategy2] - return engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish, int(args.proc) + print(engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish_path, args.lc0_path, int(args.proc)) + return engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish_path, args.lc0_path, int(args.proc) def main():