diff --git a/.gitignore b/.gitignore index a110392..d06d1d7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dist build docs/site/* +.venv/ diff --git a/pypokerengine/api/game.py b/pypokerengine/api/game.py index 54c3769..e2bf6ce 100644 --- a/pypokerengine/api/game.py +++ b/pypokerengine/api/game.py @@ -4,11 +4,13 @@ def setup_config(max_round, initial_stack, small_blind_amount, ante=0): return Config(max_round, initial_stack, small_blind_amount, ante) -def start_poker(config, verbose=2): +def start_poker(config, verbose=2, hand_history_writer=None): config.validation() dealer = Dealer(config.sb_amount, config.initial_stack, config.ante) dealer.set_verbose(verbose) dealer.set_blind_structure(config.blind_structure) + if hand_history_writer: + dealer.set_hand_history_writer(hand_history_writer) for info in config.players_info: dealer.register_player(info["name"], info["algorithm"]) result_message = dealer.start_game(config.max_round) @@ -46,5 +48,4 @@ def validation(self): if player_num < 2: detail_msg = "no player is registered yet" if player_num==0 else "you registered only 1 player" base_msg = "At least 2 players are needed to start the game" - raise Exception("%s (but %s.)" % (base_msg, detail_msg)) - + raise Exception("%s (but %s.)" % (base_msg, detail_msg)) \ No newline at end of file diff --git a/pypokerengine/engine/action_checker.py b/pypokerengine/engine/action_checker.py index 5968464..39f1fc6 100644 --- a/pypokerengine/engine/action_checker.py +++ b/pypokerengine/engine/action_checker.py @@ -37,7 +37,12 @@ def legal_actions(self, players, player_pos, sb_amount): min_raise = self.__min_raise_amount(players, sb_amount) max_raise = players[player_pos].stack + players[player_pos].paid_sum() if max_raise < min_raise: - min_raise = max_raise = -1 + if max_raise > self.agree_amount(players): + # Player can't meet min raise but has more than call amount → allow all-in + min_raise = max_raise + else: + # Player can't even cover the call → raise not possible + min_raise = max_raise = -1 return [ { "action" : "fold" , "amount" : 0 }, { "action" : "call" , "amount" : self.agree_amount(players) }, diff --git a/pypokerengine/engine/dealer.py b/pypokerengine/engine/dealer.py index 524ebb4..c2a63f2 100644 --- a/pypokerengine/engine/dealer.py +++ b/pypokerengine/engine/dealer.py @@ -18,6 +18,10 @@ def __init__(self, small_blind_amount=None, initial_stack=None, ante=None): self.message_summarizer = MessageSummarizer(verbose=0) self.table = Table() self.blind_structure = {} + self.hand_history_writer = None + + def set_hand_history_writer(self, writer): + self.hand_history_writer = writer def register_player(self, player_name, algorithm): self.__config_check() @@ -42,6 +46,8 @@ def start_game(self, max_round): def play_round(self, round_count, blind_amount, ante, table): state, msgs = RoundManager.start_new_round(round_count, blind_amount, ante, table) + if self.hand_history_writer: + self.__notify_writer_round_start(round_count, state) while True: self.__message_check(msgs, state["street"]) if state["street"] != Const.Street.FINISHED: # continue the round @@ -85,6 +91,9 @@ def __notify_game_start(self, max_round): start_msg = MessageBuilder.build_game_start_message(config, self.table.seats) self.message_handler.process_message(-1, start_msg) self.message_summarizer.summarize(start_msg) + if self.hand_history_writer: + game_info = start_msg["message"]["game_information"] + self.hand_history_writer.on_game_start(game_info) def __is_game_finished(self, table): return len([player for player in table.seats.players if player.is_active()]) == 1 @@ -99,8 +108,53 @@ def __message_check(self, msgs, street): def __publish_messages(self, msgs): for address, msg in msgs[:-1]: self.message_handler.process_message(address, msg) + if self.hand_history_writer: + self.__notify_writer_message(msg) self.message_summarizer.summarize_messages(msgs) - return self.message_handler.process_message(*msgs[-1]) + result = self.message_handler.process_message(*msgs[-1]) + if self.hand_history_writer: + self.__notify_writer_message(msgs[-1][1]) + return result + + def __notify_writer_round_start(self, round_count, state): + players = state["table"].seats.players + hole_cards_by_uuid = { + p.uuid: [str(c) for c in p.hole_card] + for p in players if p.hole_card + } + from pypokerengine.engine.data_encoder import DataEncoder + seats = DataEncoder.encode_seats(state["table"].seats)["seats"] + self.hand_history_writer.on_round_start( + round_count=round_count, + dealer_btn=state["table"].dealer_btn, + hole_cards_by_uuid=hole_cards_by_uuid, + seats=seats, + ) + + def __notify_writer_message(self, msg): + content = msg["message"] + message_type = content["message_type"] + if message_type == MessageBuilder.STREET_START_MESSAGE: + rs = content["round_state"] + self.hand_history_writer.on_street_start( + street=content["street"], + community_card=rs["community_card"], + ) + elif message_type == MessageBuilder.GAME_UPDATE_MESSAGE: + action = content["action"] + rs = content["round_state"] + self.hand_history_writer.on_action( + actor_uuid=action["player_uuid"], + action=action["action"].lower(), + amount=action["amount"], + seats=rs["seats"], + ) + elif message_type == MessageBuilder.ROUND_RESULT_MESSAGE: + self.hand_history_writer.on_round_result( + winners=content["winners"], + hand_info=content["hand_info"], + round_state=content["round_state"], + ) def __exclude_short_of_money_players(self, table, ante, sb_amount): sb_pos, bb_pos = self.__steal_money_from_poor_player(table, ante, sb_amount) @@ -271,5 +325,4 @@ def summarize_game_result(self, message): def summairze_blind_level_update(self, round_count, old_ante, new_ante, old_sb_amount, new_sb_amount): base = 'Blind level update at round-%d : Ante %s -> %s, SmallBlind %s -> %s' - return base % (round_count, old_ante, new_ante, old_sb_amount, new_sb_amount) - + return base % (round_count, old_ante, new_ante, old_sb_amount, new_sb_amount) \ No newline at end of file diff --git a/pypokerengine/engine/hand_evaluator.py b/pypokerengine/engine/hand_evaluator.py index 8da0955..2a15983 100644 --- a/pypokerengine/engine/hand_evaluator.py +++ b/pypokerengine/engine/hand_evaluator.py @@ -1,260 +1,236 @@ -from functools import reduce from itertools import groupby +from functools import reduce +from treys import Card as TreysCard, Evaluator + class HandEvaluator: - HIGHCARD = 0 - ONEPAIR = 1 << 8 - TWOPAIR = 1 << 9 - THREECARD = 1 << 10 - STRAIGHT = 1 << 11 - FLASH = 1 << 12 - FULLHOUSE = 1 << 13 - FOURCARD = 1 << 14 - STRAIGHTFLASH = 1 << 15 - - HAND_STRENGTH_MAP = { - HIGHCARD: "HIGHCARD", - ONEPAIR: "ONEPAIR", - TWOPAIR: "TWOPAIR", - THREECARD: "THREECARD", - STRAIGHT: "STRAIGHT", - FLASH: "FLASH", - FULLHOUSE: "FULLHOUSE", - FOURCARD: "FOURCARD", - STRAIGHTFLASH: "STRAIGHTFLASH" - } - - @classmethod - def gen_hand_rank_info(self, hole, community): - hand = self.eval_hand(hole, community) - row_strength = self.__mask_hand_strength(hand) - strength = self.HAND_STRENGTH_MAP[row_strength] - hand_high = self.__mask_hand_high_rank(hand) - hand_low = self.__mask_hand_low_rank(hand) - hole_high = self.__mask_hole_high_rank(hand) - hole_low = self.__mask_hole_low_rank(hand) - - return { - "hand" : { - "strength" : strength, - "high" : hand_high, - "low" : hand_low - }, - "hole" : { - "high" : hole_high, - "low" : hole_low - } + HIGHCARD = 0 + ONEPAIR = 1 << 8 + TWOPAIR = 1 << 9 + THREECARD = 1 << 10 + STRAIGHT = 1 << 11 + FLASH = 1 << 12 + FULLHOUSE = 1 << 13 + FOURCARD = 1 << 14 + STRAIGHTFLASH = 1 << 15 + + HAND_STRENGTH_MAP = { + HIGHCARD: "HIGHCARD", + ONEPAIR: "ONEPAIR", + TWOPAIR: "TWOPAIR", + THREECARD: "THREECARD", + STRAIGHT: "STRAIGHT", + FLASH: "FLASH", + FULLHOUSE: "FULLHOUSE", + FOURCARD: "FOURCARD", + STRAIGHTFLASH: "STRAIGHTFLASH", } - @classmethod - def eval_hand(self, hole, community): - ranks = sorted([card.rank for card in hole]) - hole_flg = ranks[1] << 4 | ranks[0] - hand_flg = self.__calc_hand_info_flg(hole, community) << 8 - return hand_flg | hole_flg - - # Return Format - # [Bit flg of hand][rank1(4bit)][rank2(4bit)] - # ex.) - # HighCard hole card 3,4 => 100 0011 - # OnePair of rank 3 => 1 0011 0000 - # TwoPair of rank A, 4 => 10 1110 0100 - # ThreeCard of rank 9 => 100 1001 0000 - # Straight of rank 10 => 1000 1010 0000 - # Flash of rank 5 => 10000 0101 0000 - # FullHouse of rank 3, 4 => 100000 0011 0100 - # FourCard of rank 2 => 1000000 0010 0000 - # straight flash of rank 7 => 10000000 0111 0000 - @classmethod - def __calc_hand_info_flg(self, hole, community): - cards = hole + community - if self.__is_straightflash(cards): return self.STRAIGHTFLASH | self.__eval_straightflash(cards) - if self.__is_fourcard(cards): return self.FOURCARD | self.__eval_fourcard(cards) - if self.__is_fullhouse(cards): return self.FULLHOUSE | self.__eval_fullhouse(cards) - if self.__is_flash(cards): return self.FLASH | self.__eval_flash(cards) - if self.__is_straight(cards): return self.STRAIGHT | self.__eval_straight(cards) - if self.__is_threecard(cards): return self.THREECARD | self.__eval_threecard(cards) - if self.__is_twopair(cards): return self.TWOPAIR | self.__eval_twopair(cards) - if self.__is_onepair(cards): return self.ONEPAIR | (self.__eval_onepair(cards)) - return self.__eval_holecard(hole) - - @classmethod - def __eval_holecard(self, hole): - ranks = sorted([card.rank for card in hole]) - return ranks[1] << 4 | ranks[0] - - @classmethod - def __is_onepair(self, cards): - return self.__eval_onepair(cards) != 0 - - @classmethod - def __eval_onepair(self, cards): - rank = 0 - memo = 0 # bit memo - for card in cards: - mask = 1 << card.rank - if memo & mask != 0: rank = max(rank, card.rank) - memo |= mask - return rank << 4 - - @classmethod - def __is_twopair(self, cards): - return len(self.__search_twopair(cards)) == 2 - - @classmethod - def __eval_twopair(self, cards): - ranks = self.__search_twopair(cards) - return ranks[0] << 4 | ranks[1] - - @classmethod - def __search_twopair(self, cards): - ranks = [] - memo = 0 - for card in cards: - mask = 1 << card.rank - if memo & mask != 0: ranks.append(card.rank) - memo |= mask - return sorted(ranks)[::-1][:2] - - @classmethod - def __is_threecard(self, cards): - return self.__search_threecard(cards) != -1 - - @classmethod - def __eval_threecard(self, cards): - return self.__search_threecard(cards) << 4 - - @classmethod - def __search_threecard(self, cards): - rank = -1 - bit_memo = reduce(lambda memo,card: memo + (1 << (card.rank-1)*3), cards, 0) - for r in range(2, 15): - bit_memo >>= 3 - count = bit_memo & 7 - if count >= 3: rank = r - return rank - - @classmethod - def __is_straight(self, cards): - return self.__search_straight(cards) != -1 - - @classmethod - def __eval_straight(self, cards): - return self.__search_straight(cards) << 4 - - @classmethod - def __search_straight(self, cards): - bit_memo = reduce(lambda memo, card: memo | 1 << card.rank, cards, 0) - rank = -1 - straight_check = lambda acc, i: acc & (bit_memo >> (r+i) & 1) == 1 - for r in range(2, 15): - if reduce(straight_check, range(5), True): rank = r - return rank - - @classmethod - def __is_flash(self, cards): - return self.__search_flash(cards) != -1 - - @classmethod - def __eval_flash(self, cards): - return self.__search_flash(cards) << 4 - - @classmethod - def __search_flash(self, cards): - best_suit_rank = -1 - fetch_suit = lambda card: card.suit - fetch_rank = lambda card: card.rank - for suit, group_obj in groupby(sorted(cards, key=fetch_suit), key=fetch_suit): - g = list(group_obj) - if len(g) >= 5: - max_rank_card = max(g, key=fetch_rank) - best_suit_rank = max(best_suit_rank, max_rank_card.rank) - return best_suit_rank - - @classmethod - def __is_fullhouse(self, cards): - r1, r2 = self.__search_fullhouse(cards) - return r1 and r2 - - @classmethod - def __eval_fullhouse(self, cards): - r1, r2 = self.__search_fullhouse(cards) - return r1 << 4 | r2 - - @classmethod - def __search_fullhouse(self, cards): - fetch_rank = lambda card: card.rank - three_card_ranks, two_pair_ranks = [], [] - for rank, group_obj in groupby(sorted(cards, key=fetch_rank), key=fetch_rank): - g = list(group_obj) - if len(g) >= 3: - three_card_ranks.append(rank) - if len(g) >= 2: - two_pair_ranks.append(rank) - two_pair_ranks = [rank for rank in two_pair_ranks if not rank in three_card_ranks] - if len(three_card_ranks) == 2: - two_pair_ranks.append(min(three_card_ranks)) - max_ = lambda l: None if len(l)==0 else max(l) - return max_(three_card_ranks), max_(two_pair_ranks) - - @classmethod - def __is_fourcard(self, cards): - return self.__eval_fourcard(cards) != 0 - - @classmethod - def __eval_fourcard(self, cards): - rank = self.__search_fourcard(cards) - return rank << 4 - - @classmethod - def __search_fourcard(self, cards): - fetch_rank = lambda card: card.rank - for rank, group_obj in groupby(sorted(cards, key=fetch_rank), key=fetch_rank): - g = list(group_obj) - if len(g) >= 4: - return rank - return 0 - - @classmethod - def __is_straightflash(self, cards): - return self.__search_straightflash(cards) != -1 - - @classmethod - def __eval_straightflash(self, cards): - return self.__search_straightflash(cards) << 4 - - @classmethod - def __search_straightflash(self, cards): - flash_cards = [] - fetch_suit = lambda card: card.suit - for suit, group_obj in groupby(sorted(cards, key=fetch_suit), key=fetch_suit): - g = list(group_obj) - if len(g) >= 5: flash_cards = g - return self.__search_straight(flash_cards) - - @classmethod - def __mask_hand_strength(self, bit): - mask = 511 << 16 - return (bit & mask) >> 8 # 511 = (1 << 9) -1 - - @classmethod - def __mask_hand_high_rank(self, bit): - mask = 15 << 12 - return (bit & mask) >> 12 - - @classmethod - def __mask_hand_low_rank(self, bit): - mask = 15 << 8 - return (bit & mask) >> 8 - - @classmethod - def __mask_hole_high_rank(self, bit): - mask = 15 << 4 - return (bit & mask) >> 4 - - @classmethod - def __mask_hole_low_rank(self, bit): - mask = 15 - return bit & mask + # PyPokerEngine card.rank → treys rank char + _RANK_CHARS = { + 2: '2', 3: '3', 4: '4', 5: '5', 6: '6', 7: '7', + 8: '8', 9: '9', 10: 'T', 11: 'J', 12: 'Q', 13: 'K', 14: 'A', + } + + # PyPokerEngine card.suit (CLUB=2, DIAMOND=4, HEART=8, SPADE=16) → treys suit char + _SUIT_CHARS = {2: 'c', 4: 'd', 8: 'h', 16: 's'} + + # treys Evaluator.get_rank_class() returns 1-9 (1 = best hand) + _TREYS_CLASS_TO_STRENGTH = { + 0: STRAIGHTFLASH, + 1: STRAIGHTFLASH, + 2: FOURCARD, + 3: FULLHOUSE, + 4: FLASH, + 5: STRAIGHT, + 6: THREECARD, + 7: TWOPAIR, + 8: ONEPAIR, + 9: HIGHCARD, + } + + _evaluator = Evaluator() + + # ── Public API ──────────────────────────────────────────────────────────── + + @classmethod + def gen_hand_rank_info(cls, hole, community): + hand = cls.eval_hand(hole, community) + row_strength = cls.__mask_hand_strength(hand) + strength = cls.HAND_STRENGTH_MAP[row_strength] + return { + "hand": { + "strength": strength, + "high": cls.__mask_hand_high_rank(hand), + "low": cls.__mask_hand_low_rank(hand), + }, + "hole": { + "high": cls.__mask_hole_high_rank(hand), + "low": cls.__mask_hole_low_rank(hand), + }, + } + @classmethod + def eval_hand(cls, hole, community): + ranks = sorted(card.rank for card in hole) + hole_flg = ranks[1] << 4 | ranks[0] + hand_flg = cls.__calc_hand_info_flg(hole, community) << 8 + return hand_flg | hole_flg + + # ── Core evaluation (treys decides the hand class) ──────────────────────── + + @classmethod + def __calc_hand_info_flg(cls, hole, community): + all_cards = hole + community + strength = cls.__detect_strength(hole, community) + + if strength == cls.STRAIGHTFLASH: return strength | cls.__eval_straightflash(all_cards) + if strength == cls.FOURCARD: return strength | cls.__eval_fourcard(all_cards) + if strength == cls.FULLHOUSE: return strength | cls.__eval_fullhouse(all_cards) + if strength == cls.FLASH: return strength | cls.__eval_flash(all_cards) + if strength == cls.STRAIGHT: return strength | cls.__eval_straight(all_cards) + if strength == cls.THREECARD: return strength | cls.__eval_threecard(all_cards) + if strength == cls.TWOPAIR: return strength | cls.__eval_twopair(all_cards) + if strength == cls.ONEPAIR: return strength | cls.__eval_onepair(all_cards) + return cls.__eval_holecard(hole) + + @classmethod + def __detect_strength(cls, hole, community): + """Use treys to correctly classify the best 5-card hand from all 7 cards. + Falls back to simple detection when there are fewer than 3 community cards.""" + if len(community) >= 3: + t_hole = [cls.__to_treys(c) for c in hole] + t_comm = [cls.__to_treys(c) for c in community] + t_rank = cls._evaluator.evaluate(t_comm, t_hole) + return cls._TREYS_CLASS_TO_STRENGTH[cls._evaluator.get_rank_class(t_rank)] + # Pre-flop: only hole cards available + return cls.ONEPAIR if hole[0].rank == hole[1].rank else cls.HIGHCARD + + @classmethod + def __to_treys(cls, card): + return TreysCard.new(cls._RANK_CHARS[card.rank] + cls._SUIT_CHARS[card.suit]) + + # ── Rank extractors ─────────────────────────────────────────────────────── + + @classmethod + def __eval_holecard(cls, hole): + ranks = sorted(card.rank for card in hole) + return ranks[1] << 4 | ranks[0] + + @classmethod + def __eval_onepair(cls, cards): + rank, memo = 0, 0 + for card in cards: + mask = 1 << card.rank + if memo & mask: + rank = max(rank, card.rank) + memo |= mask + return rank << 4 + + @classmethod + def __eval_twopair(cls, cards): + pairs, memo = [], 0 + for card in cards: + mask = 1 << card.rank + if memo & mask: + pairs.append(card.rank) + memo |= mask + top2 = sorted(pairs, reverse=True)[:2] + return top2[0] << 4 | top2[1] + + @classmethod + def __eval_threecard(cls, cards): + rank = -1 + bit_memo = reduce(lambda m, c: m + (1 << (c.rank - 1) * 3), cards, 0) + for r in range(2, 15): + bit_memo >>= 3 + if (bit_memo & 7) >= 3: + rank = r + return rank << 4 + + @classmethod + def __eval_straight(cls, cards): + ranks = set(card.rank for card in cards) + if 14 in ranks: # Ace can play low + ranks.add(1) + best = -1 + for low in range(1, 11): # low card 1..10 → high card 5..14 + if all(r in ranks for r in range(low, low + 5)): + best = low + 4 + return best << 4 if best != -1 else 0 + + @classmethod + def __eval_flash(cls, cards): + best = (-1, -1) + fetch_suit = lambda c: c.suit + fetch_rank = lambda c: c.rank + for _suit, grp in groupby(sorted(cards, key=fetch_suit), key=fetch_suit): + g = sorted(grp, key=fetch_rank, reverse=True) + if len(g) >= 5: + candidate = (g[0].rank, g[1].rank) + if candidate > best: + best = candidate + return best[0] << 4 | best[1] + + @classmethod + def __eval_fullhouse(cls, cards): + fetch_rank = lambda c: c.rank + threes, twos = [], [] + for rank, grp in groupby(sorted(cards, key=fetch_rank), key=fetch_rank): + g = list(grp) + if len(g) >= 3: + threes.append(rank) + if len(g) >= 2: + twos.append(rank) + # A second three-of-a-kind contributes as the pair + twos = [r for r in twos if r not in threes] + if len(threes) == 2: + twos.append(min(threes)) + best_three = max(threes) if threes else None + best_two = max(twos) if twos else None + return best_three << 4 | best_two + + @classmethod + def __eval_fourcard(cls, cards): + fetch_rank = lambda c: c.rank + for rank, grp in groupby(sorted(cards, key=fetch_rank), key=fetch_rank): + if len(list(grp)) >= 4: + return rank << 4 + return 0 + + @classmethod + def __eval_straightflash(cls, cards): + """Highest straight flush in any suit; wheel-aware via __eval_straight.""" + fetch_suit = lambda c: c.suit + best = -1 + for _suit, grp in groupby(sorted(cards, key=fetch_suit), key=fetch_suit): + g = list(grp) + if len(g) >= 5: + val = cls.__eval_straight(g) + if val: + best = max(best, val >> 4) + return best << 4 if best != -1 else 0 + + # ── Bit-mask helpers (unchanged) ────────────────────────────────────────── + + @classmethod + def __mask_hand_strength(cls, bit): + mask = 511 << 16 + return (bit & mask) >> 8 + + @classmethod + def __mask_hand_high_rank(cls, bit): + return (bit & (15 << 12)) >> 12 + + @classmethod + def __mask_hand_low_rank(cls, bit): + return (bit & (15 << 8)) >> 8 + + @classmethod + def __mask_hole_high_rank(cls, bit): + return (bit & (15 << 4)) >> 4 + + @classmethod + def __mask_hole_low_rank(cls, bit): + return bit & 15 \ No newline at end of file diff --git a/pypokerengine/engine/hand_history_writer.py b/pypokerengine/engine/hand_history_writer.py new file mode 100644 index 0000000..bd372c1 --- /dev/null +++ b/pypokerengine/engine/hand_history_writer.py @@ -0,0 +1,345 @@ +""" +PokerStars Hand History Writer +============================== +Writes one hand history file per player in PokerStars format, matching +the perspective-based layout of real PokerStars hand histories: each file +shows only that player's hole cards, making the output compatible with +third-party tools such as PokerTracker and HoldemManager. + +Usage: + from pypokerengine.api.game import setup_config, start_poker + from pypokerengine.engine.hand_history_writer import PokerStarsHandHistoryWriter + + # output_file must contain {player} — one file is created per player. + writer = PokerStarsHandHistoryWriter("histories/{player}.txt") + + config = setup_config(max_round=100, initial_stack=10000, small_blind_amount=1) + config.register_player(name="p1", algorithm=MyBot()) + config.register_player(name="p2", algorithm=OtherBot()) + + start_poker(config, verbose=0, hand_history_writer=writer) + # Creates: histories/p1.txt (p1's hole cards shown) + # histories/p2.txt (p2's hole cards shown) +""" + +import os +import uuid +from datetime import datetime + + +# --------------------------------------------------------------------------- +# Card conversion helpers +# --------------------------------------------------------------------------- + +def _ps_card(engine_card: str) -> str: + """ + Converts an engine card string to PokerStars format. + Engine format: suit+rank, e.g. "H9", "DA", "ST", "CT", "C2" + PokerStars format: rank+suit, e.g. "9h", "Ad", "Ts", "Tc", "2c" + """ + if not engine_card or len(engine_card) < 2: + return "??" + suit = engine_card[0].lower() # H->h, D->d, S->s, C->c + rank = engine_card[1:] # 9, A, T, J, Q, K, 2... + return f"{rank}{suit}" + + +def _ps_cards(engine_cards: list) -> str: + """Converts a list of engine card strings to PokerStars bracket notation: [9h Ad Ts]""" + return "[" + " ".join(_ps_card(c) for c in engine_cards) + "]" + + +def _player_name_by_uuid(seats: list, target_uuid: str) -> str: + for s in seats: + if s["uuid"] == target_uuid: + return s["name"] + return target_uuid[:8] + + +# --------------------------------------------------------------------------- +# Writer +# --------------------------------------------------------------------------- + +class PokerStarsHandHistoryWriter: + """ + Writes one hand history file per player in PokerStars format. + + Each file is identical except for the HOLE CARDS section, where only + that player's own cards are shown — exactly as real PokerStars files work. + + Parameters: + output_file : Path template containing {player}, e.g. "histories/{player}.txt". + The directory is created automatically if it does not exist. + stakes : Stakes string, e.g. "$1/$2". Auto-derived from blinds if omitted. + table_name : Table name written in the hand header. + """ + + def __init__( + self, + output_file: str = "{player}_history.txt", + stakes: str | None = None, + table_name: str = "PyPokerEngine", + ): + if "{player}" not in output_file: + raise ValueError( + "output_file must contain {player}, e.g. 'histories/{player}.txt'. " + "One file is written per player." + ) + self.output_file = output_file + self.table_name = table_name + self._stakes = stakes + self._sb_amount = 0 + self._bb_amount = 0 + self._seats_meta = [] # stable list set once at game_start: [{name, uuid, seat_no}] + self._reset_round() + + # ------------------------------------------------------------------ + # Engine callbacks — called by Dealer + # ------------------------------------------------------------------ + + def on_game_start(self, game_info: dict): + """Called once when the game session begins.""" + rule = game_info["rule"] + self._sb_amount = rule["small_blind_amount"] + self._bb_amount = self._sb_amount * 2 + self._stakes = self._stakes or f"${self._sb_amount}/${self._bb_amount}" + + self._seats_meta = [ + {"name": p["name"], "uuid": p["uuid"], "seat_no": i + 1} + for i, p in enumerate(game_info["seats"]) + ] + + # Create output directories up front so we don't fail mid-game + for meta in self._seats_meta: + path = self.output_file.format(player=meta["name"]) + dirpath = os.path.dirname(path) + if dirpath: + os.makedirs(dirpath, exist_ok=True) + + def on_round_start(self, round_count: int, dealer_btn: int, + hole_cards_by_uuid: dict, seats: list): + """ + Called at the start of each hand, after hole cards have been dealt. + + hole_cards_by_uuid: {uuid: ["H9", "DA"]} + seats: encoded seat list from round_state + """ + self._reset_round() + self._round_count = round_count + self._hand_id = int(uuid.uuid4()) % (10 ** 15) + self._dealer_btn = dealer_btn + self._hole_cards = hole_cards_by_uuid + self._stacks_at_start = {s["uuid"]: s["stack"] for s in seats} + + self._write_header(seats) + self._write_blinds() + # Mark where HOLE CARDS section begins; dealt lines are injected per-player in _flush + self._lines.append("*** HOLE CARDS ***") + self._hole_cards_insert_idx = len(self._lines) + + def on_street_start(self, street: str, community_card: list): + """Called at the start of each betting street.""" + self._current_street = street + self._street_first_raise[street] = True # next aggression on this street is a bet + + if street == "preflop": + return # already written in on_round_start + + if street == "flop": + self._lines.append(f"*** FLOP *** {_ps_cards(community_card)}") + elif street == "turn": + self._lines.append( + f"*** TURN *** {_ps_cards(community_card[:3])} {_ps_cards(community_card[3:4])}" + ) + elif street == "river": + self._lines.append( + f"*** RIVER *** {_ps_cards(community_card[:4])} {_ps_cards(community_card[4:5])}" + ) + + def on_action(self, actor_uuid: str, action: str, amount: int, seats: list): + """ + Called after each player action. + + action: "fold" | "call" | "raise" + amount: absolute total bet size (not add_amount) + """ + name = _player_name_by_uuid(self._seats_meta, actor_uuid) + street = self._current_street + + if action == "fold": + self._lines.append(f"{name}: folds") + elif action == "call": + if amount == 0: + self._lines.append(f"{name}: checks") + else: + self._lines.append(f"{name}: calls {amount}") + elif action == "raise": + # PokerStars convention: first aggression post-flop is "bets X", + # subsequent aggression is "raises to Y". + if self._street_first_raise.get(street, True) and street != "preflop": + self._lines.append(f"{name}: bets {amount}") + else: + self._lines.append(f"{name}: raises to {amount}") + self._street_first_raise[street] = False + + # Sync stacks + for s in seats: + for meta in self._seats_meta: + if meta["uuid"] == s["uuid"]: + meta["stack"] = s["stack"] + + def on_round_result(self, winners: list, hand_info: list, round_state: dict): + """Called at the end of each hand.""" + seats = round_state["seats"] + final_stacks = {s["uuid"]: s["stack"] for s in seats} + + # Showdown: all remaining players' cards are revealed (same in every file) + active = [s for s in seats if s["state"] != "folded"] + if len(active) > 1: + self._lines.append("*** SHOW DOWN ***") + for hi in hand_info: + u = hi.get("uuid") + if u and u in self._hole_cards: + name = _player_name_by_uuid(self._seats_meta, u) + cards = _ps_cards(self._hole_cards[u]) + strength = hi.get("hand", {}).get("hand", {}).get("strength", "") + self._lines.append(f"{name}: shows {cards} ({strength})") + + # Pot collected lines — write GROSS amount (PokerStars standard). + # In PokerStars format, 'X collected Y from pot' means Y is the gross pot + # amount taken out, not the net profit. Net profit = gross - invested. + pot = round_state.get("pot", {}) + pot_total = pot.get("main", {}).get("amount", 0) + for sp in pot.get("side", []): + pot_total += sp.get("amount", 0) + + # Store gross winnings per winner uuid so _write_summary can use the same value. + # PokerStars requires the SUMMARY collected(Y) to be identical to the mid-hand + # 'collected Y from pot' line — both must be the GROSS pot, not net profit. + self._gross_winnings = {} + if winners: + gross_per_winner = pot_total // len(winners) + for winner in winners: + self._gross_winnings[winner["uuid"]] = gross_per_winner + self._lines.append(f"{winner['name']} collected {gross_per_winner} from pot") + + self._write_summary(round_state) + self._flush() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _reset_round(self): + self._round_count = 0 + self._hand_id = None + self._dealer_btn = 0 + self._hole_cards = {} + self._lines = [] + self._hole_cards_insert_idx = None + self._street_first_raise = {} + self._current_street = "preflop" + self._stacks_at_start = {} + self._gross_winnings = {} # uuid -> gross chips collected from pot this hand + + def _write_header(self, seats: list): + ts = datetime.now().strftime("%Y/%m/%d %H:%M:%S ET") + nb = len(self._seats_meta) + btn_seat_no = self._dealer_btn + 1 + + self._lines.append( + f"PokerStars Hand #{self._hand_id}: " + f"Hold'em No Limit ({self._stakes}) - {ts}" + ) + self._lines.append( + f"Table '{self.table_name}' {nb}-max Seat #{btn_seat_no} is the button" + ) + stack_map = {s["uuid"]: s["stack"] for s in seats} + for meta in self._seats_meta: + stack = stack_map.get(meta["uuid"], 0) + self._lines.append(f"Seat {meta['seat_no']}: {meta['name']} ({stack} in chips)") + + def _write_blinds(self): + """Writes the small blind and big blind posting lines.""" + nb = len(self._seats_meta) + btn = self._dealer_btn + + if nb == 2: + # Heads-up: dealer posts small blind + sb_idx, bb_idx = btn, (btn + 1) % nb + else: + sb_idx = (btn + 1) % nb + bb_idx = (btn + 2) % nb + + self._lines.append(f"{self._seats_meta[sb_idx]['name']}: posts small blind {self._sb_amount}") + self._lines.append(f"{self._seats_meta[bb_idx]['name']}: posts big blind {self._bb_amount}") + + def _write_summary(self, round_state: dict): + seats = round_state["seats"] + community = round_state.get("community_card", []) + pot = round_state.get("pot", {}) + pot_total = pot.get("main", {}).get("amount", 0) + for sp in pot.get("side", []): + pot_total += sp.get("amount", 0) + + self._lines.append("*** SUMMARY ***") + self._lines.append(f"Total pot {pot_total} | Rake 0") + + if community: + self._lines.append(f"Board {_ps_cards(community)}") + + nb = len(self._seats_meta) + btn = self._dealer_btn + sb_idx = btn if nb == 2 else (btn + 1) % nb + bb_idx = (btn + 1) % nb if nb == 2 else (btn + 2) % nb + btn_seat_no = self._dealer_btn + 1 + + folded_uuids = {s["uuid"] for s in seats if s["state"] == "folded"} + + for i, meta in enumerate(self._seats_meta): + parts = [f"Seat {meta['seat_no']}: {meta['name']}"] + + if meta["seat_no"] == btn_seat_no: + parts.append("(button)") + if i == sb_idx: + parts.append("(small blind)") + if i == bb_idx: + parts.append("(big blind)") + + # FIX: use the gross pot collected (same value as mid-hand 'collected X from pot') + # rather than the net profit (final_stack - start_stack). + # Real PokerStars format: + # *** SHOW DOWN *** + # dhduncan collected 1417 from pot ← gross + # *** SUMMARY *** + # Seat 9: dhduncan … and won (1417) … ← same gross, NOT net (967) + gross = self._gross_winnings.get(meta["uuid"], 0) + if gross > 0: + parts.append(f"collected ({gross})") + elif meta["uuid"] in folded_uuids: + parts.append("folded") + else: + parts.append("lost") + + self._lines.append(" ".join(parts)) + + self._lines.append("") # Blank line between hands + + def _flush(self): + """ + Writes one file per player. The only difference between files is the + HOLE CARDS section: each file shows only that player's own dealt cards. + """ + idx = self._hole_cards_insert_idx + + for meta in self._seats_meta: + cards = self._hole_cards.get(meta["uuid"]) + dealt_lines = ( + [f"Dealt to {meta['name']} {_ps_cards(cards)}"] if cards else [] + ) + lines = self._lines[:idx] + dealt_lines + self._lines[idx:] + + filepath = self.output_file.format(player=meta["name"]) + with open(filepath, "a", encoding="utf-8") as f: + for line in lines: + f.write(line + "\n") \ No newline at end of file diff --git a/tests/base_unittest.py b/tests/base_unittest.py index 0be503a..8671cc0 100644 --- a/tests/base_unittest.py +++ b/tests/base_unittest.py @@ -1,6 +1,5 @@ import unittest -from mock import Mock -from nose.tools import * +from unittest.mock import Mock class BaseUnitTest(unittest.TestCase): @@ -14,5 +13,4 @@ def true(self, target): return self.assertTrue(target) def false(self, target): - return self.assertFalse(target) - + return self.assertFalse(target) \ No newline at end of file diff --git a/tests/pypokerengine/api/emulator_test.py b/tests/pypokerengine/api/emulator_test.py index 718272b..844c480 100644 --- a/tests/pypokerengine/api/emulator_test.py +++ b/tests/pypokerengine/api/emulator_test.py @@ -1,7 +1,7 @@ from collections import OrderedDict from functools import reduce -from nose.tools import raises +import pytest from tests.base_unittest import BaseUnitTest from pypokerengine.api.emulator import Emulator, Event from pypokerengine.utils.game_state_utils import restore_game_state, attach_hole_card,\ @@ -32,9 +32,9 @@ def test_register_and_fetch_player(self): self.eq(p1, self.emu.fetch_player("uuid-1")) self.eq(p2, self.emu.fetch_player("uuid-2")) - @raises(TypeError) def test_register_invalid_player(self): - self.emu.register_player("uuid", "hoge") + with pytest.raises(TypeError): + self.emu.register_player("uuid", "hoge") def test_blind_structure(self): game_state = restore_game_state(TwoPlayerSample.round_state) @@ -146,18 +146,16 @@ def test_apply_action_start_next_round(self): self.eq(100, game_state["table"].seats.players[0].stack) self.eq(70, game_state["table"].seats.players[1].stack) - @raises(Exception) def test_apply_action_when_game_finished(self): game_state = restore_game_state(TwoPlayerSample.round_state) game_state = attach_hole_card_from_deck(game_state, "tojrbxmkuzrarnniosuhct") game_state = attach_hole_card_from_deck(game_state, "pwtwlmfciymjdoljkhagxa") self.emu.set_game_rule(2, 3, 5, 0) - p1, p2 = FoldMan(), FoldMan() self.emu.register_player("tojrbxmkuzrarnniosuhct", FoldMan()) self.emu.register_player("pwtwlmfciymjdoljkhagxa", FoldMan()) - game_state, events = self.emu.apply_action(game_state, "fold") - self.emu.apply_action(game_state, "fold") + with pytest.raises(Exception): + self.emu.apply_action(game_state, "fold") def test_run_until_round_finish(self): diff --git a/tests/pypokerengine/api/game_test.py b/tests/pypokerengine/api/game_test.py index 65462c5..a835367 100644 --- a/tests/pypokerengine/api/game_test.py +++ b/tests/pypokerengine/api/game_test.py @@ -1,6 +1,6 @@ import pypokerengine.api.game as G -from nose.tools import raises +import pytest from tests.base_unittest import BaseUnitTest from examples.players.fold_man import FoldMan @@ -51,8 +51,8 @@ def test_start_poker_validation_when_one_player(self): result = G.start_poker(config) self.assertIn("only 1 player", str(e.exception)) - @raises(TypeError) def test_register_player_when_invalid(self): - config = G.setup_config(1, 100, 10) - config.register_player("p1", "dummy") + with pytest.raises(TypeError): + config = G.setup_config(1, 100, 10) + config.register_player("p1", "dummy") diff --git a/tests/pypokerengine/engine/action_checker_test.py b/tests/pypokerengine/engine/action_checker_test.py index 7db5548..40bda63 100644 --- a/tests/pypokerengine/engine/action_checker_test.py +++ b/tests/pypokerengine/engine/action_checker_test.py @@ -147,7 +147,8 @@ def test_legal_actions_when_short_of_money(self): legal_actions = ActionChecker.legal_actions(players, 0, 2.5) self.eq({"action":"fold", "amount":0}, legal_actions[0]) self.eq({"action":"call", "amount":10}, legal_actions[1]) - self.eq({"action":"raise", "amount": { "min":-1, "max":-1} }, legal_actions[2]) + # stack=9, paid_sum=5 → max_raise=14 > call=10 → all-in possible + self.eq({"action":"raise", "amount": { "min":14, "max":14} }, legal_actions[2]) def test_need_amount_after_ante(self): # situation => SB=$5 (players[0]), BB=$10 (players[1]), ANTE=$3 @@ -180,13 +181,32 @@ def set_stack(stacks, ps): set_stack([12,12,12], players) actions = ActionChecker.legal_actions(players, 2, 5) - self.eq(-1, actions[2]["amount"]["max"]) + # stack=12, paid_sum=3 → max_raise=15 > call=10 → all-in possible + self.eq(12, actions[2]["amount"]["max"]) set_stack([10,5,12], players) self.eq(("raise", 15), ActionChecker.correct_action(players, 0, 5, "raise", 15)) self.eq(("raise", 15), ActionChecker.correct_action(players, 1, 5, "raise", 15)) self.eq(("fold", 0), ActionChecker.correct_action(players, 2, 5, "raise", 15)) + def test_raise_allin_when_stack_below_min_raise(self): + # Player has more than call amount but less than min raise + # → should be able to go all-in, not return -1 + players = [Player("uuid-0", 100, "p0"), Player("uuid-1", 100, "p1")] + # p0 raised to 20 (add_amount=10), so min_raise=30 + players[0].add_action_history(Const.Action.RAISE, 20, 10) + players[0].collect_bet(20) + # p1 has 25 chips, paid nothing → max_raise=25 < min_raise=30 + # but max_raise=25 > call_amount=20 → all-in should be possible + players[1].stack = 25 + + actions = ActionChecker.legal_actions(players, 1, 5) + raise_action = actions[2] + # Should not be -1 since player has more than call amount (20) + self.neq(-1, raise_action["amount"]["min"]) + self.neq(-1, raise_action["amount"]["max"]) + self.eq(25, raise_action["amount"]["min"]) + self.eq(25, raise_action["amount"]["max"]) def __setup_clean_players(self): return [Player("uuid", 100) for _ in range(2)] diff --git a/tests/pypokerengine/engine/dealer_message_integration_test.py b/tests/pypokerengine/engine/dealer_message_integration_test.py index 010e839..9a19b33 100644 --- a/tests/pypokerengine/engine/dealer_message_integration_test.py +++ b/tests/pypokerengine/engine/dealer_message_integration_test.py @@ -8,7 +8,6 @@ from pypokerengine.engine.message_builder import MessageBuilder from pypokerengine.engine.dealer import MessageHandler from pypokerengine.players import BasePokerPlayer -from nose.tools import * class MessageIntegrationTest(BaseUnitTest): diff --git a/tests/pypokerengine/engine/game_evaluator_test.py b/tests/pypokerengine/engine/game_evaluator_test.py index a95e2d1..761731f 100644 --- a/tests/pypokerengine/engine/game_evaluator_test.py +++ b/tests/pypokerengine/engine/game_evaluator_test.py @@ -7,7 +7,6 @@ from pypokerengine.engine.pay_info import PayInfo from pypokerengine.engine.table import Table from pypokerengine.engine.game_evaluator import GameEvaluator -from nose.tools import * class GameEvaluatorTest(BaseUnitTest): diff --git a/tests/pypokerengine/engine/hand_evaluator_test.py b/tests/pypokerengine/engine/hand_evaluator_test.py index fd0cf80..d8d2b2b 100644 --- a/tests/pypokerengine/engine/hand_evaluator_test.py +++ b/tests/pypokerengine/engine/hand_evaluator_test.py @@ -130,12 +130,11 @@ def test_straight(self): ] hole = [ Card(Card.CLUB, 4), - Card(Card.DIAMOND, 5) + Card(Card.SPADE, 5) ] - bit = HandEvaluator.eval_hand(hole, community) self.eq(HandEvaluator.STRAIGHT, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) - self.eq(3, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) + self.eq(7, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) self.eq(0, HandEvaluator._HandEvaluator__mask_hand_low_rank(bit)) self.eq(5, HandEvaluator._HandEvaluator__mask_hole_high_rank(bit)) self.eq(4, HandEvaluator._HandEvaluator__mask_hole_low_rank(bit)) @@ -145,19 +144,18 @@ def test_flash(self): Card(Card.CLUB, 7), Card(Card.DIAMOND, 2), Card(Card.DIAMOND, 3), - Card(Card.DIAMOND, 5 ), + Card(Card.DIAMOND, 5), Card(Card.DIAMOND, 6) ] hole = [ Card(Card.CLUB, 4), - Card(Card.DIAMOND, 5) + Card(Card.DIAMOND, 9) ] - bit = HandEvaluator.eval_hand(hole, community) self.eq(HandEvaluator.FLASH, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) - self.eq(6, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) - self.eq(0, HandEvaluator._HandEvaluator__mask_hand_low_rank(bit)) - self.eq(5, HandEvaluator._HandEvaluator__mask_hole_high_rank(bit)) + self.eq(9, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) + self.eq(6, HandEvaluator._HandEvaluator__mask_hand_low_rank(bit)) + self.eq(9, HandEvaluator._HandEvaluator__mask_hole_high_rank(bit)) self.eq(4, HandEvaluator._HandEvaluator__mask_hole_low_rank(bit)) def test_fullhouse(self): @@ -165,14 +163,13 @@ def test_fullhouse(self): Card(Card.CLUB, 4), Card(Card.DIAMOND, 2), Card(Card.DIAMOND, 4), - Card(Card.DIAMOND, 5 ), + Card(Card.DIAMOND, 5), Card(Card.DIAMOND, 6) ] hole = [ - Card(Card.CLUB, 4), - Card(Card.DIAMOND, 5) + Card(Card.HEART, 4), + Card(Card.SPADE, 5) ] - bit = HandEvaluator.eval_hand(hole, community) self.eq(HandEvaluator.FULLHOUSE, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) self.eq(4, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) @@ -232,13 +229,97 @@ def test_straightflash(self): ] hole = [ Card(Card.HEART, 10), - Card(Card.HEART, 1) + Card(Card.HEART, 14) ] - bit = HandEvaluator.eval_hand(hole, community) self.eq(HandEvaluator.STRAIGHTFLASH, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) - self.eq(10, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) + self.eq(14, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) self.eq(0, HandEvaluator._HandEvaluator__mask_hand_low_rank(bit)) self.eq(14, HandEvaluator._HandEvaluator__mask_hole_high_rank(bit)) self.eq(10, HandEvaluator._HandEvaluator__mask_hole_low_rank(bit)) + def test_wheel_straight(self): + community = [ + Card(Card.CLUB, 2), + Card(Card.DIAMOND, 3), + Card(Card.HEART, 4), + Card(Card.DIAMOND, 5), + Card(Card.CLUB, 9) + ] + hole = [ + Card(Card.HEART, 14), # Ace + Card(Card.SPADE, 8) + ] + bit = HandEvaluator.eval_hand(hole, community) + self.eq(HandEvaluator.STRAIGHT, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) + self.eq(5, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) + + + def test_flush_comparison_uses_all_five_cards(self): + community_strong = [ + Card(Card.HEART, 14), + Card(Card.HEART, 13), + Card(Card.HEART, 12), + Card(Card.HEART, 11), + Card(Card.CLUB, 2) + ] + hole_strong = [ + Card(Card.HEART, 9), + Card(Card.SPADE, 3) + ] + community_weak = [ + Card(Card.DIAMOND, 14), + Card(Card.DIAMOND, 13), + Card(Card.DIAMOND, 12), + Card(Card.DIAMOND, 11), + Card(Card.CLUB, 3) + ] + hole_weak = [ + Card(Card.DIAMOND, 8), + Card(Card.SPADE, 4) + ] + strong = HandEvaluator.eval_hand(hole_strong, community_strong) + weak = HandEvaluator.eval_hand(hole_weak, community_weak) + self.eq(HandEvaluator.FLASH, HandEvaluator._HandEvaluator__mask_hand_strength(strong)) + self.eq(HandEvaluator.FLASH, HandEvaluator._HandEvaluator__mask_hand_strength(weak)) + self.true(strong > weak) + + + def test_best_five_from_seven(self): + community = [ + Card(Card.HEART, 2), + Card(Card.HEART, 4), + Card(Card.HEART, 6), + Card(Card.HEART, 8), + Card(Card.CLUB, 5) + ] + hole = [ + Card(Card.HEART, 10), + Card(Card.SPADE, 3) + ] + bit = HandEvaluator.eval_hand(hole, community) + self.eq(HandEvaluator.STRAIGHT, HandEvaluator._HandEvaluator__mask_hand_strength(bit)) + self.eq(6, HandEvaluator._HandEvaluator__mask_hand_high_rank(bit)) + + + def test_flush_comparison_uses_all_five_cards(self): + community = [ + Card(Card.HEART, 14), + Card(Card.HEART, 13), + Card(Card.HEART, 12), + Card(Card.HEART, 11), + Card(Card.CLUB, 2) + ] + hole_strong = [ + Card(Card.HEART, 9), + Card(Card.SPADE, 3) + ] + hole_weak = [ + Card(Card.HEART, 8), + Card(Card.SPADE, 4) + ] + strong = HandEvaluator.eval_hand(hole_strong, community) + weak = HandEvaluator.eval_hand(hole_weak, community) + self.eq(HandEvaluator.FLASH, HandEvaluator._HandEvaluator__mask_hand_strength(strong)) + self.eq(HandEvaluator.FLASH, HandEvaluator._HandEvaluator__mask_hand_strength(weak)) + self.true(strong > weak) \ No newline at end of file diff --git a/tests/pypokerengine/engine/hand_history_writer_test.py b/tests/pypokerengine/engine/hand_history_writer_test.py new file mode 100644 index 0000000..4c453eb --- /dev/null +++ b/tests/pypokerengine/engine/hand_history_writer_test.py @@ -0,0 +1,342 @@ +""" +Replicates the example hand history scenario: + - 6 players, blinds 50/100 + - Preflop: player5 raises to 350, player6 + player1 call, others fold + - Flop/Turn/River: check check check -> player5 bets 100, others call + - player6 wins the pot + +Key assertion: SUMMARY collected(X) == mid-hand "collected X from pot" +i.e. both values are the gross pot (1500), NOT the net profit (1050 = 1500 - 450). + +Real PokerStars reference: + *** SHOW DOWN *** + player6 collected 1417 from pot <- gross (rake deducted) + *** SUMMARY *** + Total pot 1500 | Rake 83 + Seat 9: player6 showed [8d As] and won (1417) with a pair of Eights <- same 1417 + + player6 invested 450 (350 preflop + 100 river) -> net = 1417 - 450 = 967 + SUMMARY uses 1417 (gross), NOT 967 (net). + Our engine has no rake, so the gross pot is 1500. +""" + +import re +import pytest +from pypokerengine.engine.hand_history_writer import PokerStarsHandHistoryWriter + + +# --------------------------------------------------------------------------- +# Hand parameters +# --------------------------------------------------------------------------- + +PLAYERS = [ + # (name, uuid, starting stack) + ("player1", "u1", 10361), + ("player2", "u2", 10849), # button + ("player3", "u3", 9305), # small blind + ("player4", "u4", 10000), # big blind + ("player5", "u5", 10125), + ("player6", "u6", 6919), +] + +DEALER_BTN = 1 # player2 is the button (0-indexed) +SB_AMOUNT = 50 +BB_AMOUNT = 100 + +# Investments per player: +# player1: 350 preflop + 100 river = 450 (lost at showdown) +# player2: 0 (folded preflop, did not bet) +# player3: 50 (SB) (folded preflop) +# player4: 100 (BB) (folded preflop) +# player5: 350 preflop + 100 river = 450 (lost at showdown) +# player6: 350 preflop + 100 river = 450 (WON) +# +# Total pot: 50+100+350+350+350 + 100+100+100 = 1500 + +GROSS_POT = 1500 + +FINAL_STACKS = { + "u1": 10361 - 450, # 9911 -- lost at showdown + "u2": 10849, # 10849 -- folded preflop without investing + "u3": 9305 - 50, # 9255 -- SB, folded preflop + "u4": 10000 - 100, # 9900 -- BB, folded preflop + "u5": 10125 - 450, # 9675 -- lost at showdown + "u6": 6919 - 450 + GROSS_POT, # 7969 -- WON +} + +WINNER_UUID = "u6" +WINNER_NAME = "player6" +WINNER_GROSS = GROSS_POT # 1500 -- correct value +WINNER_NET = 7969 - 6919 # 1050 -- wrong value (what BUG produced) + + +# --------------------------------------------------------------------------- +# Fixture: run one hand and return the winner's history file as lines +# --------------------------------------------------------------------------- + +@pytest.fixture +def history_lines(tmp_path): + """ + Calls the writer directly with the same arguments the Dealer would pass. + Returns the winner's (player6) history file as a list of lines. + """ + tmpl = str(tmp_path / "{player}.txt") + w = PokerStarsHandHistoryWriter( + output_file=tmpl, + table_name="TestTable", + ) + + # on_game_start + w.on_game_start({ + "rule": {"small_blind_amount": SB_AMOUNT}, + "seats": [{"name": n, "uuid": u} for n, u, _ in PLAYERS], + }) + + # Starting seat list (used in on_round_start and action calls) + seats_start = [ + {"uuid": u, "stack": s, "state": "participating", "name": n} + for n, u, s in PLAYERS + ] + + # Hole cards matching the reference hand (engine format: suit+rank) + hole_cards = { + "u1": ["C5", "D5"], # player1: 5c 5d + "u2": ["C2", "C3"], # player2: placeholder + "u3": ["D2", "D3"], # player3: placeholder + "u4": ["H6", "CT"], # player4: 6h Tc + "u5": ["ST", "CA"], # player5: Ts Ac + "u6": ["D8", "SA"], # player6: 8d As + } + + # on_round_start + w.on_round_start( + round_count=1, + dealer_btn=DEALER_BTN, + hole_cards_by_uuid=hole_cards, + seats=seats_start, + ) + + # -- Preflop ---------------------------------------------------------- + w.on_street_start("preflop", []) + w.on_action("u5", "raise", 350, seats_start) # player5 raises to 350 + w.on_action("u6", "call", 350, seats_start) # player6 calls 350 + w.on_action("u1", "call", 350, seats_start) # player1 calls 350 + w.on_action("u2", "fold", 0, seats_start) # player2 folds + w.on_action("u3", "fold", 0, seats_start) # player3 folds + w.on_action("u4", "fold", 0, seats_start) # player4 folds + + community_flop = ["HJ", "C4", "S8"] + community_turn = ["HJ", "C4", "S8", "CK"] + community_river = ["HJ", "C4", "S8", "CK", "H7"] + + # -- Flop: all check -------------------------------------------------- + w.on_street_start("flop", community_flop) + w.on_action("u5", "call", 0, seats_start) + w.on_action("u6", "call", 0, seats_start) + w.on_action("u1", "call", 0, seats_start) + + # -- Turn: all check -------------------------------------------------- + w.on_street_start("turn", community_turn) + w.on_action("u5", "call", 0, seats_start) + w.on_action("u6", "call", 0, seats_start) + w.on_action("u1", "call", 0, seats_start) + + # -- River: player5 bets 100, others call ----------------------------- + w.on_street_start("river", community_river) + w.on_action("u5", "raise", 100, seats_start) + w.on_action("u6", "call", 100, seats_start) + w.on_action("u1", "call", 100, seats_start) + + # -- Round result ----------------------------------------------------- + seats_final = [ + { + "uuid": u, + "stack": FINAL_STACKS[u], + "state": "participating" if u in ("u1", "u5", "u6") else "folded", + "name": n, + } + for n, u, _ in PLAYERS + ] + round_state = { + "seats": seats_final, + "community_card": community_river, + "pot": {"main": {"amount": GROSS_POT}, "side": []}, + } + hand_info = [ + {"uuid": "u5", "hand": {"hand": {"strength": "HIGHCARD"}}}, # player5 loses + {"uuid": "u6", "hand": {"hand": {"strength": "ONEPAIR"}}}, # player6 wins + {"uuid": "u1", "hand": {"hand": {"strength": "ONEPAIR"}}}, # player1 loses + ] + winners = [{"uuid": WINNER_UUID, "name": WINNER_NAME, "stack": FINAL_STACKS[WINNER_UUID]}] + w.on_round_result(winners, hand_info, round_state) + + filepath = str(tmp_path / f"{WINNER_NAME}.txt") + with open(filepath) as f: + return f.read().splitlines() + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def find_line(lines, pattern): + """Returns the first line matching the pattern, or None.""" + for line in lines: + if re.search(pattern, line): + return line + return None + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestGrossBugFix: + """Bug fix: SUMMARY collected(Y) must use the gross pot, not the net profit.""" + + def test_summary_collected_equals_gross_not_net(self, history_lines): + """ + Bug: _write_summary used net profit (final_stack - start_stack = 1050). + Fix: use the gross pot amount (pot_total = 1500). + """ + line = find_line(history_lines, r"collected \(\d+\)") + assert line is not None, "SUMMARY section is missing a 'collected (X)' line" + + amount = int(re.search(r"collected \((\d+)\)", line).group(1)) + + assert amount != WINNER_NET, ( + f"SUMMARY collected is {WINNER_NET} -- bug is NOT fixed! " + f"Expected {WINNER_GROSS} (gross)." + ) + assert amount == WINNER_GROSS, ( + f"SUMMARY collected is {amount}, expected {WINNER_GROSS} (gross)." + ) + + def test_midhand_collected_equals_gross(self, history_lines): + """Mid-hand 'collected X from pot' line must use the gross pot amount.""" + line = find_line(history_lines, r"collected \d+ from pot") + assert line is not None, "Missing 'collected X from pot' line" + + amount = int(re.search(r"collected (\d+) from pot", line).group(1)) + assert amount == GROSS_POT, ( + f"Mid-hand collected is {amount}, expected {GROSS_POT}." + ) + + def test_midhand_and_summary_collected_are_identical(self, history_lines): + """ + In PokerStars format the mid-hand and SUMMARY collected values must be identical. + This is the invariant identified in the bug report. + """ + mid_line = find_line(history_lines, r"collected \d+ from pot") + summ_line = find_line(history_lines, r"collected \(\d+\)") + assert mid_line is not None, "Missing mid-hand collected line" + assert summ_line is not None, "Missing summary collected line" + + mid_amount = int(re.search(r"collected (\d+) from pot", mid_line).group(1)) + summ_amount = int(re.search(r"collected \((\d+)\)", summ_line).group(1)) + + assert mid_amount == summ_amount, ( + f"Mid-hand ({mid_amount}) and SUMMARY ({summ_amount}) differ. " + f"PokerStars format requires these to be identical." + ) + + +class TestHandStructure: + """Verifies the overall structure of the hand history output.""" + + def test_all_sections_present(self, history_lines): + """All mandatory *** SECTION *** headers must be present.""" + text = "\n".join(history_lines) + for section in [ + "*** HOLE CARDS ***", + "*** FLOP ***", + "*** TURN ***", + "*** RIVER ***", + "*** SHOW DOWN ***", + "*** SUMMARY ***", + ]: + assert section in text, f"Missing section: {section}" + + def test_hero_hole_cards_shown(self, history_lines): + """The hero (player6) sees their own hole cards in the HOLE CARDS section.""" + line = find_line(history_lines, r"Dealt to player6") + assert line is not None, "Missing 'Dealt to player6' line" + # player6 holds 8d As + assert "8d" in line or "D8" in line.upper(), ( + f"Expected 8d in hole cards line: {line}" + ) + + def test_total_pot_correct(self, history_lines): + """The SUMMARY header must show the correct total pot.""" + line = find_line(history_lines, r"Total pot") + assert line is not None, "Missing 'Total pot' line" + pot = int(re.search(r"Total pot (\d+)", line).group(1)) + assert pot == GROSS_POT, f"Total pot is {pot}, expected {GROSS_POT}" + + def test_board_cards_in_summary(self, history_lines): + """SUMMARY must include the Board line with community cards.""" + line = find_line(history_lines, r"^Board ") + assert line is not None, "Missing 'Board [...]' line in SUMMARY" + + def test_winner_only_in_collected_lines(self, history_lines): + """Only the winner (player6) should appear on collected lines.""" + for line in history_lines: + if "collected" in line: + assert WINNER_NAME in line, ( + f"Unexpected player on collected line: {line}" + ) + + +class TestPreflopActions: + """Preflop actions are written correctly.""" + + def test_raise_written(self, history_lines): + """player5 raises to 350 must appear in the hand history.""" + line = find_line(history_lines, r"player5.*raises to 350") + assert line is not None, "Missing 'player5 raises to 350'" + + def test_calls_written(self, history_lines): + """player6 and player1 each call 350 preflop.""" + assert find_line(history_lines, r"player6.*calls 350") is not None + assert find_line(history_lines, r"player1.*calls 350") is not None + + def test_folds_written(self, history_lines): + """player2, player3, and player4 all fold preflop.""" + for name in ("player2", "player3", "player4"): + assert find_line(history_lines, rf"{name}.*folds") is not None, ( + f"Missing '{name} folds'" + ) + + +class TestPostflopActions: + """Post-flop actions are written correctly.""" + + def test_flop_checks(self, history_lines): + """All three remaining players check on the flop.""" + text = "\n".join(history_lines) + flop_start = text.find("*** FLOP ***") + turn_start = text.find("*** TURN ***") + flop_section = text[flop_start:turn_start] + assert flop_section.count(": checks") == 3, ( + f"Expected 3 checks on the flop:\n{flop_section}" + ) + + def test_river_bet_and_calls(self, history_lines): + """player5 bets 100 on the river; the other two call.""" + text = "\n".join(history_lines) + river_start = text.find("*** RIVER ***") + showdown_start = text.find("*** SHOW DOWN ***") + river_section = text[river_start:showdown_start] + assert "player5: bets 100" in river_section + assert river_section.count(": calls 100") == 2 + + def test_first_postflop_aggression_is_bet_not_raise(self, history_lines): + """In PokerStars format the first post-flop aggression is 'bets', not 'raises'.""" + text = "\n".join(history_lines) + river_start = text.find("*** RIVER ***") + showdown_start = text.find("*** SHOW DOWN ***") + river_section = text[river_start:showdown_start] + assert "bets 100" in river_section, ( + "First river aggression should be written as 'bets', not 'raises'" + ) + assert "raises to 100" not in river_section \ No newline at end of file diff --git a/tests/pypokerengine/engine/player_test.py b/tests/pypokerengine/engine/player_test.py index d765de6..59576a3 100644 --- a/tests/pypokerengine/engine/player_test.py +++ b/tests/pypokerengine/engine/player_test.py @@ -1,8 +1,8 @@ +import pytest from tests.base_unittest import BaseUnitTest from pypokerengine.engine.card import Card from pypokerengine.engine.player import Player from pypokerengine.engine.poker_constants import PokerConstants as Const -from nose.tools import * class PlayerTest(BaseUnitTest): @@ -15,18 +15,18 @@ def test_add_holecard(self): self.true(cards[0] in self.player.hole_card) self.true(cards[1] in self.player.hole_card) - @raises(ValueError) def test_add_single_hole_card(self): - self.player.add_holecard([Card.from_id(1)]) + with pytest.raises(ValueError): + self.player.add_holecard([Card.from_id(1)]) - @raises(ValueError) def test_add_too_many_hole_card(self): - self.player.add_holecard([Card.from_id(cid) for cid in range(1,4)]) + with pytest.raises(ValueError): + self.player.add_holecard([Card.from_id(cid) for cid in range(1,4)]) - @raises(ValueError) def test_add_hole_card_twice(self): - self.player.add_holecard([Card.from_id(cid) for cid in range(1,3)]) - self.player.add_holecard([Card.from_id(cid) for cid in range(1,3)]) + with pytest.raises(ValueError): + self.player.add_holecard([Card.from_id(cid) for cid in range(1,3)]) + self.player.add_holecard([Card.from_id(cid) for cid in range(1,3)]) def test_clear_holecard(self): self.player.add_holecard([Card.from_id(cid) for cid in range(1,3)]) @@ -41,9 +41,9 @@ def test_collect_bet(self): self.player.collect_bet(10) self.eq(90, self.player.stack) - @raises(ValueError) def test_collect_too_much_bet(self): - self.player.collect_bet(200) + with pytest.raises(ValueError): + self.player.collect_bet(200) def test_is_active(self): self.player.pay_info.update_by_pay(10) @@ -128,9 +128,9 @@ def test_add_ante_history(self): self.eq("ANTE", action["action"]) self.eq(10, action["amount"]) - @raises(AssertionError) def test_add_empty_ante_history(self): - self.player.add_action_history(Const.Action.ANTE, 0) + with pytest.raises(AssertionError): + self.player.add_action_history(Const.Action.ANTE, 0) def test_save_street_action_histories(self): self.assertIsNone(self.player.round_action_histories[Const.Street.PREFLOP]) diff --git a/tests/pypokerengine/engine/table_test.py b/tests/pypokerengine/engine/table_test.py index b0086d6..52b162e 100644 --- a/tests/pypokerengine/engine/table_test.py +++ b/tests/pypokerengine/engine/table_test.py @@ -1,5 +1,5 @@ +import pytest from tests.base_unittest import BaseUnitTest -from nose.tools import * from pypokerengine.engine.card import Card from pypokerengine.engine.pay_info import PayInfo @@ -43,9 +43,9 @@ def test_reset_player_status(self): self.eq(0, len(self.player.action_histories)) self.eq(PayInfo.PAY_TILL_END, self.player.pay_info.status) - @raises(ValueError) def test_community_card_exceed_size(self): - self.table.add_community_card(Card.from_id(1)) + with pytest.raises(ValueError): + self.table.add_community_card(Card.from_id(1)) def test_shift_dealer_btn_skip(self): table = self.__setup_players_with_table() diff --git a/tests/pypokerengine/utils/game_state_utils_test.py b/tests/pypokerengine/utils/game_state_utils_test.py index 16a3891..8096b54 100644 --- a/tests/pypokerengine/utils/game_state_utils_test.py +++ b/tests/pypokerengine/utils/game_state_utils_test.py @@ -1,4 +1,4 @@ -from nose.tools import raises +import pytest from tests.base_unittest import BaseUnitTest from pypokerengine.utils.game_state_utils import restore_game_state,\ attach_hole_card, replace_community_card,\ @@ -50,17 +50,17 @@ def test_attach_hole_card(self): self.eq(hole2, players[1].hole_card) self.eq([0,0], [len(p.hole_card) for p in game_state["table"].seats.players]) - @raises(Exception) def test_attach_hole_card_when_uuid_is_wrong(self): - game_state = restore_game_state(TwoPlayerSample.round_state) - attach_hole_card(game_state, "hoge", "dummy_hole") + with pytest.raises(Exception): + game_state = restore_game_state(TwoPlayerSample.round_state) + attach_hole_card(game_state, "hoge", "dummy_hole") - @raises(Exception) def test_attach_hole_card_when_same_uuid_players_exist(self): - game_state = restore_game_state(TwoPlayerSample.round_state) - p1, p2 = game_state["table"].seats.players[:2] - p2.uuid = p1.uuid - attach_hole_card(game_state, p1.uuid, "dummy_hole") + with pytest.raises(Exception): + game_state = restore_game_state(TwoPlayerSample.round_state) + p1, p2 = game_state["table"].seats.players[:2] + p2.uuid = p1.uuid + attach_hole_card(game_state, p1.uuid, "dummy_hole") def test_replace_community_card(self): game_state = restore_game_state(TwoPlayerSample.round_state)