Files
baseball-automation/register_game_playwright.py
2026-05-02 11:12:13 +09:00

487 lines
18 KiB
Python

from __future__ import annotations
import argparse
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any
from playwright.sync_api import Error, Page, Playwright, TimeoutError, sync_playwright
from browser_launch import launch_browser_context
# Default value of the --game-id option.
DEFAULT_GAME_ID = "20250425LTOB02025"
# Directory searched for "<game_id>_report.json" when --report-path is omitted.
DEFAULT_REPORT_DIR = Path("output")
# Default value of the --base-url option (admin site root).
DEFAULT_BASE_URL = "http://58.229.253.168:8089"
# Team name aliases -> label used when selecting/comparing teams on the admin
# site. Names that are not listed are passed through unchanged.
TEAM_NAME_MAP = {
    "키움": "Hero",
    "키움 히어로즈": "Hero",
    "Hero": "Hero",
}
# Game type aliases -> label used in the "#gameType" select. Unlisted names
# are passed through unchanged.
GAME_TYPE_MAP = {
    "와일드카드": "와일드카드 결정전",
}
# Fielding position name -> option value used in the defense-number selects.
POSITION_NUMBER_MAP = {
    "투수": "1",
    "포수": "2",
    "1루수": "3",
    "2루수": "4",
    "3루수": "5",
    "유격수": "6",
    "좌익수": "7",
    "중견수": "8",
    "우익수": "9",
    "지명타자": "10",
}
# Stadium name spellings -> label used in the "#stadium_id" select. Lookup is
# an exact string match, so spacing and letter-case variants are listed
# explicitly. Unlisted names are passed through unchanged.
STADIUM_NAME_MAP = {
    "고척": "고척돔",
    "고척스카이돔": "고척돔",
    "잠실": "잠실",
    "대구 삼성 라이온즈 파크": "대구라팍",
    "대구라이온즈파크": "대구라팍",
    "대구 라팍": "대구라팍",
    "대구삼성라이온즈파크": "대구라팍",
    "수원 케이티 위즈 파크": "수원",
    "수원KT위즈파크": "수원",
    "수원kt위즈파크": "수원",
    "창원NC파크": "창원",
    "창원 nc 파크": "창원",
    "창원 NC 파크": "창원",
    "대전 한화생명 볼파크": "대전",
    "대전한화생명볼파크": "대전",
    "대전 한화생명 이글스파크": "한밭(~2024)",
    "대전한화생명이글스파크": "한밭(~2024)",
    "인천": "문학",
    "인천 SSG 랜더스필드": "문학",
    "인천SSG랜더스필드": "문학",
    "문학": "문학",
    "광주-기아 챔피언스 필드": "광주",
    "광주 기아 챔피언스 필드": "광주",
    "광주KIA챔피언스필드": "광주",
    "광주 kia 챔피언스 필드": "광주",
    "사직야구장": "사직",
    "사직": "사직",
    "울산문수야구장": "울산",
    "울산 문수야구장": "울산",
    "울산": "울산",
    "포항야구장": "포항",
    "포항": "포항",
    "마산야구장": "마산",
    "마산": "마산",
    "군산월명야구장": "군산",
    "군산": "군산",
    "청주야구장": "청주",
    "청주": "청주",
    "잠실야구장": "잠실",
    "목동야구장": "목동",
    "목동": "목동",
    "무등야구장": "무등",
    "무등": "무등",
    "대구시민야구장": "대구",
    "대구": "대구",
}
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments.

    Returns:
        The parsed namespace. ``save`` defaults to ``False`` and is toggled by
        the paired ``--save`` / ``--no-save`` flags.
    """
    cli = argparse.ArgumentParser(
        description="신규 등록으로 경기 생성 후 수정 화면에서 라인업까지 자동 입력합니다."
    )
    add = cli.add_argument

    # Options are registered in a fixed order because it drives --help output.
    add("--game-id", default=DEFAULT_GAME_ID, help="예: 20250425NCSS02025")
    add("--report-path", help="기본값: output/<game_id>_report.json")
    add("--base-url", default=DEFAULT_BASE_URL, help="관리자 사이트 기본 URL")
    add("--manager-game-no", help="관리자 게임번호. 있으면 해당 행의 수정 버튼으로 진입")
    add(
        "--user-data-dir",
        default="playwright-user-data",
        help="로그인 세션을 유지할 Chromium 사용자 데이터 폴더",
    )
    add("--channel", default="chrome", help="브라우저 채널. 예: chrome, msedge")

    # Plain optional string values without defaults.
    for flag, help_text in (
        ("--end-time", "종료시간 HH:MM 형식. 예: 21:47"),
        ("--end-hour", "종료시간 시. 예: 22"),
        ("--end-minute", "종료시간 분. 예: 15"),
        ("--attendance", "관중 수. 예: 12500"),
    ):
        add(flag, help=help_text)

    add("--headless", action="store_true", help="헤드리스 모드")
    add("--save", dest="save", action="store_true", help="라인업 저장 버튼까지 클릭")
    add("--no-save", dest="save", action="store_false", help="저장 직전까지만 입력")
    add("--close", action="store_true", help="작업 후 브라우저를 닫음")

    # Both --save and --no-save write to ``save``; pin the shared default.
    cli.set_defaults(save=False)
    return cli.parse_args()
def load_report(report_path: Path) -> dict[str, Any]:
    """Read the UTF-8 encoded JSON report at *report_path* and return it parsed."""
    with report_path.open(encoding="utf-8") as report_file:
        return json.load(report_file)
def normalize_team_name(name: str) -> str:
    """Return the mapped team label for *name*, or *name* itself if unmapped."""
    if name in TEAM_NAME_MAP:
        return TEAM_NAME_MAP[name]
    return name
def normalize_game_type(name: str) -> str:
    """Return the mapped game-type label for *name*, or *name* itself if unmapped."""
    try:
        return GAME_TYPE_MAP[name]
    except KeyError:
        return name
def normalize_stadium_name(name: str) -> str:
    """Return the mapped stadium label for *name*, or *name* itself if unmapped."""
    mapped = STADIUM_NAME_MAP.get(name)
    return name if mapped is None else mapped
def split_time(iso_time: str | None) -> tuple[str, str]:
    """Split an ISO-8601 timestamp into zero-padded hour and minute strings.

    Args:
        iso_time: A string accepted by ``datetime.fromisoformat``, or a falsy
            value (``None`` / empty string).

    Returns:
        ``(hour, minute)`` as two-digit strings; ``("00", "00")`` when
        *iso_time* is falsy.
    """
    if iso_time:
        moment = datetime.fromisoformat(iso_time)
        return format(moment.hour, "02d"), format(moment.minute, "02d")
    return "00", "00"
def _format_end_clock(hour_text: Any, minute_text: Any, source: str) -> tuple[str, str]:
    """Validate an hour/minute pair and return it as zero-padded strings.

    Raises:
        ValueError: If either part is not an integer or lies outside
            00-23 / 00-59. *source* names the offending option in the message.
    """
    try:
        hour = int(hour_text)
        minute = int(minute_text)
    except ValueError as exc:
        raise ValueError(
            f"{source} 값이 숫자가 아닙니다: {hour_text!r}, {minute_text!r}"
        ) from exc
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(
            f"{source} 값이 범위를 벗어났습니다 (시 0-23, 분 0-59): {hour}:{minute}"
        )
    return f"{hour:02d}", f"{minute:02d}"


def resolve_end_time(game_info: dict[str, Any], args: argparse.Namespace) -> tuple[str, str]:
    """Determine the game's end time as zero-padded ``(hour, minute)`` strings.

    Sources are tried in priority order:

    1. ``args.end_time`` in ``HH:MM`` form,
    2. ``args.end_hour`` together with ``args.end_minute`` (both required;
       a lone value is ignored, as before),
    3. ``game_info["end_time"]`` parsed via ``split_time``.

    Command-line values are validated here so that a typo is reported with a
    clear message instead of surfacing later as a failed browser selection.

    Raises:
        ValueError: If a command-line value is malformed or out of range, or
            if no source provides an end time.
    """
    if args.end_time:
        hour_text, separator, minute_text = args.end_time.partition(":")
        if not separator:
            raise ValueError(f"--end-time 값은 HH:MM 형식이어야 합니다: {args.end_time!r}")
        return _format_end_clock(hour_text, minute_text, "--end-time")
    if args.end_hour is not None and args.end_minute is not None:
        return _format_end_clock(args.end_hour, args.end_minute, "--end-hour/--end-minute")
    if game_info.get("end_time"):
        return split_time(game_info["end_time"])
    raise ValueError("종료시간이 없습니다. --end-time HH:MM 또는 --end-hour/--end-minute를 지정하세요.")
def get_report_path(args: argparse.Namespace) -> Path:
    """Return the report file location chosen by the command-line arguments.

    An explicit ``--report-path`` wins; otherwise the path is
    ``<DEFAULT_REPORT_DIR>/<game_id>_report.json``.
    """
    explicit_path = args.report_path
    if not explicit_path:
        return DEFAULT_REPORT_DIR / f"{args.game_id}_report.json"
    return Path(explicit_path)
def select_by_label(page: Page, selector: str, label: str) -> None:
    """Choose the option whose visible label is *label* in a ``<select>``.

    The element is clicked first on a best-effort basis, then the option is
    selected and an explicit ``change`` event is dispatched.

    Args:
        page: Page containing the select element.
        selector: Selector of the ``<select>`` element.
        label: Visible text of the option to choose.

    Raises:
        playwright Error: Propagated from ``select_option`` / ``dispatch_event``
            (for example when no option carries *label*).
    """
    loc = page.locator(selector)
    try:
        loc.click(timeout=1000)
    except Error:
        # Best effort only: a failed or timed-out click must not block the
        # selection below. Only Playwright failures are ignored, so genuine
        # programming errors are no longer hidden.
        pass
    loc.select_option(label=label)
    loc.dispatch_event("change")
def normalize_number_text(number: str | int | None) -> str:
    """Reduce a uniform number to its canonical decimal string.

    All non-digit characters are dropped and leading zeros are removed, so
    ``"No.07"`` becomes ``"7"``. Note that ``"0"`` and ``"00"`` both normalize
    to ``"0"``.

    Args:
        number: Uniform number as text or integer, or ``None``.

    Returns:
        The normalized number, or ``""`` when *number* is ``None`` or contains
        no decimal digits.
    """
    # Test against None explicitly: the integer 0 is a valid uniform number
    # and must not be treated as "missing".
    text = "" if number is None else str(number).strip()
    # isdecimal() keeps only characters int() can parse; isdigit() would also
    # admit characters such as superscripts that make int() raise.
    digits = "".join(char for char in text if char.isdecimal())
    if not digits:
        return ""
    return str(int(digits))
def normalize_player_name_text(name: str | None) -> str:
    """Return *name* without ``*`` marks and without one trailing ``(...)`` group.

    A falsy *name* yields ``""``. Surrounding whitespace is removed both
    before and after the trailing parenthetical is stripped.
    """
    raw_name = name if name else ""
    without_marks = raw_name.replace("*", "").strip()
    without_suffix = re.sub(r"\([^)]*\)\s*$", "", without_marks)
    return without_suffix.strip()
def normalize_option_player_text(text: str) -> tuple[str, str]:
    """Split a player option label into ``(normalized name, normalized number)``.

    Whitespace runs are collapsed first. A label ending in ``[<digits>번]``
    yields both parts; any other label yields the normalized name and ``""``.
    """
    collapsed = " ".join(text.split())
    found = re.match(r"^(.*?)\s*\[(\d+)번\]$", collapsed)
    if found is None:
        return normalize_player_name_text(collapsed), ""
    name_part, number_part = found.group(1), found.group(2)
    return normalize_player_name_text(name_part), normalize_number_text(number_part)
def infer_option_role_hint(text: str) -> str:
    """Infer a pitcher/batter hint from the parenthetical in an option label.

    The label is expected to end with ``(<marker>)``, optionally followed by
    a ``[<digits>번]`` number suffix.

    NOTE(review): the previous implementation compared the marker against two
    empty string literals, which made the batter branch unreachable and
    classified ``()`` as a pitcher. The marker literals were most likely lost;
    the Korean markers below are an assumption and should be confirmed
    against the option text actually shown by the admin site.

    Args:
        text: Raw option label.

    Returns:
        ``"pitcher"``, ``"batter"``, or ``""`` when the label carries no
        recognizable role marker.
    """
    stripped = " ".join(text.split())
    matched = re.search(r"\(([^)]*)\)\s*(?:\[\d+번\])?$", stripped)
    if not matched:
        return ""
    hint = matched.group(1).strip()
    if hint in ("투", "투수"):
        return "pitcher"
    if hint in ("타", "타자"):
        return "batter"
    # Unknown or empty marker: give no hint rather than guessing a role.
    return ""
def infer_target_role_hint(position_name: str | None) -> str:
    """Return ``"pitcher"`` for the position ``"투수"``, otherwise ``"batter"``."""
    return "pitcher" if position_name == "투수" else "batter"
def get_select_options(page: Page, selector: str) -> list[dict[str, str]]:
    """Return every option of the ``<select>`` at *selector*.

    Each entry is a dict with the option's ``value`` and its trimmed
    ``text``, collected in the browser in a single evaluation.
    """
    collect_options_js = """(el) => [...el.options].map(option => ({
value: option.value,
text: option.textContent.trim()
}))"""
    select_element = page.locator(selector)
    return select_element.evaluate(collect_options_js)
def select_player_option(
    page: Page,
    selector: str,
    player_name: str,
    player_number: str | None,
    position_name: str | None = None,
) -> None:
    """Select the option of a player ``<select>`` that matches a lineup entry.

    Matching cascade, applied to options whose normalized name equals the
    normalized *player_name*:

    1. The first option whose uniform number also matches and whose role hint
       is absent or equal to the target role is selected immediately.
    2. Otherwise, if exactly one option is role-compatible, it is selected.
    3. Otherwise, if exactly one option matches by name, it is selected.

    Args:
        page: Page containing the select element.
        selector: Selector of the player ``<select>``.
        player_name: Player name from the report.
        player_number: Uniform number from the report; may be ``None``.
        position_name: Position name used to derive the target role hint.

    Raises:
        ValueError: If no unambiguous option is found. The message lists up
            to ten candidate option labels.
    """
    options = get_select_options(page, selector)
    target_number = normalize_number_text(player_number)
    normalized_player_name = normalize_player_name_text(player_name)
    target_role_hint = infer_target_role_hint(position_name)
    name_matches = []
    role_filtered_matches = []
    for option in options:
        option_name, option_number = normalize_option_player_text(option["text"])
        if option_name != normalized_player_name:
            continue
        name_matches.append(option)
        option_role_hint = infer_option_role_hint(option["text"])
        # An option is role-compatible when either side carries no hint or
        # both hints agree.
        role_matches = (
            not target_role_hint
            or not option_role_hint
            or option_role_hint == target_role_hint
        )
        if role_matches:
            role_filtered_matches.append(option)
        # Strongest signal: same name and same uniform number. Stop at the
        # first such option unless its role hint contradicts the target.
        if target_number and option_number == target_number:
            if not option_role_hint or option_role_hint == target_role_hint:
                page.locator(selector).select_option(value=option["value"])
                return
    # No name+number match: fall back to a unique role-compatible name match,
    # then to a unique name match regardless of role.
    if len(role_filtered_matches) == 1:
        page.locator(selector).select_option(value=role_filtered_matches[0]["value"])
        return
    if len(name_matches) == 1:
        page.locator(selector).select_option(value=name_matches[0]["value"])
        return
    # Build a helpful candidate list for the error: options whose name
    # overlaps the target name or whose number matches.
    normalized_options = [normalize_option_player_text(option["text"]) for option in options]
    similar_options = [
        option["text"]
        for option, (option_name, option_number) in zip(options, normalized_options)
        if normalized_player_name in option_name or option_name in normalized_player_name or (target_number and option_number == target_number)
    ]
    if not similar_options:
        # Nothing similar at all: show the available options instead.
        similar_options = [option["text"] for option in options]
    raise ValueError(
        f"{selector}에서 선수 '{player_name}' #{player_number} 옵션을 찾지 못했습니다. "
        f"후보: {', '.join(similar_options[:10])}"
    )
def select_position_option(page: Page, selector: str, position_name: str) -> None:
    """Select the defense option whose value corresponds to *position_name*.

    Raises:
        ValueError: If *position_name* has no entry in ``POSITION_NUMBER_MAP``.
    """
    defense_value = POSITION_NUMBER_MAP.get(position_name)
    if defense_value:
        page.locator(selector).select_option(value=defense_value)
        return
    raise ValueError(f"포지션 매핑이 없습니다: {position_name}")
def build_lineup_entries(lineups: dict[str, Any], team_key: str) -> list[tuple[int, dict[str, Any]]]:
    """List ``(slot, player)`` pairs for one team's lineup.

    The starting pitcher always comes first in slot ``0``; every entry of
    ``players`` follows in its original order, keyed by its ``bat_order``
    converted to ``int``.
    """
    team_lineup = lineups[team_key]
    starter_entry = (0, team_lineup["starter_pitcher"])
    batting_entries = [
        (int(batter["bat_order"]), batter) for batter in team_lineup["players"]
    ]
    return [starter_entry, *batting_entries]
def fill_game_form(page: Page, report: dict[str, Any], args: argparse.Namespace) -> None:
    """Fill the game header form from the report and command-line overrides.

    All values are derived before the page is touched. Fields are then
    filled in a fixed order, and the game type is selected a second time at
    the very end.

    Args:
        page: Page currently showing the game form.
        report: Parsed report; only ``report["game_info"]`` is read.
        args: Parsed command-line arguments (end time and attendance overrides).
    """
    info = report["game_info"]

    # Derive every label/value up front.
    season = f"{info['season']} 프로야구"
    game_type = normalize_game_type(info["game_type"])
    stadium = normalize_stadium_name(info["stadium"])
    home_team = normalize_team_name(info["home_team"])
    away_team = normalize_team_name(info["away_team"])
    start_hour, start_minute = split_time(info["start_time"])
    end_hour, end_minute = resolve_end_time(info, args)

    # The command-line attendance takes precedence over the report's value;
    # only the digits of whichever is used are kept.
    raw_attendance = info.get("attendance") if args.attendance is None else args.attendance
    if raw_attendance is None:
        attendance = None
    else:
        attendance = "".join(filter(str.isdigit, str(raw_attendance)))

    for select_selector, label in (
        ("#season_id", season),
        ("#gameType", game_type),
        ("#stadium_id", stadium),
    ):
        select_by_label(page, select_selector, label)

    if attendance is not None:
        page.locator("#spectatorCnt").fill(attendance)
    page.locator("#gameDate").fill(info["date"])

    select_by_label(page, "#startH", start_hour)
    select_by_label(page, "#startM", start_minute)
    # The end hour is chosen by option value (without zero padding) rather
    # than by label.
    page.locator("#endH").select_option(value=str(int(end_hour)))
    select_by_label(page, "#endM", end_minute)

    select_by_label(page, "#homeTeam_id", home_team)
    select_by_label(page, "#awayTeam_id", away_team)

    umpires = info["umpires"]
    for input_selector, umpire_key in (
        ("#homeplate_umpire", "chief"),
        ("#base_umpire1", "first_base"),
        ("#base_umpire2", "second_base"),
        ("#base_umpire3", "third_base"),
    ):
        page.locator(input_selector).fill(umpires[umpire_key] or "")

    # Select the game type once more after all other fields are filled.
    select_by_label(page, "#gameType", game_type)
def fill_lineup_form(page: Page, report: dict[str, Any]) -> None:
    """Select player and defense position for every lineup slot of both teams.

    The home team is processed before the away team. Slots whose player
    entry is falsy are skipped.
    """
    lineups = report["lineups"]
    for team_key, prefix in (("home_team", "home"), ("away_team", "away")):
        for order, player in build_lineup_entries(lineups, team_key):
            if player:
                select_player_option(
                    page,
                    f"#{prefix}_player_id_{order}",
                    player["name"],
                    player.get("number"),
                    player.get("position"),
                )
                select_position_option(
                    page,
                    f"#{prefix}_defense_no_{order}",
                    player["position"],
                )
def find_edit_href(page: Page, report: dict[str, Any], manager_game_no: str | None) -> str:
    """Find the href of the "수정" (edit) link for the report's game in the list.

    Rows of ``table.gclist`` (the first row is skipped) are read in the
    browser. With *manager_game_no*, the row with that game number is used;
    otherwise the first row with an edit link whose date, stadium, home team
    and away team all match the report is used.

    Raises:
        ValueError: If no matching row with an edit link exists.
    """
    game_info = report["game_info"]
    wanted_key = (
        game_info["date"],
        normalize_stadium_name(game_info["stadium"]),
        normalize_team_name(game_info["home_team"]),
        normalize_team_name(game_info["away_team"]),
    )

    extract_rows_js = """(rows) => rows.slice(1).map((row) => {
const cells = [...row.cells].map((cell) => cell.innerText.trim());
const editLink = [...row.querySelectorAll('a')].find((anchor) => anchor.textContent.trim() === '수정');
return {
gameNo: cells[0] || '',
date: cells[1] || '',
gameType: cells[2] || '',
stadium: cells[3] || '',
homeTeam: cells[4] || '',
awayTeam: cells[5] || '',
href: editLink ? editLink.getAttribute('href') : '',
};
})"""
    rows: list[dict[str, Any]] = page.locator("table.gclist tr").evaluate_all(extract_rows_js)

    if manager_game_no:
        wanted_game_no = str(manager_game_no)
        for row in rows:
            if row["gameNo"] != wanted_game_no:
                continue
            # Only the first row with this game number is considered.
            if row["href"]:
                return row["href"]
            break
        raise ValueError(f"관리자 게임번호 {manager_game_no} 행의 수정 링크를 찾지 못했습니다.")

    for row in rows:
        if not row["href"]:
            continue
        row_key = (
            row["date"],
            normalize_stadium_name(row["stadium"]),
            normalize_team_name(row["homeTeam"]),
            normalize_team_name(row["awayTeam"]),
        )
        if row_key == wanted_key:
            # First match wins when several rows describe the same game.
            return row["href"]
    raise ValueError("목록에서 일치하는 경기 행을 찾지 못했습니다.")
def open_edit_page(page: Page, base_url: str, report: dict[str, Any], manager_game_no: str | None) -> None:
    """Navigate to the game's edit page and wait until its form is present.

    With *manager_game_no* the edit URL is opened directly. Otherwise the
    game list is opened and the matching row's edit link is clicked.
    """
    if not manager_game_no:
        page.goto(f"{base_url}/manager/game/list", wait_until="domcontentloaded")
        page.wait_for_selector("table.gclist", timeout=10000)
        edit_href = find_edit_href(page, report, manager_game_no)
        with page.expect_navigation(wait_until="domcontentloaded"):
            page.locator(f"a[href='{edit_href}']").first.click()
    else:
        page.goto(f"{base_url}/manager/game/write?id={manager_game_no}", wait_until="domcontentloaded")

    # Both entry routes finish by waiting for the form and the first home
    # lineup select.
    for required_selector in ("#gameFrm", "#home_player_id_1"):
        page.wait_for_selector(required_selector, timeout=10000)
def open_create_page(page: Page, base_url: str) -> None:
    """Open the game list, follow the "신규 등록" link and wait for the form."""
    list_url = f"{base_url}/manager/game/list"
    page.goto(list_url, wait_until="domcontentloaded")
    page.wait_for_selector("table.gclist", timeout=10000)

    create_link = page.locator("a").filter(has_text="신규 등록").first
    with page.expect_navigation(wait_until="domcontentloaded"):
        create_link.click()
    page.wait_for_selector("#gameFrm", timeout=10000)
def create_game(page: Page, report: dict[str, Any], args: argparse.Namespace) -> None:
    """Create a new game: open the form, fill it, submit, await the list page.

    Dialogs raised by the submission are accepted automatically. The dialog
    handler registered here is not removed afterwards.
    """
    open_create_page(page, args.base_url)
    fill_game_form(page, report, args)

    def accept_dialog(dialog):
        return dialog.accept()

    page.on("dialog", accept_dialog)
    page.locator("#gameWriteBtn").click()
    list_url = f"{args.base_url}/manager/game/list"
    page.wait_for_url(list_url, timeout=10000)
def update_game_header(page: Page, report: dict[str, Any], args: argparse.Namespace) -> None:
    """Re-fill the game header on the edit page and submit the update.

    Dialogs are accepted automatically; the handler registered here is not
    removed afterwards. If the list page is not reached within the timeout,
    the function waits for the current document to load instead.
    """
    fill_game_form(page, report, args)

    def accept_dialog(dialog):
        return dialog.accept()

    page.on("dialog", accept_dialog)
    page.locator("#gameUpdateBtn").click()
    list_url = f"{args.base_url}/manager/game/list"
    try:
        page.wait_for_url(list_url, timeout=10000)
    except TimeoutError:
        page.wait_for_load_state("domcontentloaded")
def run(playwright: Playwright, args: argparse.Namespace, report: dict[str, Any]) -> None:
    """Run the full workflow: create the game, update its header, fill lineups.

    Steps: create the game, reopen it for editing and resubmit the header,
    reopen it again and fill both lineups, then optionally click the lineup
    save button (``args.save``). With ``args.close`` the browser is closed
    right away; otherwise the page is kept open for up to one hour.

    NOTE(review): the try/except nesting below was reconstructed; confirm
    that ``except Error`` belongs to the inner wait and not to the outer
    ``try``.
    """
    browser = launch_browser_context(
        playwright=playwright,
        user_data_dir=args.user_data_dir,
        channel=args.channel,
        headless=args.headless,
    )
    # Reuse the first existing page of the context when there is one.
    page = browser.pages[0] if browser.pages else browser.new_page()
    try:
        create_game(page, report, args)
        open_edit_page(page, args.base_url, report, args.manager_game_no)
        update_game_header(page, report, args)
        # Reopen the edit page before filling the lineups.
        open_edit_page(page, args.base_url, report, args.manager_game_no)
        fill_lineup_form(page, report)
        if args.save:
            # NOTE(review): create_game and update_game_header have already
            # registered dialog handlers on this page that are still active.
            page.on("dialog", lambda dialog: dialog.accept())
            page.locator("#lineupWriteBtn").click()
            try:
                page.wait_for_url(f"{args.base_url}/manager/game/list", timeout=10000)
            except TimeoutError:
                # Not reaching the list page in time is tolerated.
                pass
        if args.close:
            browser.close()
        else:
            try:
                # Keep the browser open for up to one hour.
                page.wait_for_timeout(3600 * 1000)
            except KeyboardInterrupt:
                pass
            except Error as exc:
                # Ignore only the error reported when the page/browser was
                # closed during the wait; anything else is re-raised.
                if "Target page, context or browser has been closed" not in str(exc):
                    raise
                pass
    finally:
        try:
            # The browser may already be closed; errors here are ignored.
            browser.close()
        except Exception:
            pass
def main() -> None:
    """Script entry point: parse arguments, load the report, run the workflow."""
    args = parse_args()
    report = load_report(get_report_path(args))
    with sync_playwright() as playwright:
        run(playwright, args, report)
# Run the automation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()