Merge branch 'mcts' into 'main'
Mcts See merge request tu-wien/prob-prog!1
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
/stockfish/
|
||||
.idea
|
||||
.idea
|
||||
.venv
|
||||
BIN
__pycache__/engine.cpython-310.pyc
Normal file
BIN
__pycache__/engine.cpython-310.pyc
Normal file
Binary file not shown.
BIN
__pycache__/eval.cpython-310.pyc
Normal file
BIN
__pycache__/eval.cpython-310.pyc
Normal file
Binary file not shown.
BIN
__pycache__/mcts.cpython-310.pyc
Normal file
BIN
__pycache__/mcts.cpython-310.pyc
Normal file
Binary file not shown.
73
engine.py
73
engine.py
@@ -2,30 +2,8 @@ import chess
|
||||
import chess.engine
|
||||
import random
|
||||
import eval
|
||||
|
||||
|
||||
def main():
|
||||
fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
|
||||
board = chess.Board(fools_mate)
|
||||
print(board, '\n')
|
||||
moves = {}
|
||||
for i in range(10):
|
||||
move = pick_move(board)
|
||||
if move is None:
|
||||
break
|
||||
|
||||
simulate_game(board, move, 100)
|
||||
moves[move] = board
|
||||
board = chess.Board(fools_mate)
|
||||
|
||||
analyze_results(moves)
|
||||
|
||||
|
||||
def analyze_results(moves: dict):
|
||||
for m, b in moves.items():
|
||||
manual_score = eval.score_game(b)
|
||||
engine_score = eval.analyze_with_stockfish(b)
|
||||
print(f"score for move {m}: manual_score={manual_score}, engine_score={engine_score}")
|
||||
import numpy as np
|
||||
from stockfish import Stockfish
|
||||
|
||||
|
||||
def pick_move(board: chess.Board) -> chess.Move | None:
|
||||
@@ -49,19 +27,54 @@ def simulate_game(board: chess.Board, move: chess.Move, depth: int):
|
||||
"""
|
||||
engine = chess.engine.SimpleEngine.popen_uci("./stockfish/stockfish-ubuntu-x86-64-avx2")
|
||||
board.push(move)
|
||||
print(move)
|
||||
print(board, '\n')
|
||||
for i in range(depth):
|
||||
if board.is_game_over():
|
||||
engine.quit()
|
||||
return
|
||||
r = engine.play(board, chess.engine.Limit(depth=2))
|
||||
print(r)
|
||||
board.push(r.move)
|
||||
print(board, '\n')
|
||||
|
||||
engine.quit()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
def simulate_stockfish_prob(board: chess.Board, move: chess.Move, games: int = 10, depth: int = 10) -> (float, float):
|
||||
"""
|
||||
Simulate a game using
|
||||
:param board: chess board
|
||||
:param move: chosen move
|
||||
:param games: number of games that should be simulated after playing the move
|
||||
:param depth: simulation depth per game
|
||||
:return:
|
||||
"""
|
||||
board.push(move)
|
||||
copied_board = board.copy()
|
||||
scores = []
|
||||
|
||||
stockfish = Stockfish("./stockfish/stockfish-ubuntu-x86-64-avx2", depth=2, parameters={"Threads": 8, "Hash": 2048})
|
||||
stockfish.set_elo_rating(1200)
|
||||
stockfish.set_fen_position(board.fen())
|
||||
|
||||
def reset_game():
|
||||
nonlocal scores, copied_board, board
|
||||
score = eval.score_stockfish(copied_board).white().score(mate_score=100_000)
|
||||
scores.append(score)
|
||||
copied_board = board.copy()
|
||||
stockfish.set_fen_position(board.fen())
|
||||
|
||||
for _ in range(games):
|
||||
for d in range(depth):
|
||||
if copied_board.is_game_over() or d == depth - 1:
|
||||
reset_game()
|
||||
break
|
||||
|
||||
if d == depth - 1:
|
||||
reset_game()
|
||||
|
||||
top_moves = stockfish.get_top_moves(3)
|
||||
chosen_move = random.choice(top_moves)['Move']
|
||||
stockfish.make_moves_from_current_position([chosen_move])
|
||||
copied_board.push(chess.Move.from_uci(chosen_move))
|
||||
|
||||
print(scores)
|
||||
# TODO: return distribution here?
|
||||
return np.array(scores).mean(), np.array(scores).std()
|
||||
|
||||
11
eval.py
11
eval.py
@@ -1,5 +1,6 @@
|
||||
import chess
|
||||
import chess.engine
|
||||
import sys
|
||||
|
||||
# Eval constants for scoring chess boards
|
||||
# Evaluation metric inspired by Tomasz Michniewski: https://www.chessprogramming.org/Simplified_Evaluation_Function
|
||||
@@ -136,9 +137,7 @@ def check_endgame(board: chess.Board) -> bool:
|
||||
return (queens_black == 0 and queens_white == 0) or ((queens_black >= 1 and minors_black <= 1) or (queens_white >= 1 and minors_white <= 1))
|
||||
|
||||
|
||||
|
||||
|
||||
def score_game(board: chess.Board) -> float:
|
||||
def score_manual(board: chess.Board) -> int:
|
||||
"""
|
||||
Calculate the score of the given board regarding the given color
|
||||
:param board: the chess board
|
||||
@@ -147,7 +146,7 @@ def score_game(board: chess.Board) -> float:
|
||||
outcome = board.outcome()
|
||||
if outcome is not None:
|
||||
if outcome.termination == chess.Termination.CHECKMATE:
|
||||
return float('inf') if outcome.winner == chess.WHITE else float('-inf')
|
||||
return sys.maxsize if outcome.winner == chess.WHITE else -sys.maxsize
|
||||
else: # draw
|
||||
return 0
|
||||
|
||||
@@ -171,13 +170,13 @@ def score_game(board: chess.Board) -> float:
|
||||
return score
|
||||
|
||||
|
||||
def analyze_with_stockfish(board: chess.Board) -> chess.engine.PovScore:
|
||||
def score_stockfish(board: chess.Board) -> chess.engine.PovScore:
|
||||
"""
|
||||
Calculate the score of the given board using stockfish
|
||||
:param board:
|
||||
:return:
|
||||
"""
|
||||
engine = chess.engine.SimpleEngine.popen_uci("./stockfish/stockfish-ubuntu-x86-64-avx2")
|
||||
info = engine.analyse(board, chess.engine.Limit(depth=20))
|
||||
info = engine.analyse(board, chess.engine.Limit(depth=2))
|
||||
engine.quit()
|
||||
return info["score"]
|
||||
|
||||
61
main.py
Normal file
61
main.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import chess
|
||||
import chess.engine
|
||||
from mcts import MCTSNode
|
||||
import engine
|
||||
import eval
|
||||
|
||||
|
||||
def test_mcts():
|
||||
fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
|
||||
board = chess.Board(fools_mate)
|
||||
mcts_root = MCTSNode(board)
|
||||
mcts_root.build_tree()
|
||||
sorted_moves = sorted(mcts_root.children, key=lambda x: x.move.uci())
|
||||
for c in sorted_moves:
|
||||
print("move (mcts):", c.move, " with score:", c.score)
|
||||
|
||||
|
||||
def test_stockfish():
|
||||
fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
|
||||
board = chess.Board(fools_mate)
|
||||
moves = {}
|
||||
untried_moves = list(board.legal_moves)
|
||||
for move in untried_moves:
|
||||
engine.simulate_game(board, move, 100)
|
||||
moves[move] = board
|
||||
board = chess.Board(fools_mate)
|
||||
|
||||
sorted_moves = dict(sorted(moves.items(), key=lambda x: x[0].uci()))
|
||||
analyze_results(sorted_moves)
|
||||
|
||||
|
||||
def test_stockfish_prob():
|
||||
fools_mate = "rnbqkbnr/pppp1ppp/4p3/8/5PP1/8/PPPPP2P/RNBQKBNR b KQkq f3 0 2"
|
||||
board = chess.Board(fools_mate)
|
||||
moves = {}
|
||||
untried_moves = list(board.legal_moves)
|
||||
for move in untried_moves:
|
||||
mean, std = engine.simulate_stockfish_prob(board, move, 10, 4)
|
||||
moves[move] = (mean, std)
|
||||
board = chess.Board(fools_mate)
|
||||
|
||||
sorted_moves = dict(sorted(moves.items(), key=lambda x: x[0].uci()))
|
||||
for m, s in sorted_moves.items():
|
||||
print(f"move '{m.uci()}' (prob_stockfish): mean={s[0]}, std={s[1]}")
|
||||
|
||||
|
||||
def analyze_results(moves: dict):
|
||||
for m, b in moves.items():
|
||||
manual_score = eval.score_manual(b)
|
||||
engine_score = eval.score_stockfish(b).white()
|
||||
print(f"score for move {m}: manual_score={manual_score}, engine_score={engine_score}")
|
||||
|
||||
|
||||
def main():
|
||||
test_mcts()
|
||||
test_stockfish()
|
||||
test_stockfish_prob()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
105
mcts.py
Normal file
105
mcts.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import chess
|
||||
import random
|
||||
import eval
|
||||
import engine
|
||||
import numpy as np
|
||||
|
||||
|
||||
class MCTSNode:
|
||||
def __init__(self, board: chess.Board, parent = None, move: chess.Move | None = None, random_state: int | None = None):
|
||||
self.random = random.Random(random_state)
|
||||
self.board = board
|
||||
self.parent = parent
|
||||
self.move = move
|
||||
self.children = []
|
||||
self.visits = 0
|
||||
self.legal_moves = list(board.legal_moves)
|
||||
self.untried_actions = self.legal_moves
|
||||
self.score = 0
|
||||
|
||||
def _expand(self) -> 'MCTSNode':
|
||||
"""
|
||||
Expands the node, i.e., choose an action and apply it to the board
|
||||
:return:
|
||||
"""
|
||||
move = self.random.choice(self.untried_actions)
|
||||
self.untried_actions.remove(move)
|
||||
next_board = self.board.copy()
|
||||
next_board.push(move)
|
||||
child_node = MCTSNode(next_board, parent=self, move=move)
|
||||
self.children.append(child_node)
|
||||
return child_node
|
||||
|
||||
def _rollout(self, rollout_depth: int = 20) -> int:
|
||||
"""
|
||||
Rolls out the node by simulating a game for a given depth.
|
||||
Sometimes this step is called 'simulation' or 'playout'.
|
||||
:return: the score of the rolled out game
|
||||
"""
|
||||
copied_board = self.board.copy()
|
||||
steps = 1
|
||||
for i in range(rollout_depth):
|
||||
if copied_board.is_game_over():
|
||||
break
|
||||
|
||||
m = engine.pick_move(copied_board)
|
||||
copied_board.push(m)
|
||||
steps += 1
|
||||
|
||||
return eval.score_manual(copied_board) // steps
|
||||
|
||||
def _backpropagate(self, score: float) -> None:
|
||||
"""
|
||||
Backpropagates the results of the rollout
|
||||
:param score:
|
||||
:return:
|
||||
"""
|
||||
self.visits += 1
|
||||
# TODO: maybe use score + num of moves together (a win in 1 move is better than a win in 20 moves)
|
||||
self.score += score
|
||||
if self.parent:
|
||||
self.parent._backpropagate(score)
|
||||
|
||||
def is_fully_expanded(self) -> bool:
|
||||
return len(self.untried_actions) == 0
|
||||
|
||||
def _best_child(self) -> 'MCTSNode':
|
||||
"""
|
||||
Picks the best child according to our policy
|
||||
:return: the best child
|
||||
"""
|
||||
# NOTE: maybe clamp the score between [-1, +1] instead of [-inf, +inf]
|
||||
choices_weights = [(c.score / c.visits) + np.sqrt(((2 * np.log(self.visits)) / c.visits))
|
||||
for c in self.children]
|
||||
return self.children[np.argmax(choices_weights)]
|
||||
|
||||
def _select_leaf(self) -> 'MCTSNode':
|
||||
"""
|
||||
Selects a leaf node.
|
||||
If the node is not expanded is will be expanded.
|
||||
:return: Leaf node
|
||||
"""
|
||||
current_node = self
|
||||
while not current_node.board.is_game_over():
|
||||
if not current_node.is_fully_expanded():
|
||||
return current_node._expand()
|
||||
else:
|
||||
current_node = current_node._best_child()
|
||||
|
||||
return current_node
|
||||
|
||||
def build_tree(self, samples: int = 1000) -> 'MCTSNode':
|
||||
"""
|
||||
Runs the MCTS with the given number of samples
|
||||
:param samples: number of simulations
|
||||
:return: best node containing the best move
|
||||
"""
|
||||
for i in range(samples):
|
||||
# selection & expansion
|
||||
# rollout
|
||||
# backpropagate score
|
||||
node = self._select_leaf()
|
||||
score = node._rollout()
|
||||
node._backpropagate(score)
|
||||
|
||||
return self._best_child()
|
||||
@@ -1 +1,3 @@
|
||||
chess==1.10.0
|
||||
chess==1.10.0
|
||||
numpy==1.26.3
|
||||
stockfish==3.28.0
|
||||
Reference in New Issue
Block a user