Files
2026-05-02 16:24:42 +09:00

557 lines
27 KiB
Python

"""
commands/record.py — 게임 전체 기록 (Main Command)
JSON 리포트를 읽고 관리자 사이트의 폼을 제어하여
경기 전체를 순차적으로 입력합니다.
"""
from __future__ import annotations
import argparse
import os
import re
from time import time, sleep
from typing import Any
from playwright.sync_api import Playwright, Page
from core.normalizer import (
normalize_game_type,
normalize_stadium_name,
normalize_team_name,
)
from core.pitch_classifier import infer_batter_result_label, get_last_pitch_result_text
from core.field_calculator import extract_defense_sequence, extract_error_position, is_throwing_error, is_error_result
from automation.page_helpers import (
get_last_visible_locator,
get_checked_event_name,
set_radio_by_label,
show_debug_overlay,
wait_for_operator_control,
get_checked_batter_defense_type,
get_history_count,
)
from automation.defense_popup import fill_error_defense_popup, click_defense_button_robustly
from automation.pitch_input import set_pitch, set_pitch_meta_only, get_pitch_runner_events
from automation.batter_input import set_batter_result_type, set_batter_advancement, set_hit_ball_and_defense
from automation.runner_input import set_runner_events
from automation.lineup_input import apply_change_event
from automation.review_input import record_review_events
from commands.base import launch_browser_context, load_report, add_common_arguments
# JavaScript arrow function evaluated inside the page to reset the defense and
# hit-ball inputs. It unchecks every `defenseNumberBtn` and `hitBallType` input,
# blanks the value of each element whose id appears in `datIds` (missing ids are
# skipped), and finally clears the `hitBallType` / `hitBallSpeed*` / `hitBallType*`
# controls: radios and checkboxes are unchecked, everything else gets an empty value.
_DEFENSE_CLEAR_JS = """
() => {
document.querySelectorAll("input[name='defenseNumberBtn']").forEach(el => { el.checked = false; });
document.querySelectorAll("input[name='hitBallType']").forEach(el => { el.checked = false; });
const datIds = [
"putout", "assist", "error", "upstruction",
"dat_putout_hitter", "dat_assist_hitter", "dat_error_hitter", "dat_upstruction_hitter",
"dat_putout_runner1", "dat_assist_runner1", "dat_error_runner1", "dat_upstruction_runner1",
"dat_putout_runner2", "dat_assist_runner2", "dat_error_runner2", "dat_upstruction_runner2",
"dat_putout_runner3", "dat_assist_runner3", "dat_error_runner3", "dat_upstruction_runner3",
"dat_error_type", "dat_error_type1", "dat_error_type2", "dat_error_type3",
"dat_multiplay_type", "multiplay_type",
"hitBallXY", "hitBallDistance",
"dat_hitball_speed", "dat_hitball_type", "dat_hitball_x", "dat_hitball_y", "dat_hitball_xy", "dat_hitball_distance",
"hitball_speed", "hitball_type", "hitball_xy", "hitball_distance"
];
datIds.forEach(id => {
const el = document.getElementById(id);
if (el) el.value = "";
});
document.querySelectorAll("input[name='hitBallType'], [id^='hitBallSpeed'], [id^='hitBallType']").forEach(el => {
if (el.type === 'radio' || el.type === 'checkbox') {
el.checked = false;
} else {
el.value = "";
}
});
}
"""
def clear_defense_selections(page: Page) -> None:
    """Run the module's defense-clearing script inside *page*.

    The script itself lives in ``_DEFENSE_CLEAR_JS``; this function only
    hands it to ``page.evaluate`` and discards the result.
    """
    clearing_script = _DEFENSE_CLEAR_JS
    page.evaluate(clearing_script)
def is_simple_terminal_result_type(result_type: str) -> bool:
    """Return True when *result_type* is a strikeout, walk or hit-by-pitch variant.

    The accepted values are exactly ``strikeout``, ``strikeout_not_out``,
    ``walk``, ``intentional_walk`` and ``hit_by_pitch``.
    """
    simple_terminal_types = {
        "strikeout",
        "strikeout_not_out",
        "walk",
        "intentional_walk",
        "hit_by_pitch",
    }
    return result_type in simple_terminal_types
def advance_count(balls: int, strikes: int, pitch_result: str) -> tuple[int, int]:
    """Return the ``(balls, strikes)`` count after one pitch.

    * ``"B"`` adds a ball, capped at 3.
    * ``"T"`` / ``"S"`` add a strike, capped at 2.
    * ``"F"`` adds a strike only while fewer than two strikes are recorded.
    * Any other code leaves the count unchanged.
    """
    next_balls, next_strikes = balls, strikes
    if pitch_result == "B":
        next_balls = min(balls + 1, 3)
    elif pitch_result in {"T", "S"}:
        next_strikes = min(strikes + 1, 2)
    elif pitch_result == "F":
        # A foul never produces the third strike.
        if strikes < 2:
            next_strikes = strikes + 1
    return next_balls, next_strikes
def find_status_href(page: Page, report: dict[str, Any], manager_game_no: str | None) -> str:
    """Return the href of the '게임기록' link for the game described by *report*.

    The rows of ``table.gclist`` (header row skipped) are read from the page.
    When *manager_game_no* is given, the row whose first cell equals it is used.
    Otherwise the first row whose date, game type (if the cell is non-empty),
    stadium, home team and away team all match ``report["game_info"]`` after
    normalization is used.

    Raises:
        KeyError: if ``report`` lacks ``game_info`` or one of its fields.
        ValueError: if no row with a usable (non-empty, non-``javascript:``)
            link is found. In the list-matching case a debug dump of the
            parsed rows is printed first.
    """
    game_info = report["game_info"]
    target_date = game_info["date"]
    target_stadium = normalize_stadium_name(game_info["stadium"])
    target_home_team = normalize_team_name(game_info["home_team"])
    target_away_team = normalize_team_name(game_info["away_team"])
    target_game_type = normalize_game_type(game_info["game_type"])

    table_rows = page.locator("table.gclist tr")
    rows: list[dict[str, Any]] = table_rows.evaluate_all(
        """(rows) => rows.slice(1).map((row) => {
const cells = [...row.cells].map((cell) => cell.innerText.trim());
const statusLink = [...row.querySelectorAll('a')].find((anchor) => anchor.textContent.trim() === '게임기록');
return {
gameNo: cells[0] || '',
date: cells[1] || '',
gameType: cells[2] || '',
stadium: cells[3] || '',
homeTeam: cells[4] || '',
awayTeam: cells[5] || '',
href: statusLink ? statusLink.getAttribute('href') : '',
};
})"""
    )

    def has_usable_link(row: dict[str, Any]) -> bool:
        # A usable link is non-empty and is not a javascript: pseudo-URL.
        href = row["href"]
        return bool(href) and not href.startswith("javascript:")

    if manager_game_no:
        matched = None
        for row in rows:
            if row["gameNo"] == str(manager_game_no):
                matched = row
                break
        if not matched or not has_usable_link(matched):
            raise ValueError(f"관리자 게임번호 {manager_game_no} 행의 게임기록 링크를 찾지 못했습니다.")
        return matched["href"]

    def is_target_game(row: dict[str, Any]) -> bool:
        # Checks run in a fixed order, cheapest first; the normalizers are
        # only called once the earlier checks have passed.
        if not has_usable_link(row):
            return False
        if not row["date"] == target_date:
            return False
        raw_type = row["gameType"]
        if raw_type and not normalize_game_type(raw_type) == target_game_type:
            return False
        if not normalize_stadium_name(row["stadium"]) == target_stadium:
            return False
        if not normalize_team_name(row["homeTeam"]) == target_home_team:
            return False
        return normalize_team_name(row["awayTeam"]) == target_away_team

    candidates = [row for row in rows if is_target_game(row)]
    if candidates:
        return candidates[0]["href"]

    # Nothing matched: dump the target and every parsed row to stdout.
    print("\n=== 게임 매칭 실패 디버그 정보 ===")
    print(f"Target: Date='{target_date}', Type='{target_game_type}', Stadium='{target_stadium}', Home='{target_home_team}', Away='{target_away_team}'")
    print("Rows parsed from table:")
    for r in rows:
        print(f" - Date='{r['date']}', Type='{r['gameType']}'(norm: '{normalize_game_type(r['gameType'])}'), Stadium='{r['stadium']}'(norm: '{normalize_stadium_name(r['stadium'])}'), Home='{r['homeTeam']}'(norm: '{normalize_team_name(r['homeTeam'])}'), Away='{r['awayTeam']}'(norm: '{normalize_team_name(r['awayTeam'])}')")
    raise ValueError("목록에서 일치하는 게임기록 링크를 찾지 못했습니다.")
def open_game_status_page(page: Page, base_url: str, report: dict[str, Any], manager_game_no: str | None) -> None:
    """Navigate *page* to the game status screen and wait for ``#eventWriteBtn``.

    With *manager_game_no* the status URL is opened directly. Without it the
    game list is opened, the matching row is located with
    ``find_status_href`` and its link is clicked.
    """
    if not manager_game_no:
        page.goto(f"{base_url}/manager/game/list", wait_until="domcontentloaded")
        page.wait_for_selector("table.gclist", timeout=10000)
        status_href = find_status_href(page, report, manager_game_no)
        status_link = page.locator(f"a[href='{status_href}']").first
        with page.expect_navigation(wait_until="domcontentloaded"):
            status_link.click()
    else:
        page.goto(f"{base_url}/manager/game/status?game_no={manager_game_no}", wait_until="domcontentloaded")
    # Both paths end on the status screen; wait until its submit control exists.
    page.wait_for_selector("#eventWriteBtn", timeout=10000)
def _try_log_pitch(log_info: dict[str, Any] | None, is_success: bool, error_code: str, error_detail: str, duration: float) -> None:
if log_info and log_info.get("job_id"):
try:
from db_logging import log_pitch
log_pitch(
log_info["job_id"], log_info.get("inning", ""), log_info.get("batter", ""),
log_info.get("pitch_no", 0), log_info.get("target_value", ""), log_info.get("selected_value", ""),
is_success, error_code, error_detail, duration
)
except Exception:
pass
def _try_log_event(log_info: dict[str, Any] | None, is_success: bool, error_msg: str = "") -> None:
if log_info and log_info.get("job_id"):
try:
from db_logging import log_event
log_event(
log_info["job_id"], log_info.get("inning", ""), log_info.get("event_type", ""),
log_info.get("target_player", ""), log_info.get("actual_player", ""),
is_success, error_msg
)
except Exception:
pass
def submit_input_complete(page: Page, debug_label: str = "", clear_defense: bool = False, log_info: dict[str, Any] | None = None) -> None:
    """Press the '입력완료' control and wait until the history count increases.

    Args:
        page: Page showing the game status form.
        debug_label: Text appended to the timeout error message.
        clear_defense: When true, ``clear_defense_selections`` is run before
            the submit loop starts.
        log_info: Optional dict passed to ``_try_log_pitch``; the outcome and
            elapsed time are logged on both success and failure.

    Raises:
        TimeoutError: if the history count has not grown after 40 polls.
        Exception: anything raised by the page helpers is logged and re-raised.
    """
    t0 = time()
    try:
        # Auto-accept confirm()/alert() dialogs and, if the defense panel is
        # currently displayed, click its #btnAdd button first.
        page.evaluate("""() => {
window.confirm = () => true;
window.alert = () => {};
const defenseDiv = document.querySelector('#defenseDiv');
if (defenseDiv && defenseDiv.style.display !== 'none') {
const btnAdd = document.querySelector('#btnAdd');
if (btnAdd) btnAdd.click();
}
}""")
        if clear_defense:
            clear_defense_selections(page)
        # Baseline against which a successful submit is detected.
        prev_history = get_history_count(page)
        for i in range(40):
            curr_history = get_history_count(page)
            if curr_history > prev_history:
                page.wait_for_timeout(30)
                _try_log_pitch(log_info, True, "", "", time() - t0)
                return
            # (Re)click the submit control on iterations 0, 8, 16, 24 and 32.
            if i % 8 == 0:
                submit_btn = get_last_visible_locator(page, "#eventWriteBtn")
                if not submit_btn:
                    submit_btn = page.get_by_role("button", name="입력완료").last
                if submit_btn:
                    try:
                        submit_btn.click(force=True, timeout=500)
                    except Exception:
                        # Fall back to clicking from inside the page.
                        page.evaluate("document.querySelector('#eventWriteBtn')?.click() || [...document.querySelectorAll('a, button')].find(el => el.innerText.includes('입력완료'))?.click()")
            # NOTE(review): the next two statements are placed at loop level
            # (run on every poll); nesting was reconstructed — confirm.
            page.wait_for_timeout(50)
            # Re-install the dialog overrides in case the page replaced them.
            page.evaluate("() => { window.confirm = () => true; window.alert = () => {}; }")
        raise TimeoutError(f"입력완료가 반영되지 않았습니다: {debug_label}")
    except Exception as e:
        # Record the failure (including the timeout above) before propagating it.
        _try_log_pitch(log_info, False, type(e).__name__, str(e), time() - t0)
        raise e
def handle_late_runner_events(page: Page, event: dict[str, Any], late_events: list[dict[str, Any]], write_events: bool, job_id: str | None = None) -> None:
    """Enter runner events that were deferred until after a submit.

    Each round waits 800 ms and reads the last history text. If every
    non-empty ``text`` of *late_events* already appears there, nothing more is
    done. Otherwise the events are entered with ``set_runner_events`` and
    submitted, and the value returned by ``set_runner_events`` becomes the
    pending list for the next round.

    Returns immediately when *late_events* is empty or *write_events* is false.
    """
    while late_events and write_events:
        page.wait_for_timeout(800)
        from automation.page_helpers import get_last_history_text
        current_history = get_last_history_text(page)

        # Events without text are treated as already recorded.
        pending_texts = (le.get("text", "") for le in late_events)
        if all(not le_text or le_text in current_history for le_text in pending_texts):
            return

        new_late = set_runner_events(page, event, late_events)
        submit_input_complete(
            page,
            f"지연 주루 처리: {', '.join(e.get('text', '') for e in late_events)}",
            clear_defense=True,
            log_info={"job_id": job_id} if job_id else None
        )
        # Whatever is still deferred is handled by the next loop round.
        late_events = new_late
def build_runner_event_lines(event: dict[str, Any]) -> list[str]:
    """Build one display line per entry of ``event["runnerEvents"]``.

    Each line shows the runner's ``fromBase`` and ``toBase`` ("?" when absent)
    and the event text. When ``infer_runner_action_label`` yields a non-empty
    label it is appended. A missing or empty ``runnerEvents`` gives ``[]``.
    """
    from core.runner_classifier import infer_runner_action_label

    def describe(runner_event: dict[str, Any]) -> str:
        r_text = runner_event.get("text", "")
        from_b = runner_event.get("fromBase", "?")
        to_b = runner_event.get("toBase", "?")
        label = infer_runner_action_label(event, runner_event)
        line = f"🏃 {from_b}루주자 -> {to_b}루 : {r_text}"
        if label:
            line += f" | 라벨: {label}"
        return line

    runner_events = event.get("runnerEvents") or []
    return [describe(runner_event) for runner_event in runner_events]
def process_only_reviews(page: Page, report: dict[str, Any], write_events: bool, job_id: str = None) -> None:
    """Collect every review event from the report's at-bats and register them in one batch.

    Review events are gathered from ``reviewEvents`` of each ``at_bat`` event
    and normalized with ``_normalize_review_event``. When none exist an
    overlay message is shown for two seconds. Otherwise the batch is passed to
    ``record_review_events`` (only if *write_events* is true). *job_id* is
    accepted but not used here.
    """
    from automation.review_input import _normalize_review_event

    all_reviews = [
        _normalize_review_event(r)
        for half_inning in report.get("game_contents", [])
        for event in half_inning.get("events", [])
        if event.get("event_type") == "at_bat"
        for r in (event.get("reviewEvents") or [])
    ]

    if all_reviews:
        show_debug_overlay(page, [f"합의판정 {len(all_reviews)}건 일괄 등록 시작"])
        if write_events:
            record_review_events(page, all_reviews)
        # NOTE(review): the completion overlay and wait are placed outside the
        # write_events check; nesting was reconstructed — confirm.
        show_debug_overlay(page, ["합의판정 일괄 등록 완료"])
        page.wait_for_timeout(1000)
        return

    show_debug_overlay(page, ["입력할 합의판정 기록이 없습니다."])
    page.wait_for_timeout(2000)
def process_report(page: Page, report: dict[str, Any], write_events: bool, job_id: str = None) -> None:
    """Walk every half-inning of *report* and drive the input form event by event.

    For each ``change`` event the substitution is applied (once per distinct
    text). For each ``at_bat`` event every pitch is entered and then, when the
    at-bat has a result, the batter result, defense, advancement and runner
    events are entered. Submits happen only when *write_events* is true. An
    out counter is kept per half-inning and the remaining events of a
    half-inning are skipped once it reaches three.

    Args:
        page: Page showing the game status form.
        report: Parsed report; ``game_contents`` is a list of half-innings,
            each with an ``events`` list.
        write_events: When false nothing is submitted.
        job_id: Optional id; when set, log dicts are built and handed to the
            logging helpers.

    NOTE(review): several nesting levels in this function were reconstructed
    from context; the spots are marked below — confirm against the original.
    """
    from core.pitch_classifier import normalize_pitch_result_code
    outs = 0
    change_cache: dict[str, tuple[str, int]] = {}
    # Texts of substitutions already applied, used to skip duplicates.
    applied_change_texts: set[str] = set()
    for half_inning in report.get("game_contents", []):
        inning = half_inning.get("inning", "")
        # The out counter restarts with every half-inning.
        outs = 0
        for event in half_inning.get("events", []):
            # ---- substitution events ----
            if event.get("event_type") == "change":
                change_text = (event.get("text") or "").strip()
                show_debug_overlay(page, [f"교체 입력: {change_text or '-'}"])
                wait_for_operator_control(page)
                if write_events:
                    if change_text and change_text in applied_change_texts:
                        show_debug_overlay(page, ["교체 중복 건너뜀", change_text])
                        page.wait_for_timeout(250)
                        continue
                    log_info_event = {
                        "job_id": job_id, "inning": inning, "event_type": event.get("change_type", "change"),
                        "target_player": event.get("in_player") or event.get("to_position", ""),
                        "actual_player": event.get("actor_name") or event.get("player_name", "")
                    } if job_id else None
                    try:
                        apply_change_event(page, half_inning, event, change_cache)
                        _try_log_event(log_info_event, True)
                    except Exception as e:
                        # Log the failed substitution, then propagate.
                        _try_log_event(log_info_event, False, str(e))
                        raise e
                    if change_text:
                        applied_change_texts.add(change_text)
                    # NOTE(review): overlay + wait assumed to belong to the
                    # write_events branch — confirm.
                    show_debug_overlay(page, ["교체 완료", f"{change_text or '-'}"])
                    page.wait_for_timeout(120)
                continue
            # Only at-bats are handled beyond this point.
            if event.get("event_type") != "at_bat":
                continue
            clear_defense_selections(page)
            balls = 0
            strikes = 0
            pitches = event.get("pitches") or []
            result = event.get("result") or {}
            # ---- pitch-by-pitch input ----
            for pitch_index, pitch in enumerate(pitches):
                pitch_result_text = (pitch.get("pitchResultText") or "").strip()
                normalized_pitch_result = normalize_pitch_result_code(pitch, event)
                # A balk combined with a (swinging) strike is entered as two submits.
                is_balk_strike = "보크" in pitch_result_text and ("스트라이크" in pitch_result_text or "헛스윙" in pitch_result_text)
                if "피치클락" in pitch_result_text and "투수위반" in pitch_result_text:
                    pitch_result_text = "피치클락 투수위반 볼"
                show_debug_overlay(
                    page,
                    [
                        f"다음 카운트: {balls}{strikes}스트 {outs}아웃",
                        f"다음 공: {pitch.get('pitchNo')}{pitch_result_text}",
                        f"구종/구속: {(pitch.get('pitchType') or '-')} / {(pitch.get('speedKmh') or '-')}",
                        f"타석: {event.get('batter') or '-'}",
                    ],
                )
                wait_for_operator_control(page)
                is_last_pitch = pitch_index == len(pitches) - 1
                is_action_result = is_last_pitch and result.get("type") in {
                    "hit_by_pitch", "strikeout", "strikeout_not_out", "walk", "intentional_walk", "out", "single", "double", "triple",
                    "home_run", "sacrifice_fly", "sacrifice_bunt", "double_play", "reach_on_error", "reach_on_fielder_choice", "bunt_hit",
                    "single_runner_out", "double_runner_out", "triple_runner_out"
                }
                is_in_play = (pitch.get("pitchResult") == "H") or is_action_result
                # The final pitch of such an at-bat is entered together with
                # the result in the section after this loop.
                if is_last_pitch and is_in_play:
                    continue
                if is_balk_strike:
                    if write_events:
                        current_late = []
                        p_runner_events = pitch.get("runnerEvents")
                        if p_runner_events:
                            current_late.extend(set_runner_events(page, event, p_runner_events))
                        if is_last_pitch and event.get("runnerEvents"):
                            current_late.extend(set_runner_events(page, event))
                        submit_input_complete(
                            page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}구 보크", clear_defense=True,
                            log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": "보크", "selected_value": "보크"} if job_id else None
                        )
                        if current_late:
                            handle_late_runner_events(page, event, current_late, True, job_id)
                        # NOTE(review): wait assumed to be inside the
                        # write_events branch — confirm.
                        page.wait_for_timeout(80)
                    # Second step: the strike itself.
                    set_pitch_meta_only(page, pitch)
                    if "헛스윙" in pitch_result_text:
                        set_radio_by_label(page, "evt_batter", "헛스윙(스트라이크)")
                    else:
                        set_radio_by_label(page, "evt_batter", "스트라이크(루킹)")
                    if write_events:
                        submit_input_complete(
                            page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}구 보크 후 {'헛스윙' if '헛스윙' in pitch_result_text else '스트라이크'}", clear_defense=True,
                            log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": pitch_result_text, "selected_value": pitch_result_text} if job_id else None
                        )
                else:
                    set_pitch(page, pitch, event)
                    if write_events:
                        current_late = []
                        p_runner_events = get_pitch_runner_events(pitch, event)
                        # The generator variable `re` below shadows the `re`
                        # module only inside these expressions.
                        is_wild_pitch = any(re.get("type") == "wild_pitch_advance" or "폭투" in (re.get("text") or "") for re in p_runner_events)
                        is_passed_ball = any(re.get("type") == "passed_ball_advance" or "포일" in (re.get("text") or "") for re in p_runner_events)
                        extra_log = " (폭투)" if is_wild_pitch else " (포일)" if is_passed_ball else ""
                        if p_runner_events:
                            current_late.extend(set_runner_events(page, event, p_runner_events))
                        if is_last_pitch and event.get("runnerEvents"):
                            current_late.extend(set_runner_events(page, event))
                        # A foul fly with an error needs the error popup filled in.
                        if "파울플라이" in pitch_result_text and "실책" in pitch_result_text:
                            fill_error_defense_popup(page, pitch_result_text)
                        submit_input_complete(
                            page, f"{event.get('batter') or '-'} / {pitch.get('pitchNo')}{pitch_result_text or '-'}{extra_log}", clear_defense=True,
                            log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": pitch.get("pitchNo", 0), "target_value": f"{pitch_result_text}{extra_log}", "selected_value": "폭투-볼" if is_wild_pitch else "포일-볼" if is_passed_ball else pitch_result_text} if job_id else None
                        )
                        if current_late:
                            handle_late_runner_events(page, event, current_late, True, job_id)
                # Track the count for the overlay shown before the next pitch.
                balls, strikes = advance_count(balls, strikes, normalized_pitch_result)
            # ---- at-bat result input ----
            if result:
                last_pitch = pitches[-1] if pitches else {}
                action_result_types = {
                    "hit_by_pitch", "strikeout", "strikeout_not_out", "walk", "intentional_walk", "out", "single", "double", "triple",
                    "home_run", "sacrifice_fly", "sacrifice_bunt", "double_play", "reach_on_error", "reach_on_fielder_choice", "reach_on_grounder", "bunt_hit",
                    "single_runner_out", "double_runner_out", "triple_runner_out", "play"
                }
                if last_pitch.get("pitchResult") == "H" or result.get("type") in action_result_types:
                    runner_lines = build_runner_event_lines(event)
                    result_text = result.get('text') or ''
                    # def_seq is only used for the operator overlay below.
                    def_seq = []
                    if is_error_result(result_text):
                        err_pos = extract_error_position(result_text)
                        if err_pos:
                            # A throwing error lists the position twice.
                            click_count = 2 if is_throwing_error(result_text) else 1
                            def_seq = [err_pos] * click_count
                    elif result.get("type") in {"out", "double_play", "sacrifice_fly", "sacrifice_bunt", "strikeout", "reach_on_fielder_choice", "reach_on_grounder", "single_runner_out", "double_runner_out", "triple_runner_out", "play"}:
                        def_seq = extract_defense_sequence(result_text)
                        # NOTE(review): assumed to be nested under the elif;
                        # liners and flies keep only the first fielder — confirm.
                        if "직선타" in result_text or "라인드라이브" in result_text or "플라이" in result_text:
                            if def_seq:
                                def_seq = [def_seq[0]]
                    defense_lines = [f"⚾ 누를 수비수: {', '.join(def_seq)}"] if def_seq else []
                    show_debug_overlay(
                        page,
                        [
                            f"📌 타격 결과: {result_text or '-'}",
                            f"🎯 현재 카운트: {balls}B {strikes}S {outs}O",
                            *defense_lines,
                            *runner_lines,
                        ],
                    )
                    wait_for_operator_control(page)
                    set_pitch_meta_only(page, last_pitch)
                    if write_events:
                        page.wait_for_timeout(120)
                        simple_terminal_result = is_simple_terminal_result_type(result.get("type") or "")
                        expected_batter_event = infer_batter_result_label(result, event) or ""
                        if expected_batter_event:
                            # Retry up to five times until the expected radio is checked.
                            for _ in range(5):
                                set_batter_result_type(page, result, event)
                                page.wait_for_timeout(50)
                                if get_checked_event_name(page, "evt_batter") == expected_batter_event:
                                    break
                        # NOTE(review): this else is read as belonging to the
                        # `if expected_batter_event`, not to the for loop — confirm.
                        else:
                            set_batter_result_type(page, result, event)
                            page.wait_for_timeout(50)
                        popup_defense_used = False
                        if not simple_terminal_result:
                            if result.get("type") in {"reach_on_error", "double_play"} or get_checked_batter_defense_type(page):
                                set_hit_ball_and_defense(page, event)
                                popup_defense_used = True
                            # NOTE(review): assumed to be skipped for simple
                            # terminal results; nesting reconstructed — confirm.
                            set_batter_advancement(page, result)
                        current_late = []
                        # Copy so that the report's own list is not extended.
                        all_runner_events = (event.get("runnerEvents") or []).copy()
                        if last_pitch.get("runnerEvents"):
                            all_runner_events.extend(last_pitch["runnerEvents"])
                        if all_runner_events:
                            current_late.extend(set_runner_events(page, event, all_runner_events))
                        page.wait_for_timeout(30)
                        # Defense inputs are cleared before submit only when
                        # set_hit_ball_and_defense was not called above.
                        submit_input_complete(
                            page, f"{event.get('batter') or '-'} / {result_text or '-'}", clear_defense=not popup_defense_used,
                            log_info={"job_id": job_id, "inning": inning, "batter": event.get("batter", ""), "pitch_no": (last_pitch.get("pitchNo", 0) if isinstance(last_pitch, dict) else 0), "target_value": result_text, "selected_value": expected_batter_event} if job_id else None
                        )
                        if current_late:
                            handle_late_runner_events(page, event, current_late, True, job_id)
            # ---- out tracking; stop this half-inning after three outs ----
            result_type = result.get("type")
            result_text = (result.get("text") or "").strip()
            if result_type == "double_play":
                outs += 2
                if outs >= 3:
                    break
            # A dropped third strike counts as an out unless the text
            # mentions one of the listed tokens.
            elif "낫아웃" in result_text and not any(token in result_text for token in ("폭투", "포일", "진루", "출루", "세이프")):
                outs += 1
                if outs >= 3:
                    break
            elif result_type in {"out", "strikeout", "sacrifice_fly", "sacrifice_bunt", "single_runner_out", "double_runner_out", "triple_runner_out"}:
                outs += 1
                if outs >= 3:
                    break
def run(playwright: Playwright, args: argparse.Namespace, report: dict[str, Any]) -> None:
    """Entry point of the record command: open the game page and enter the report.

    A browser context is launched and its first page reused (or a new one
    opened). The job id comes from ``args.job_id`` or the ``JOB_ID``
    environment variable. When events are written and no id is available, a
    ``standalone-…`` id is generated and registered with
    ``db_logging.start_job``; if that fails the run proceeds without an id.

    Depending on ``args.review_only`` either only review events or the full
    report is processed. Unless ``args.close`` is set the page then waits
    3600 * 1000 ms. On exit the job is finished (best effort) and, when
    ``args.close`` is set, the browser context is closed (best effort).
    """
    context = launch_browser_context(playwright, args.user_data_dir, args.channel, args.headless)
    existing_pages = context.pages
    page = existing_pages[0] if existing_pages else context.new_page()

    job_id = getattr(args, "job_id", None) or os.environ.get("JOB_ID")
    if args.write_events and not job_id:
        try:
            import db_logging
            import uuid
            job_id = f"standalone-{uuid.uuid4().hex[:8]}"
            db_logging.start_job(job_id, args.game_id, getattr(args, "start_inning", ""), getattr(args, "end_inning", ""))
        except Exception:
            # Logging is optional; continue without a job id.
            job_id = None

    try:
        open_game_status_page(page, args.base_url, report, args.manager_game_no)
        review_only = getattr(args, "review_only", False)
        if not review_only:
            process_report(page, report, args.write_events, job_id=job_id)
        else:
            process_only_reviews(page, report, args.write_events, job_id=job_id)
        if not args.close:
            page.wait_for_timeout(3600 * 1000)
    finally:
        if args.write_events and job_id:
            try:
                import db_logging
                db_logging.finish_job(job_id)
            except Exception:
                pass
        if args.close:
            try:
                context.close()
            except Exception:
                pass