""" commands/record.py — 게임 전체 기록 (Main Command) JSON 리포트를 읽고 관리자 사이트의 폼을 제어하여 경기 전체를 순차적으로 입력합니다. """ from __future__ import annotations import argparse import os import re from time import time, sleep from typing import Any from playwright.sync_api import Playwright, Page from core.normalizer import ( normalize_game_type, normalize_stadium_name, normalize_team_name, ) from core.pitch_classifier import infer_batter_result_label, get_last_pitch_result_text from core.field_calculator import extract_defense_sequence, extract_error_position, is_throwing_error, is_error_result from automation.page_helpers import ( get_last_visible_locator, get_checked_event_name, set_radio_by_label, show_debug_overlay, wait_for_operator_control, get_checked_batter_defense_type, get_history_count, ) from automation.defense_popup import fill_error_defense_popup, click_defense_button_robustly from automation.pitch_input import set_pitch, set_pitch_meta_only, get_pitch_runner_events from automation.batter_input import set_batter_result_type, set_batter_advancement, set_hit_ball_and_defense from automation.runner_input import set_runner_events from automation.lineup_input import apply_change_event from automation.review_input import record_review_events from commands.base import launch_browser_context, load_report, add_common_arguments _DEFENSE_CLEAR_JS = """ () => { document.querySelectorAll("input[name='defenseNumberBtn']").forEach(el => { el.checked = false; }); document.querySelectorAll("input[name='hitBallType']").forEach(el => { el.checked = false; }); const datIds = [ "putout", "assist", "error", "upstruction", "dat_putout_hitter", "dat_assist_hitter", "dat_error_hitter", "dat_upstruction_hitter", "dat_putout_runner1", "dat_assist_runner1", "dat_error_runner1", "dat_upstruction_runner1", "dat_putout_runner2", "dat_assist_runner2", "dat_error_runner2", "dat_upstruction_runner2", "dat_putout_runner3", "dat_assist_runner3", "dat_error_runner3", "dat_upstruction_runner3", "dat_error_type", "dat_error_type1", "dat_error_type2", "dat_error_type3", "dat_multiplay_type", "multiplay_type", "hitBallXY", "hitBallDistance", "dat_hitball_speed", "dat_hitball_type", "dat_hitball_x", "dat_hitball_y", "dat_hitball_xy", "dat_hitball_distance", "hitball_speed", "hitball_type", "hitball_xy", "hitball_distance" ]; datIds.forEach(id => { const el = document.getElementById(id); if (el) el.value = ""; }); document.querySelectorAll("input[name='hitBallType'], [id^='hitBallSpeed'], [id^='hitBallType']").forEach(el => { if (el.type === 'radio' || el.type === 'checkbox') { el.checked = false; } else { el.value = ""; } }); } """ def clear_defense_selections(page: Page) -> None: page.evaluate(_DEFENSE_CLEAR_JS) def is_simple_terminal_result_type(result_type: str) -> bool: return result_type in {"strikeout", "strikeout_not_out", "walk", "intentional_walk", "hit_by_pitch"} def advance_count(balls: int, strikes: int, pitch_result: str) -> tuple[int, int]: if pitch_result == "B": return min(balls + 1, 3), strikes if pitch_result in {"T", "S"}: return balls, min(strikes + 1, 2) if pitch_result == "F": return balls, strikes if strikes >= 2 else strikes + 1 return balls, strikes def find_status_href(page: Page, report: dict[str, Any], manager_game_no: str | None) -> str: game_info = report["game_info"] target_date = game_info["date"] target_stadium = normalize_stadium_name(game_info["stadium"]) target_home_team = normalize_team_name(game_info["home_team"]) target_away_team = normalize_team_name(game_info["away_team"]) target_game_type = normalize_game_type(game_info["game_type"]) rows: list[dict[str, Any]] = page.locator("table.gclist tr").evaluate_all( """(rows) => rows.slice(1).map((row) => { const cells = [...row.cells].map((cell) => cell.innerText.trim()); const statusLink = [...row.querySelectorAll('a')].find((anchor) => anchor.textContent.trim() === '게임기록'); return { gameNo: cells[0] || '', date: cells[1] || '', gameType: cells[2] || '', stadium: cells[3] || '', homeTeam: cells[4] || '', awayTeam: cells[5] || '', href: statusLink ? statusLink.getAttribute('href') : '', }; })""" ) if manager_game_no: matched = next((row for row in rows if row["gameNo"] == str(manager_game_no)), None) if not matched or not matched["href"] or matched["href"].startswith("javascript:"): raise ValueError(f"관리자 게임번호 {manager_game_no} 행의 게임기록 링크를 찾지 못했습니다.") return matched["href"] candidates = [ row for row in rows if row["href"] and not row["href"].startswith("javascript:") and row["date"] == target_date and (not row["gameType"] or normalize_game_type(row["gameType"]) == target_game_type) and normalize_stadium_name(row["stadium"]) == target_stadium and normalize_team_name(row["homeTeam"]) == target_home_team and normalize_team_name(row["awayTeam"]) == target_away_team ] if not candidates: print("\n=== 게임 매칭 실패 디버그 정보 ===") print(f"Target: Date='{target_date}', Type='{target_game_type}', Stadium='{target_stadium}', Home='{target_home_team}', Away='{target_away_team}'") print("Rows parsed from table:") for r in rows: print(f" - Date='{r['date']}', Type='{r['gameType']}'(norm: '{normalize_game_type(r['gameType'])}'), Stadium='{r['stadium']}'(norm: '{normalize_stadium_name(r['stadium'])}'), Home='{r['homeTeam']}'(norm: '{normalize_team_name(r['homeTeam'])}'), Away='{r['awayTeam']}'(norm: '{normalize_team_name(r['awayTeam'])}')") raise ValueError("목록에서 일치하는 게임기록 링크를 찾지 못했습니다.") return candidates[0]["href"] def open_game_status_page(page: Page, base_url: str, report: dict[str, Any], manager_game_no: str | None) -> None: if manager_game_no: page.goto(f"{base_url}/manager/game/status?game_no={manager_game_no}", wait_until="domcontentloaded") page.wait_for_selector("#eventWriteBtn", timeout=10000) return page.goto(f"{base_url}/manager/game/list", wait_until="domcontentloaded") page.wait_for_selector("table.gclist", timeout=10000) status_href = find_status_href(page, report, manager_game_no) with page.expect_navigation(wait_until="domcontentloaded"): page.locator(f"a[href='{status_href}']").first.click() page.wait_for_selector("#eventWriteBtn", timeout=10000) def _try_log_pitch(log_info: dict[str, Any] | None, is_success: bool, error_code: str, error_detail: str, duration: float) -> None: if log_info and log_info.get("job_id"): try: from db_logging import log_pitch log_pitch( log_info["job_id"], log_info.get("inning", ""), log_info.get("batter", ""), log_info.get("pitch_no", 0), log_info.get("target_value", ""), log_info.get("selected_value", ""), is_success, error_code, error_detail, duration ) except Exception: pass def _try_log_event(log_info: dict[str, Any] | None, is_success: bool, error_msg: str = "") -> None: if log_info and log_info.get("job_id"): try: from db_logging import log_event log_event( log_info["job_id"], log_info.get("inning", ""), log_info.get("event_type", ""), log_info.get("target_player", ""), log_info.get("actual_player", ""), is_success, error_msg ) except Exception: pass def submit_input_complete(page: Page, debug_label: str = "", clear_defense: bool = False, log_info: dict[str, Any] | None = None) -> None: t0 = time() try: page.evaluate("""() => { window.confirm = () => true; window.alert = () => {}; const defenseDiv = document.querySelector('#defenseDiv'); if (defenseDiv && defenseDiv.style.display !== 'none') { const btnAdd = document.querySelector('#btnAdd'); if (btnAdd) btnAdd.click(); } }""") if clear_defense: clear_defense_selections(page) prev_history = get_history_count(page) for i in range(40): curr_history = get_history_count(page) if curr_history > prev_history: page.wait_for_timeout(30) _try_log_pitch(log_info, True, "", "", time() - t0) return if i % 8 == 0: submit_btn = get_last_visible_locator(page, "#eventWriteBtn") if not submit_btn: submit_btn = page.get_by_role("button", name="입력완료").last if submit_btn: try: submit_btn.click(force=True, timeout=500) except Exception: page.evaluate("document.querySelector('#eventWriteBtn')?.click() || [...document.querySelectorAll('a, button')].find(el => el.innerText.includes('입력완료'))?.click()") page.wait_for_timeout(50) page.evaluate("() => { window.confirm = () => true; window.alert = () => {}; }") raise TimeoutError(f"입력완료가 반영되지 않았습니다: {debug_label}") except Exception as e: _try_log_pitch(log_info, False, type(e).__name__, str(e), time() - t0) raise e def handle_late_runner_events(page: Page, event: dict[str, Any], late_events: list[dict[str, Any]], write_events: bool, job_id: str | None = None) -> None: if not late_events or not write_events: return page.wait_for_timeout(800) from automation.page_helpers import get_last_history_text current_history = get_last_history_text(page) all_matched = True for le in late_events: le_text = le.get("text", "") if le_text and le_text not in current_history: all_matched = False break if all_matched: return new_late = set_runner_events(page, event, late_events) submit_input_complete( page, f"지연 주루 처리: {', '.join(e.get('text', '') for e in late_events)}", clear_defense=True, log_info={"job_id": job_id} if job_id else None ) if new_late: handle_late_runner_events(page, event, new_late, write_events, job_id) def build_runner_event_lines(event: dict[str, Any]) -> list[str]: from core.runner_classifier import infer_runner_action_label lines: list[str] = [] for runner_event in (event.get("runnerEvents") or []): r_text = runner_event.get("text", "") from_b = runner_event.get("fromBase", "?") to_b = runner_event.get("toBase", "?") label = infer_runner_action_label(event, runner_event) line = f"🏃 {from_b}루주자 -> {to_b}루 : {r_text}" if label: line += f" | 라벨: {label}" lines.append(line) return lines def process_only_reviews(page: Page, report: dict[str, Any], write_events: bool, job_id: str = None) -> None: all_reviews = [] from automation.review_input import _normalize_review_event for half_inning in report.get("game_contents", []): for event in half_inning.get("events", []): if event.get("event_type") == "at_bat": reviews = event.get("reviewEvents") or [] for r in reviews: all_reviews.append(_normalize_review_event(r)) if not all_reviews: show_debug_overlay(page, ["입력할 합의판정 기록이 없습니다."]) page.wait_for_timeout(2000) return show_debug_overlay(page, [f"합의판정 {len(all_reviews)}건 일괄 등록 시작"]) if write_events: record_review_events(page, all_reviews) show_debug_overlay(page, ["합의판정 일괄 등록 완료"]) page.wait_for_timeout(1000) def process_report(page: Page, report: dict[str, Any], write_events: bool, job_id: str = None) -> None: from core.pitch_classifier import normalize_pitch_result_code outs = 0 change_cache: dict[str, tuple[str, int]] = {} applied_change_texts: set[str] = set() for half_inning in report.get("game_contents", []): inning = half_inning.get("inning", "") outs = 0 for event in half_inning.get("events", []): if event.get("event_type") == "change": change_text = (event.get("text") or "").strip() show_debug_overlay(page, [f"교체 입력: {change_text or '-'}"]) wait_for_operator_control(page) if write_events: if change_text and change_text in applied_change_texts: show_debug_overlay(page, ["교체 중복 건너뜀", change_text]) page.wait_for_timeout(250) continue log_info_event = { "job_id": job_id, "inning": inning, "event_type": event.get("change_type", "change"), "target_player": event.get("in_player") or event.get("to_position", ""), "actual_player": event.get("actor_name") or event.get("player_name", "") } if job_id else None try: apply_change_event(page, half_inning, event, change_cache) _try_log_event(log_info_event, True) except Exception as e: _try_log_event(log_info_event, False, str(e)) raise e if change_text: applied_change_texts.add(change_text) show_debug_overlay(page, ["교체 완료", f"{change_text or '-'}"]) page.wait_for_timeout(120) continue if event.get("event_type") != "at_bat": continue clear_defense_selections(page) balls = 0 strikes = 0 pitches = event.get("pitches") or [] result = event.get("result") or {} for pitch_index, pitch in enumerate(pitches): pitch_result_text = (pitch.get("pitchResultText") or "").strip() normalized_pitch_result = normalize_pitch_result_code(pitch, event) is_balk_strike = "보크" in pitch_result_text and ("스트라이크" in pitch_result_text or "헛스윙" in pitch_result_text) if "피치클락" in pitch_result_text and "투수위반" in pitch_result_text: pitch_result_text = "피치클락 투수위반 볼" show_debug_overlay( page, [ f"다음 카운트: {balls}볼 {strikes}스트 {outs}아웃", f"다음 공: {pitch.get('pitchNo')}구 {pitch_result_text}", f"구종/구속: {(pitch.get('pitchType') or '-')} / {(pitch.get('speedKmh') or '-')}", f"타석: {event.get('batter') or '-'}", ], ) wait_for_operator_control(page) is_last_pitch = pitch_index == len(pitches) - 1 is_action_result = is_last_pitch and result.get("type") in { "hit_by_pitch", "strikeout", "strikeout_not_out", "walk", "intentional_walk", "out", "single", "double", "triple", "home_run", "sacrifice_fly", "sacrifice_bunt", "double_play", "reach_on_error", "reach_on_fielder_choice", "bunt_hit", "single_runner_out", "double_runner_out", "triple_runner_out" } is_in_play = (pitch.get("pitchResult") == "H") or is_action_result if is_last_pitch and is_in_play: continue if is_balk_strike: if write_events: current_late = [] p_runner_events = pitch.get("runnerEvents") if p_runner_events: current_late.extend(set_runner_events(page, event, p_runner_events)) if is_last_pitch and event.get("runnerEvents"): current_late.extend(set_runner_events(page, event)) submit_input_complete( page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}구 보크", clear_defense=True, log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": "보크", "selected_value": "보크"} if job_id else None ) if current_late: handle_late_runner_events(page, event, current_late, True, job_id) page.wait_for_timeout(80) set_pitch_meta_only(page, pitch) if "헛스윙" in pitch_result_text: set_radio_by_label(page, "evt_batter", "헛스윙(스트라이크)") else: set_radio_by_label(page, "evt_batter", "스트라이크(루킹)") if write_events: submit_input_complete( page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}구 보크 후 {'헛스윙' if '헛스윙' in pitch_result_text else '스트라이크'}", clear_defense=True, log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": pitch_result_text, "selected_value": pitch_result_text} if job_id else None ) else: set_pitch(page, pitch, event) if write_events: current_late = [] p_runner_events = get_pitch_runner_events(pitch, event) is_wild_pitch = any(re.get("type") == "wild_pitch_advance" or "폭투" in (re.get("text") or "") for re in p_runner_events) is_passed_ball = any(re.get("type") == "passed_ball_advance" or "포일" in (re.get("text") or "") for re in p_runner_events) extra_log = " (폭투)" if is_wild_pitch else " (포일)" if is_passed_ball else "" if p_runner_events: current_late.extend(set_runner_events(page, event, p_runner_events)) if is_last_pitch and event.get("runnerEvents"): current_late.extend(set_runner_events(page, event)) if "파울플라이" in pitch_result_text and "실책" in pitch_result_text: fill_error_defense_popup(page, pitch_result_text) submit_input_complete( page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}구 {pitch_result_text or '-'}{extra_log}", clear_defense=True, log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": f"{pitch_result_text}{extra_log}", "selected_value": "폭투-볼" if is_wild_pitch else "포일-볼" if is_passed_ball else pitch_result_text} if job_id else None ) if current_late: handle_late_runner_events(page, event, current_late, True, job_id) balls, strikes = advance_count(balls, strikes, normalized_pitch_result) if result: last_pitch = pitches[-1] if pitches else {} action_result_types = { "hit_by_pitch", "strikeout", "strikeout_not_out", "walk", "intentional_walk", "out", "single", "double", "triple", "home_run", "sacrifice_fly", "sacrifice_bunt", "double_play", "reach_on_error", "reach_on_fielder_choice", "reach_on_grounder", "bunt_hit", "single_runner_out", "double_runner_out", "triple_runner_out", "play" } if last_pitch.get("pitchResult") == "H" or result.get("type") in action_result_types: runner_lines = build_runner_event_lines(event) result_text = result.get('text') or '' def_seq = [] if is_error_result(result_text): err_pos = extract_error_position(result_text) if err_pos: click_count = 2 if is_throwing_error(result_text) else 1 def_seq = [err_pos] * click_count elif result.get("type") in {"out", "double_play", "sacrifice_fly", "sacrifice_bunt", "strikeout", "reach_on_fielder_choice", "reach_on_grounder", "single_runner_out", "double_runner_out", "triple_runner_out", "play"}: def_seq = extract_defense_sequence(result_text) if "직선타" in result_text or "라인드라이브" in result_text or "플라이" in result_text: if def_seq: def_seq = [def_seq[0]] defense_lines = [f"⚾ 누를 수비수: {', '.join(def_seq)}"] if def_seq else [] show_debug_overlay( page, [ f"📌 타격 결과: {result_text or '-'}", f"🎯 현재 카운트: {balls}B {strikes}S {outs}O", *defense_lines, *runner_lines, ], ) wait_for_operator_control(page) set_pitch_meta_only(page, last_pitch) if write_events: page.wait_for_timeout(120) simple_terminal_result = is_simple_terminal_result_type(result.get("type") or "") expected_batter_event = infer_batter_result_label(result, event) or "" if expected_batter_event: for _ in range(5): set_batter_result_type(page, result, event) page.wait_for_timeout(50) if get_checked_event_name(page, "evt_batter") == expected_batter_event: break else: set_batter_result_type(page, result, event) page.wait_for_timeout(50) popup_defense_used = False if not simple_terminal_result: if result.get("type") in {"reach_on_error", "double_play"} or get_checked_batter_defense_type(page): set_hit_ball_and_defense(page, event) popup_defense_used = True set_batter_advancement(page, result) current_late = [] all_runner_events = (event.get("runnerEvents") or []).copy() if last_pitch.get("runnerEvents"): all_runner_events.extend(last_pitch["runnerEvents"]) if all_runner_events: current_late.extend(set_runner_events(page, event, all_runner_events)) page.wait_for_timeout(30) submit_input_complete( page, f"{event.get('batter') or '-'} / {result_text or '-'}", clear_defense=not popup_defense_used, log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": (last_pitch.get("pitchNo", 0) if isinstance(last_pitch, dict) else 0), "target_value": result_text, "selected_value": expected_batter_event} if job_id else None ) if current_late: handle_late_runner_events(page, event, current_late, True, job_id) result_type = result.get("type") result_text = (result.get("text") or "").strip() if result_type == "double_play": outs += 2 if outs >= 3: break elif "낫아웃" in result_text and not any(token in result_text for token in ("폭투", "포일", "진루", "출루", "세이프")): outs += 1 if outs >= 3: break elif result_type in {"out", "strikeout", "sacrifice_fly", "sacrifice_bunt", "single_runner_out", "double_runner_out", "triple_runner_out"}: outs += 1 if outs >= 3: break def run(playwright: Playwright, args: argparse.Namespace, report: dict[str, Any]) -> None: browser = launch_browser_context(playwright, args.user_data_dir, args.channel, args.headless) page = browser.pages[0] if browser.pages else browser.new_page() job_id = getattr(args, "job_id", None) or os.environ.get("JOB_ID") if args.write_events and not job_id: try: import db_logging import uuid job_id = f"standalone-{uuid.uuid4().hex[:8]}" db_logging.start_job(job_id, args.game_id, getattr(args, "start_inning", ""), getattr(args, "end_inning", "")) except Exception: job_id = None try: open_game_status_page(page, args.base_url, report, args.manager_game_no) if getattr(args, "review_only", False): process_only_reviews(page, report, args.write_events, job_id=job_id) else: process_report(page, report, args.write_events, job_id=job_id) if not args.close: page.wait_for_timeout(3600 * 1000) finally: if args.write_events and job_id: try: import db_logging db_logging.finish_job(job_id) except Exception: pass if args.close: try: browser.close() except Exception: pass