added engine factory and adapted command line parsing, added lc0_strategy.py, added stockfish and lc0 standalone engines

This commit is contained in:
2024-01-30 19:44:09 +01:00
parent 50cd4cde9b
commit b2ce73f06b
9 changed files with 214 additions and 56 deletions

View File

@@ -1,5 +1,6 @@
import random import random
import time import time
import os
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from torch import distributions as dist from torch import distributions as dist
@@ -15,13 +16,13 @@ from typing import Dict
class Limit: class Limit:
""" Class to determine when to stop searching for moves """ """ Class to determine when to stop searching for moves """
time: float|None time: float | None
""" Search for `time` seconds """ """ Search for `time` seconds """
nodes: int|None nodes: int | None
""" Search for a limited number of `nodes`""" """ Search for a limited number of `nodes`"""
def __init__(self, time: float|None = None, nodes: int|None = None): def __init__(self, time: float | None = None, nodes: int | None = None):
self.time = time self.time = time
self.nodes = nodes self.nodes = nodes
@@ -44,7 +45,7 @@ class Limit:
def _run_time(self, func, *args, **kwargs): def _run_time(self, func, *args, **kwargs):
start = time.perf_counter_ns() start = time.perf_counter_ns()
while (time.perf_counter_ns()-start)/1e9 < self.time: while (time.perf_counter_ns() - start) / 1e9 < self.time:
func(*args, **kwargs) func(*args, **kwargs)
@@ -56,7 +57,7 @@ class Engine(ABC):
strategy: IStrategy strategy: IStrategy
"""The strategy used to pick moves when simulating games.""" """The strategy used to pick moves when simulating games."""
def __init__(self, board: chess.Board, color: chess.Color, strategy: IStrategy): def __init__(self, board: chess.Board, color: chess.Color, strategy: IStrategy | None):
self.board = board self.board = board
self.color = color self.color = color
self.strategy = strategy self.strategy = strategy
@@ -138,3 +139,29 @@ class RandomEngine(Engine):
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult: def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
move = random.choice(list(board.legal_moves)) move = random.choice(list(board.legal_moves))
return chess.engine.PlayResult(move=move, ponder=None) return chess.engine.PlayResult(move=move, ponder=None)
class StockFishEngine(Engine):
def __init__(self, board: chess.Board, color: chess, path="../stockfish/stockfish-ubuntu-x86-64-avx2"):
super().__init__(board, color, None)
self.stockfish = chess.engine.SimpleEngine.popen_uci(path)
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
return self.stockfish.play(board, limit)
@staticmethod
def get_name() -> str:
return "Stockfish"
class Lc0Engine(Engine):
def __init__(self, board: chess.Board, color: chess, path="../lc0/lc0"):
super().__init__(board, color, None)
self.lc0 = chess.engine.SimpleEngine.popen_uci(path)
def play(self, board: chess.Board, limit: Limit) -> chess.engine.PlayResult:
return self.lc0.play(board, limit)
@staticmethod
def get_name() -> str:
return "Lc0"

76
chesspp/engine_factory.py Normal file
View File

@@ -0,0 +1,76 @@
from enum import Enum
from chesspp.engine import *
from chesspp.lc0_strategy import Lc0Strategy
from chesspp.random_strategy import RandomStrategy
from chesspp.stockfish_strategy import StockFishStrategy
from chesspp.i_strategy import IStrategy
import chess
class EngineEnum(Enum):
ClassicMcts = 0
BayesianMcts = 1
Stockfish = 2
Lc0 = 3
Random = 4
class StrategyEnum(Enum):
Stockfish = 0
Lc0 = 1
Random = 2
class EngineFactory:
@staticmethod
def create_engine(engine_name: EngineEnum, strategy_name: StrategyEnum, color: chess.Color, stockfish_path: str, lc0_path: str, rollout_depth: int = 4) -> Engine:
match strategy_name:
case StrategyEnum.Stockfish:
strategy = EngineFactory._get_stockfish_strategy(stockfish_path, rollout_depth)
case StrategyEnum.Lc0:
strategy = EngineFactory._get_lc0_strategy(lc0_path, rollout_depth)
case StrategyEnum.Random:
strategy = EngineFactory._get_random_strategy(rollout_depth)
match engine_name:
case EngineEnum.ClassicMcts:
return EngineFactory.classic_mcts(color, strategy)
case EngineEnum.BayesianMcts:
return EngineFactory.bayesian_mcts(color, strategy)
case EngineEnum.Stockfish:
return EngineFactory.stockfish_engine(color, stockfish_path)
case EngineEnum.Lc0:
return EngineFactory.lc0_engine(color, lc0_path)
@staticmethod
def stockfish_engine(color: chess.Color, engine_path: str, board: chess.Board | None = chess.Board()) -> Engine:
return StockFishEngine(board, color, engine_path)
@staticmethod
def lc0_engine(color: chess.Color, engine_path: str, board: chess.Board | None = chess.Board()) -> Engine:
return Lc0Engine(board, color, engine_path)
@staticmethod
def bayesian_mcts(color: chess.Color, strategy: IStrategy, board: chess.Board | None = chess.Board()) -> Engine:
return BayesMctsEngine(board, color, strategy)
@staticmethod
def classic_mcts(color: chess.Color, strategy: IStrategy, board: chess.Board | None = chess.Board()) -> Engine:
return ClassicMctsEngine(board, color, strategy)
@staticmethod
def _get_random_strategy(rollout_depth: int) -> IStrategy:
return RandomStrategy(random.Random(), rollout_depth)
@staticmethod
def _get_stockfish_strategy(engine_path: str, rollout_depth: int) -> IStrategy:
return StockFishStrategy(engine_path, rollout_depth)
@staticmethod
def _get_lc0_strategy(engine_path: str, rollout_depth: int) -> IStrategy:
return Lc0Strategy(engine_path, rollout_depth)

View File

@@ -173,30 +173,38 @@ def score_manual(board: chess.Board) -> int:
return score return score
def score_stockfish(board: chess.Board, stockfish: chess.engine.SimpleEngine | None = None) -> int: def score_stockfish(board: chess.Board, stockfish: chess.engine.SimpleEngine | None = None,
limit: chess.engine.Limit = chess.engine.Limit(depth=0)) -> int:
""" """
Calculate the score of the given board using stockfish Calculate the score of the given board using stockfish
:param board: :param board:
:param stockfish:
:param limit:
:return: :return:
""" """
if stockfish is None: if stockfish is None:
engine = chess.engine.SimpleEngine.popen_uci( engine = chess.engine.SimpleEngine.popen_uci(
"/home/luke/projects/pp-project/chess-engine-pp/stockfish/stockfish-ubuntu-x86-64-avx2") "/home/luke/projects/pp-project/chess-engine-pp/stockfish/stockfish-ubuntu-x86-64-avx2")
info = engine.analyse(board, chess.engine.Limit(depth=0)) info = engine.analyse(board, limit)
engine.quit() engine.quit()
return info['score'].white().score(mate_score=100_000) return info['score'].white().score(mate_score=100_000)
else: else:
info = stockfish.analyse(board, chess.engine.Limit(depth=0)) info = stockfish.analyse(board, limit)
return info['score'].white().score(mate_score=100_000) return info['score'].white().score(mate_score=100_000)
def score_lc0(board: chess.Board) -> chess.engine.PovScore: def score_lc0(board: chess.Board, lc0: chess.engine.SimpleEngine | None = None,
limit: chess.engine.Limit= chess.engine.Limit(depth=0)) -> int:
""" """
Calculate the score of the given board using lc0 Calculate the score of the given board using lc0
:param board: :param board:
:return: :return:
""" """
if lc0 is None:
engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/lc0/lc0") engine = chess.engine.SimpleEngine.popen_uci("/home/luke/projects/pp-project/chess-engine-pp/lc0/lc0")
info = engine.analyse(board, chess.engine.Limit(depth=4)) info = engine.analyse(board, limit)
engine.quit() engine.quit()
return info["score"] return info["score"]
else:
info = lc0.analyse(board, limit)
return info['score'].white().score(mate_score=100_000)

View File

@@ -3,8 +3,11 @@ from abc import ABC, abstractmethod
import chess import chess
# TODO extend class
class IStrategy(ABC): class IStrategy(ABC):
rollout_depth: int
def __init__(self, rollout_depth: int = 4):
self.rollout_depth = rollout_depth
@abstractmethod @abstractmethod
def pick_next_move(self, board: chess.Board) -> chess.Move: def pick_next_move(self, board: chess.Board) -> chess.Move:

39
chesspp/lc0_strategy.py Normal file
View File

@@ -0,0 +1,39 @@
import chess
import chess.engine
import os
from chesspp.i_strategy import IStrategy
from chesspp.eval import score_lc0
_DIR = os.path.abspath(os.path.dirname(__file__))
class Lc0Strategy(IStrategy):
def __init__(self, path="../lc0/lc0", rollout_depth: int = 4,
limit: chess.engine.Limit = chess.engine.Limit(depth=4)):
super().__init__(rollout_depth)
self._lc0 = None
self.path = path
self.limit = limit
def __del__(self):
if self._lc0 is not None:
self._lc0.quit()
@property
def lc0(self) -> chess.engine.SimpleEngine:
if self._lc0 is None:
self._lc0 = self.lc0 = chess.engine.SimpleEngine.popen_uci(self.path)
return self._lc0
@lc0.setter
def lc0(self, value):
self._lc0 = value
def pick_next_move(self, board: chess.Board) -> chess.Move | None:
return self.lc0.play(board, self.limit).move
def analyze_board(self, board: chess.Board) -> int:
score = score_lc0(board, self.lc0)
print("lc0 score", score)
return score

View File

@@ -5,7 +5,8 @@ from chesspp.eval import score_manual
class RandomStrategy(IStrategy): class RandomStrategy(IStrategy):
def __init__(self, random_state: random.Random): def __init__(self, random_state: random.Random, rollout_depth: int = 4):
super().__init__(rollout_depth)
self.random_state = random_state self.random_state = random_state
def pick_next_move(self, board: chess.Board) -> chess.Move | None: def pick_next_move(self, board: chess.Board) -> chess.Move | None:

View File

@@ -5,8 +5,8 @@ import chess.pgn
from typing import Tuple, List from typing import Tuple, List
from enum import Enum from enum import Enum
from dataclasses import dataclass from dataclasses import dataclass
from chesspp.i_strategy import IStrategy
from chesspp.engine_factory import StrategyEnum, EngineFactory, EngineEnum
from chesspp.engine import Engine, Limit from chesspp.engine import Engine, Limit
@@ -36,32 +36,39 @@ def simulate_game(white: Engine, black: Engine, limit: Limit, board: chess.Board
class Evaluation: class Evaluation:
def __init__(self, engine_a: Engine.__class__, strategy_a, engine_b: Engine.__class__, strategy_b, limit: Limit): def __init__(self, engine_a: EngineEnum, strategy_a, engine_b: EngineEnum, strategy_b, limit: Limit,
stockfish_path: str, lc0_path: str):
self.engine_a = engine_a self.engine_a = engine_a
self.strategy_a = strategy_a self.strategy_a = strategy_a
self.engine_b = engine_b self.engine_b = engine_b
self.strategy_b = strategy_b self.strategy_b = strategy_b
self.stockfish_path = stockfish_path
self.lc0_path = lc0_path
self.limit = limit self.limit = limit
def run(self, n_games=100, proc=mp.cpu_count()) -> List[EvaluationResult]: def run(self, n_games=100, proc=mp.cpu_count()) -> List[EvaluationResult]:
proc = min(proc, mp.cpu_count()) proc = min(proc, mp.cpu_count())
with mp.Pool(proc) as pool: with mp.Pool(proc) as pool:
args = [(self.engine_a, self.strategy_a, self.engine_b, self.strategy_b, self.limit) for i in range(n_games)] args = [(self.engine_a, self.strategy_a, self.engine_b, self.strategy_b, self.limit, self.stockfish_path, self.lc0_path) for i
in
range(n_games)]
return pool.map(Evaluation._test_simulate, args) return pool.map(Evaluation._test_simulate, args)
@staticmethod @staticmethod
def _test_simulate(arg: Tuple[Engine.__class__, IStrategy, Engine.__class__, IStrategy, Limit]) -> EvaluationResult: def _test_simulate(arg: Tuple[EngineEnum, StrategyEnum, EngineEnum, StrategyEnum, Limit, str, str]) -> EvaluationResult:
engine_a, strategy_a, engine_b, strategy_b, limit = arg engine_a, strategy_a, engine_b, strategy_b, limit, stockfish_path, lc0_path = arg
flip_engines = bool(random.getrandbits(1)) flip_engines = bool(random.getrandbits(1))
board = chess.Board()
if flip_engines: if flip_engines:
black, white = engine_a(board.copy(), chess.BLACK, strategy_a), engine_b(board.copy(), chess.WHITE, strategy_b) black, white = EngineFactory.create_engine(engine_a, strategy_a, chess.BLACK,
stockfish_path, lc0_path), EngineFactory.create_engine(
engine_b, strategy_b, chess.WHITE, stockfish_path, lc0_path)
else: else:
white, black = engine_a(board.copy(), chess.WHITE, strategy_a), engine_b(board.copy(), chess.BLACK, strategy_b) white, black = EngineFactory.create_engine(engine_a, strategy_a, chess.WHITE,
stockfish_path, lc0_path), EngineFactory.create_engine(
engine_b, strategy_b, chess.BLACK, stockfish_path, lc0_path)
game = simulate_game(white, black, limit, board) game = simulate_game(white, black, limit, chess.Board())
winner = game.end().board().outcome().winner winner = game.end().board().outcome().winner
result = Winner.Draw result = Winner.Draw

View File

@@ -4,14 +4,15 @@ from chesspp.i_strategy import IStrategy
from chesspp.eval import score_stockfish from chesspp.eval import score_stockfish
import chess.engine import chess.engine
_DIR = os.path.abspath(os.path.dirname(__file__))
class StockFishStrategy(IStrategy): class StockFishStrategy(IStrategy):
def __init__(self, path="../stockfish/stockfish-windows-x86-64-avx2"): def __init__(self, path="../stockfish/stockfish-windows-x86-64-avx2", rollout_depth: int = 4,
limit: chess.engine.Limit = chess.engine.Limit(depth=4)):
super().__init__(rollout_depth)
self._stockfish = None self._stockfish = None
self.path = path self.path = path
self.limit = limit
def __del__(self): def __del__(self):
if self._stockfish is not None: if self._stockfish is not None:
@@ -20,8 +21,7 @@ class StockFishStrategy(IStrategy):
@property @property
def stockfish(self) -> chess.engine.SimpleEngine: def stockfish(self) -> chess.engine.SimpleEngine:
if self._stockfish is None: if self._stockfish is None:
self._stockfish = self.stockfish = chess.engine.SimpleEngine.popen_uci( self._stockfish = self.stockfish = chess.engine.SimpleEngine.popen_uci(self.path)
os.path.join(_DIR, self.path))
return self._stockfish return self._stockfish
@stockfish.setter @stockfish.setter
@@ -29,7 +29,7 @@ class StockFishStrategy(IStrategy):
self._stockfish = stockfish self._stockfish = stockfish
def pick_next_move(self, board: chess.Board) -> chess.Move | None: def pick_next_move(self, board: chess.Board) -> chess.Move | None:
return self.stockfish.play(board, chess.engine.Limit(depth=4)).move return self.stockfish.play(board, self.limit).move
def analyze_board(self, board: chess.Board) -> int: def analyze_board(self, board: chess.Board) -> int:
return score_stockfish(board, self.stockfish) return score_stockfish(board, self.stockfish)

37
main.py
View File

@@ -1,11 +1,11 @@
import random import random
import time import time
import chess import chess
import chess.engine import chess.engine
import chess.pgn import chess.pgn
from chesspp.classic_mcts import ClassicMcts from chesspp.classic_mcts import ClassicMcts
from chesspp.baysian_mcts import BayesianMcts from chesspp.baysian_mcts import BayesianMcts
from chesspp.engine_factory import EngineEnum, StrategyEnum
from chesspp.random_strategy import RandomStrategy from chesspp.random_strategy import RandomStrategy
from chesspp.stockfish_strategy import StockFishStrategy from chesspp.stockfish_strategy import StockFishStrategy
from chesspp import engine from chesspp import engine
@@ -87,26 +87,17 @@ def analyze_results(moves: dict):
def test_evaluation(): def test_evaluation():
a, b, s1, s2, n, limit, stockfish_path, proc = read_arguments() a, b, s1, s2, n, limit, stockfish_path, lc0_path, proc = read_arguments()
limit = engine.Limit(time=limit) limit = engine.Limit(time=limit)
if s1 == StockFishStrategy:
strat1 = StockFishStrategy(stockfish_path)
else:
strat1 = s1()
if s2 == StockFishStrategy: evaluator = simulation.Evaluation(a, s1, b, s2, limit, stockfish_path, lc0_path)
strat2 = StockFishStrategy(stockfish_path)
else:
strat2 = s1()
evaluator = simulation.Evaluation(a, strat1, b, strat2, limit)
results = evaluator.run(n, proc) results = evaluator.run(n, proc)
a_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_A, results))) / len(results) * 100 a_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_A, results))) / len(results) * 100
b_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_B, results))) / len(results) * 100 b_results = len(list(filter(lambda x: x.winner == simulation.Winner.Engine_B, results))) / len(results) * 100
draws = len(list(filter(lambda x: x.winner == simulation.Winner.Draw, results))) / len(results) * 100 draws = len(list(filter(lambda x: x.winner == simulation.Winner.Draw, results))) / len(results) * 100
print(f"Engine {a.get_name()} won {a_results}% of games") print(f"Engine {a} won {a_results}% of games")
print(f"Engine {b.get_name()} won {b_results}% of games") print(f"Engine {b} won {b_results}% of games")
print(f"{draws}% of games resulted in a draw") print(f"{draws}% of games resulted in a draw")
@@ -116,19 +107,24 @@ def read_arguments():
description='Compare two engines by playing multiple games against each other' description='Compare two engines by playing multiple games against each other'
) )
engines = {"ClassicMCTS": engine.ClassicMctsEngine, "BayesianMCTS": engine.BayesMctsEngine, "Random": engine.RandomEngine} engines = {"ClassicMCTS": EngineEnum.ClassicMcts, "BayesianMCTS": EngineEnum.BayesianMcts,
strategies = {"Random": RandomStrategy, "Stockfish": StockFishStrategy} "Random": EngineEnum.Random, "Stockfish": EngineEnum.Stockfish, "Lc0": EngineEnum.Lc0}
strategies = {"Random": StrategyEnum.Random, "Stockfish": StrategyEnum.Stockfish, "Lc0": StrategyEnum.Lc0}
if os.name == 'nt': if os.name == 'nt':
stockfish_default = "../stockfish/stockfish-windows-x86-64-avx2" stockfish_default = "../stockfish/stockfish-windows-x86-64-avx2"
lc0_default = "../lc0/lc0.exe"
else: else:
stockfish_default = "../stockfish/stockfish-ubuntu-x86-64-avx2" stockfish_default = "../stockfish/stockfish-ubuntu-x86-64-avx2"
lc0_default = "../lc0/lc0"
parser.add_argument("--proc", default=2, help="Number of processors to use for simulation, default=1") parser.add_argument("--proc", default=2, help="Number of processors to use for simulation, default=1")
parser.add_argument("--time", default=0.5, help="Time limit for each simulation step, default=0.5") parser.add_argument("--time", default=0.5, help="Time limit for each simulation step, default=0.5")
parser.add_argument("-n", default=100, help="Number of games to simulate, default=100") parser.add_argument("-n", default=100, help="Number of games to simulate, default=100")
parser.add_argument("--stockfish", default=stockfish_default, parser.add_argument("--stockfish_path", default=stockfish_default,
help=f"Path for stockfish executable, default='{stockfish_default}'") help=f"Path for engine executable, default='{stockfish_default}'")
parser.add_argument("--lc0_path", default=lc0_default,
help=f"Path for engine executable, default='{stockfish_default}'")
parser.add_argument("--engine1", "--e1", help="Engine A for the simulation", choices=engines.keys(), required=True) parser.add_argument("--engine1", "--e1", help="Engine A for the simulation", choices=engines.keys(), required=True)
parser.add_argument("--engine2", "--e2", help="Engine B for the simulation", choices=engines.keys(), required=True) parser.add_argument("--engine2", "--e2", help="Engine B for the simulation", choices=engines.keys(), required=True)
parser.add_argument("--strategy1", "--s1", default=list(strategies.keys())[0], parser.add_argument("--strategy1", "--s1", default=list(strategies.keys())[0],
@@ -136,7 +132,7 @@ def read_arguments():
choices=strategies.keys()) choices=strategies.keys())
parser.add_argument("--strategy2", "--s2", default=list(strategies.keys())[0], parser.add_argument("--strategy2", "--s2", default=list(strategies.keys())[0],
help="Strategy for engine B for the rollout", help="Strategy for engine B for the rollout",
choices=strategies) choices=strategies.keys())
args = parser.parse_args() args = parser.parse_args()
engine1 = engines[args.engine1] engine1 = engines[args.engine1]
@@ -144,7 +140,8 @@ def read_arguments():
strategy1 = strategies[args.strategy1] strategy1 = strategies[args.strategy1]
strategy2 = strategies[args.strategy2] strategy2 = strategies[args.strategy2]
return engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish, int(args.proc) print(engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish_path, args.lc0_path, int(args.proc))
return engine1, engine2, strategy1, strategy2, int(args.n), float(args.time), args.stockfish_path, args.lc0_path, int(args.proc)
def main(): def main():