refactoring

This commit is contained in:
2026-05-02 16:24:42 +09:00
parent 296adf3073
commit 859c39fe0c
194 changed files with 5267 additions and 0 deletions

5
automation/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
automation/ — Playwright 기반 관리자 사이트 자동 입력 패키지
config/와 core/에 의존하며, 관리자 사이트의 UI 조작을 담당합니다.
"""

322
automation/batter_input.py Normal file
View File

@@ -0,0 +1,322 @@
"""
automation/batter_input.py — 타석 결과 입력
타석의 결과(타구 좌표, 타격 결과, 타자 진루)를 입력합니다.
"""
from __future__ import annotations
import math
import hashlib
from typing import Any
from playwright.sync_api import Page
from core.pitch_classifier import infer_batter_result_label, is_ball_in_play_event
from core.field_calculator import (
infer_hit_ball_type,
infer_field_zone,
get_hit_ball_type_code,
get_zone_coordinates,
get_foul_fly_coordinates,
extract_direction_offsets,
is_infield_zone,
)
from automation.page_helpers import (
click_radio_by_label,
get_checked_event_name,
get_last_visible_enabled_locator,
)
from automation.defense_popup import (
fill_error_defense_popup,
click_defense_sequence_in_popup,
fill_runner_out_defense,
)
def _deterministic_offset(seed_text: str, radius: int) -> tuple[int, int]:
"""텍스트 기반 결정적 난수 오프셋 생성"""
digest = hashlib.md5(seed_text.encode("utf-8")).digest()
x_offset = (digest[0] % (radius * 2 + 1)) - radius
y_offset = (digest[1] % (radius * 2 + 1)) - radius
return x_offset, y_offset
def _apply_hit_ball_variation(
result_text: str, result_type: str, zone: str, x: int, y: int,
) -> tuple[int, int]:
"""타구 텍스트에 따른 좌표 변화율(오프셋) 적용"""
dir_x, dir_y = extract_direction_offsets(result_text)
if "파울플라이" in result_text or "파울희생플라이" in result_text or "파울 희생플라이" in result_text:
is_left = any(token in result_text for token in ("", "3루", "유격"))
foul_x, foul_y = get_foul_fly_coordinates("left" if is_left else "right")
x_offset, y_offset = _deterministic_offset(result_text, 2)
return (
max(0, min(100, foul_x + x_offset)),
max(50, min(100, foul_y + y_offset)),
)
if result_type == "home_run":
x_offset, y_offset = _deterministic_offset(result_text, 2)
return (
max(15, min(85, x + x_offset)),
max(12, min(22, y + y_offset)),
)
if is_infield_zone(zone):
base_shift = 3
random_radius = 2 if result_type == "out" else 3
else:
base_shift = 12
random_radius = 5 if result_type == "out" else 7
x_offset, y_offset = _deterministic_offset(result_text, random_radius)
return (
max(10, min(90, x + dir_x * base_shift + x_offset)),
max(18, min(96, y + dir_y * base_shift + y_offset)),
)
def build_hit_ball_payload(page: Page, result_text: str) -> dict[str, str]:
"""타구 좌표 및 거리 페이로드 생성"""
zone = infer_field_zone(result_text)
x, y = get_zone_coordinates(zone)
meter_per_px_text = page.locator("#dat_meterPerPx").input_value() or "0"
try:
meter_per_px = float(meter_per_px_text)
except ValueError:
meter_per_px = 0.0
result_type = "home_run" if "홈런" in result_text else ("out" if "아웃" in result_text or "희생" in result_text else "safe")
hit_ball_type_label = infer_hit_ball_type(result_text)
x, y = _apply_hit_ball_variation(result_text, result_type, zone, x, y)
px_x = math.floor(650 * x / 100)
px_y = math.floor(621 * y / 100)
distance = 0
if meter_per_px:
distance = math.floor(math.sqrt((50 - x) ** 2 + (95 - y) ** 2) * math.hypot(6.5, 6.21) * meter_per_px / 100)
return {
"type": get_hit_ball_type_code(hit_ball_type_label),
"label": hit_ball_type_label,
"x": str(px_x),
"y": str(px_y),
"xy": f"{x},{y}",
"distance": str(distance),
}
def set_hit_ball_and_defense(page: Page, event: dict[str, Any]) -> bool:
"""구장 팝업이 열렸을 때 타구 좌표 지정 및 수비 결과 입력"""
if not is_ball_in_play_event(event):
return False
result_text = ((event.get("result") or {}).get("text") or "").strip()
if not result_text:
return False
# 팝업 가시성 대기
try:
page.wait_for_selector("#div_stadium_image", state="visible", timeout=2000)
except Exception:
return False
# 타구 좌표 계산 및 입력
payload = build_hit_ball_payload(page, result_text)
page.evaluate(
"""(payload) => {
const mapImg = document.getElementById('mapImg');
if (!mapImg) return;
document.getElementById('dat_evt_hit_type').value = payload.type;
const dropDown = document.querySelector("#div_hit_type button.dropdown-toggle");
if (dropDown) {
dropDown.innerHTML = payload.label + ' <span class="caret"></span>';
}
document.getElementById('dat_hit_x').value = payload.x;
document.getElementById('dat_hit_y').value = payload.y;
document.getElementById('dat_hit_xy').value = payload.xy;
document.getElementById('dat_hit_distance').value = payload.distance;
document.getElementById('distance').value = payload.distance;
const mark = document.getElementById('map_mark');
if (mark) {
mark.style.display = 'block';
mark.style.left = payload.x + 'px';
mark.style.top = payload.y + 'px';
}
}""",
payload,
)
page.wait_for_timeout(300)
# 타구 결과에 따른 수비 팝업/입력 처리
result_type = (event.get("result") or {}).get("type") or ""
if result_type in {"single_error_advance", "double_error_advance", "triple_error_advance"}:
fill_error_defense_popup(page, result_text)
elif result_type in {"reach_on_error"}:
fill_error_defense_popup(page, result_text)
elif result_type in {"single_runner_out", "double_runner_out", "triple_runner_out"}:
fill_runner_out_defense(page, result_text)
elif "병살" in result_text:
from core.field_calculator import build_double_play_first_sequence
seq = build_double_play_first_sequence(event)
if seq:
click_defense_sequence_in_popup(page, seq)
btn = get_last_visible_locator(page, "#btnNext")
if btn:
btn.click()
page.wait_for_timeout(100)
elif "실책" in result_text:
fill_error_defense_popup(page, result_text)
elif result_type in {"out", "double_play"} and "삼진" not in result_text:
from core.field_calculator import extract_defense_sequence
seq = extract_defense_sequence(result_text)
if seq:
click_defense_sequence_in_popup(page, seq, complete_button_selector="#btnAdd")
# 홈런일 경우 입력 완료 버튼 직접 클릭
if result_type == "home_run":
try:
page.locator("#btnInputComplete").click(timeout=1000)
except Exception:
pass
return True
def set_batter_result_type(page: Page, result: dict[str, Any] | None, event: dict[str, Any] | None = None) -> None:
"""타격 결과 종류(1루타, 수비실책 등)만 세팅"""
if not result:
return
label = infer_batter_result_label(result, event)
if not label:
return
# 강제 세팅 (병살 등)
if label == "병살-아웃":
forced = page.evaluate(
"""(eventName) => {
const nodes = [...document.querySelectorAll("input[type=radio][name='evt_batter']")];
for (const node of nodes) {
const name = (node.getAttribute('eventName') || '').trim();
if (name === eventName) {
node.disabled = false;
node.checked = true;
node.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
return true;
}
}
return false;
}""",
label,
)
if forced:
page.wait_for_timeout(120)
if get_checked_event_name(page, "evt_batter") == label:
return
# JS 강제 이벤트 발생
marker = page.evaluate(
"""(eventName) => {
const nodes = [...document.querySelectorAll("input[type=radio][name='evt_batter']")];
for (let i = nodes.length - 1; i >= 0; i -= 1) {
const node = nodes[i];
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (!rect.width || !rect.height || style.display === 'none' || style.visibility === 'hidden' || node.disabled) {
continue;
}
if ((node.getAttribute('eventName') || '') === eventName) {
const marker = `codex-batter-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
return null;
}""",
label,
)
if marker:
candidate = page.locator(f"[data-codex-marker='{marker}']")
for _ in range(3):
try:
candidate.click(force=True)
except Exception:
candidate.evaluate(
"""(node) => {
node.disabled = false;
node.checked = true;
node.dispatchEvent(new Event('click', { bubbles: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
}"""
)
if get_checked_event_name(page, "evt_batter") == label:
return
page.wait_for_timeout(100)
# 폴백
for _ in range(3):
click_radio_by_label(page, "evt_batter", label)
if get_checked_event_name(page, "evt_batter") == label:
break
page.wait_for_timeout(100)
def set_batter_advancement(page: Page, result: dict[str, Any] | None) -> None:
"""타자의 최종 루(1루, 2루 등)와 주루가산 세팅"""
if not result:
return
to_base = result.get("toBase")
# 기본 진루
if to_base is None:
r_type = result.get("type")
if r_type in {"single", "bunt_hit", "reach_on_error", "reach_on_fielder_choice", "reach_on_grounder", "hit_by_pitch", "walk", "intentional_walk"}:
to_base = 1
elif r_type == "double":
to_base = 2
elif r_type == "triple":
to_base = 3
elif r_type == "home_run":
to_base = 4
if to_base is not None:
try:
selector = f"input[name='dat_evt_batter_advance'][value='{to_base}']"
locator = get_last_visible_enabled_locator(page, selector)
if locator is not None:
locator.check(force=True)
else:
fallback = page.locator(selector)
if fallback.count() > 0:
fallback.first.check(force=True)
except Exception:
pass
# 주루가산 (Extra Advance)
try:
extra_advance = result.get("extra_advance")
if extra_advance is not None and extra_advance > 0:
locator = get_last_visible_enabled_locator(page, "#batterRunningAdd")
if locator is not None:
locator.select_option(value=str(extra_advance))
else:
fallback = page.locator("#batterRunningAdd")
if fallback.count() > 0:
fallback.first.select_option(value=str(extra_advance))
except Exception:
pass
def set_batter_result(page: Page, result: dict[str, Any] | None, event: dict[str, Any] | None = None) -> None:
"""타격 결과와 진루/가산 세팅"""
set_batter_result_type(page, result, event)
set_batter_advancement(page, result)

118
automation/defense_popup.py Normal file
View File

@@ -0,0 +1,118 @@
"""
automation/defense_popup.py — 수비 팝업 조작
수비 버튼 클릭, 수비 시퀀스 입력, 실책 수비 팝업 처리.
"""
from __future__ import annotations
from typing import Any
from playwright.sync_api import Page
from core.config_loader import defense_button_id_map, position_number_map
from core.field_calculator import (
extract_defense_sequence,
extract_error_position,
infer_error_position_fallback,
is_throwing_error,
)
from automation.page_helpers import get_last_visible_locator
def click_defense_button_robustly(page: Page, position: str, click_count: int = 1) -> bool:
"""수비 포지션 버튼을 안정적으로 클릭
ID 기반 → value 기반 → label 기반 순서로 시도
"""
btn_map = defense_button_id_map()
pos_num = position_number_map()
# 1) ID 기반
button_selector = btn_map.get(position)
if button_selector:
loc = page.locator(button_selector)
if loc.count() > 0:
for _ in range(click_count):
loc.click(force=True)
page.wait_for_timeout(60)
return True
# 2) value 기반
value = pos_num.get(position)
if value:
loc = page.locator(f"input[name='defenseNumberBtn'][value='{value}']")
if loc.count() > 0:
for _ in range(click_count):
loc.click(force=True)
page.wait_for_timeout(60)
return True
# 3) label 기반
all_buttons = page.locator("input[name='defenseNumberBtn']").all()
for btn in all_buttons:
label = (btn.get_attribute("id") or "").lower()
if position in label or label in position:
for _ in range(click_count):
btn.click(force=True)
page.wait_for_timeout(60)
return True
return False
def clear_defense_selections(page: Page) -> None:
"""수비 선택 초기화"""
page.evaluate(
"""() => {
document.querySelectorAll("input[name='defenseNumberBtn']").forEach(btn => {
btn.checked = false;
});
}"""
)
def click_defense_sequence_in_popup(
page: Page,
sequence: list[str],
complete_button_selector: str | None = None,
) -> None:
"""수비 시퀀스 순서대로 클릭 후 완료 버튼 클릭"""
for position in sequence:
click_defense_button_robustly(page, position)
page.wait_for_timeout(80)
if complete_button_selector:
btn = get_last_visible_locator(page, complete_button_selector)
if btn:
btn.click()
page.wait_for_timeout(120)
def fill_runner_out_defense(
page: Page, text: str, sequence_override: list[str] | None = None,
) -> None:
"""주루 아웃 수비 팝업 처리"""
page.wait_for_timeout(300)
sequence = sequence_override or extract_defense_sequence(text)
if sequence:
click_defense_sequence_in_popup(page, sequence)
def fill_error_defense_popup(page: Page, text: str) -> None:
"""실책 수비 팝업 처리"""
defense_sequence = extract_defense_sequence(text)
if len(defense_sequence) >= 2:
click_defense_sequence_in_popup(page, defense_sequence)
else:
error_position = extract_error_position(text)
if not error_position:
error_position = infer_error_position_fallback(text)
click_count = 2 if is_throwing_error(text) else 1
click_defense_button_robustly(page, error_position, click_count)
complete_button = get_last_visible_locator(page, "#btnNext")
if complete_button is None:
complete_button = get_last_visible_locator(page, "#btnAdd")
if complete_button:
complete_button.click()
page.wait_for_timeout(120)

View File

@@ -0,0 +1,141 @@
"""
automation/game_end_input.py — 경기 종료 처리
투수들의 승패/홀드/세이브 기록 등을 경기 종료 팝업에 입력하고 저장합니다.
"""
from __future__ import annotations
from typing import Any
from playwright.sync_api import Page
from automation.page_helpers import show_debug_overlay
from automation.lineup_input import normalize_lineup_text
def _open_game_end_popup(page: Page) -> None:
"""경기 종료 팝업 열기"""
page.evaluate("""() => { window.confirm = () => true; window.alert = () => {}; }""")
page.locator("#gameEndBtn").click(force=True)
page.wait_for_selector("#btnGameEnd", timeout=5000)
page.wait_for_selector("input[name^='homeTeamPitcher_'], input[name^='awayTeamPitcher_']", timeout=5000)
def _get_game_end_pitcher_rows(page: Page) -> dict[str, list[dict[str, Any]]]:
"""팝업 내 홈/원정 투수 리스트 추출"""
return page.evaluate(
"""() => {
const rowsFor = (nameAttr) => {
return [...document.querySelectorAll(`input[name='${nameAttr}']`)].map((input, idx) => {
const tr = input.closest('tr');
const firstTd = tr ? tr.querySelector('td') : null;
return {
idx,
name: firstTd ? firstTd.textContent.trim() : '',
};
});
};
return {
home: rowsFor('home_player_id'),
away: rowsFor('away_player_id'),
};
}"""
)
def _select_game_end_role(page: Page, side: str, idx: int, role_value: str) -> None:
"""특정 투수의 역할(승/패/홀/세 등) 라디오 버튼 선택"""
selector = f"input[name='{side}TeamPitcher_{idx}'][value='{role_value}']"
ok = page.evaluate(
"""(selector) => {
const node = document.querySelector(selector);
if (!node) return false;
node.disabled = false;
node.checked = true;
node.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
return node.checked === true;
}""",
selector,
)
if not ok:
page.locator(selector).click(force=True)
def _check_game_end_blown_save(page: Page, side: str, idx: int) -> None:
"""블론세이브 체크박스 선택"""
selector = f"input[name='{side}BlownSave_{idx}']"
ok = page.evaluate(
"""(selector) => {
const node = document.querySelector(selector);
if (!node) return false;
node.disabled = false;
node.checked = true;
node.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
return node.checked === true;
}""",
selector,
)
if not ok:
page.locator(selector).check(force=True)
def fill_game_end_pitching(page: Page, report: dict[str, Any]) -> None:
"""리포트의 투수 요약을 바탕으로 경기 종료 팝업 폼 채우기"""
_open_game_end_popup(page)
rows = _get_game_end_pitcher_rows(page)
summary = report.get("pitching_summary") or {}
lineups = report.get("lineups") or {}
home_starter = normalize_lineup_text(((lineups.get("home_team") or {}).get("starter_pitcher") or {}).get("name") or "")
away_starter = normalize_lineup_text(((lineups.get("away_team") or {}).get("starter_pitcher") or {}).get("name") or "")
winners = {normalize_lineup_text(name) for name in (summary.get("승리투수") or [])}
losers = {normalize_lineup_text(name) for name in (summary.get("패전투수") or [])}
holds = {normalize_lineup_text(name) for name in (summary.get("홀드") or [])}
saves = {normalize_lineup_text(name) for name in (summary.get("세이브") or [])}
blown_saves = {normalize_lineup_text(name) for name in (summary.get("블론세이브") or [])}
fixed_roles = winners | losers | holds | saves
for side, side_rows in rows.items():
starter_name = home_starter if side == "home" else away_starter
for row in side_rows:
name = normalize_lineup_text(row.get("name") or "")
idx = int(row["idx"])
if name in winners:
_select_game_end_role(page, side, idx, "wins")
elif name in losers:
_select_game_end_role(page, side, idx, "loses")
elif name in saves:
_select_game_end_role(page, side, idx, "save")
elif name in holds:
_select_game_end_role(page, side, idx, "holds")
elif name and name != starter_name and name not in fixed_roles:
_select_game_end_role(page, side, idx, "re")
if name in blown_saves:
_check_game_end_blown_save(page, side, idx)
page.wait_for_timeout(300)
show_debug_overlay(
page,
[
"게임종료 팝업 입력 준비 완료",
f"승리: {', '.join(summary.get('승리투수') or []) or '-'}",
f"패전: {', '.join(summary.get('패전투수') or []) or '-'}",
f"홀드: {', '.join(summary.get('홀드') or []) or '-'}",
f"세이브: {', '.join(summary.get('세이브') or []) or '-'}",
f"블론세이브: {', '.join(summary.get('블론세이브') or []) or '-'}",
],
)
def submit_game_end(page: Page) -> None:
"""경기 종료 최종 완료(저장) 버튼 클릭"""
page.evaluate("""() => { window.confirm = () => true; window.alert = () => {}; }""")
page.locator("#btnGameEnd").click(force=True)
page.wait_for_timeout(1500)

299
automation/lineup_input.py Normal file
View File

@@ -0,0 +1,299 @@
"""
automation/lineup_input.py — 라인업 및 선수 교체 입력
관리자 사이트의 홈/원정 라인업 조작, 선수명 및 포지션 선택,
교체 이벤트 처리를 담당합니다.
"""
from __future__ import annotations
import re
from typing import Any
from playwright.sync_api import Page
from core.config_loader import position_to_defense_no
from core.change_parser import normalize_change_event
def normalize_lineup_text(text: str) -> str:
"""라인업 선수명 정규화 (괄호, 번호 등 제거하고 이름만 추출)"""
text = (text or "").strip()
text = text.replace("*", "")
text = re.sub(r"\[\d+(?:번)?\]", "", text)
text = re.sub(r"\s*\(.*?\)\s*", "", text)
text = "".join(re.findall(r"[가-힣A-Za-z]+", text))
return text.strip()
def get_lineup_state(page: Page) -> dict[str, Any]:
"""현재 사이트에 입력된 라인업 상태(홈/원정 0~9번 행) 추출"""
return page.evaluate(
"""() => {
const buildSide = (side) => {
const rows = [];
for (let idx = 0; idx <= 9; idx += 1) {
const player = document.querySelector(`#${side}_player_id_${idx}`);
const defense = document.querySelector(`#${side}_defense_no_${idx}`);
const orgPlayer = document.querySelector(`#org_${side}_player_id_${idx}`);
const orgDefense = document.querySelector(`#org_${side}_defense_no_${idx}`);
if (!player && !defense) continue;
rows.push({
idx,
playerText: player ? player.options[player.selectedIndex]?.text || '' : '',
playerValue: player ? player.value : '',
defenseValue: defense ? defense.value : '',
orgPlayerValue: orgPlayer ? orgPlayer.value : '',
orgDefenseValue: orgDefense ? orgDefense.value : '',
});
}
return rows;
};
return { home: buildSide('home'), away: buildSide('away') };
}"""
)
def detect_change_side(
half_inning: dict[str, Any], change_event: dict[str, Any], lineup_state: dict[str, Any],
) -> str:
"""교체가 일어난 팀(home/away) 추론"""
actor_name = normalize_lineup_text(
change_event.get("actor_name") or change_event.get("player_name") or ""
)
offense_side = "away" if half_inning.get("half") == "top" else "home"
defense_side = "home" if offense_side == "away" else "away"
matched_sides: list[str] = []
for side in ("home", "away"):
for row in lineup_state.get(side, []):
if normalize_lineup_text(row.get("playerText") or "") == actor_name:
matched_sides.append(side)
break
if len(matched_sides) == 1:
return matched_sides[0]
actor_role = change_event.get("actor_role")
if actor_role in {"batter", "대타", "대주자", "1루주자", "2루주자", "3루주자", "주자"}:
return offense_side
return defense_side
def find_change_row(side_rows: list[dict[str, Any]], change_event: dict[str, Any]) -> int | None:
"""교체 대상 선수가 속한 라인업 행 인덱스 검색"""
if change_event.get("bat_order") is not None:
return int(change_event["bat_order"])
actor_name_raw = (
change_event.get("actor_name")
or change_event.get("player_name")
or change_event.get("out_player")
or ""
)
actor_name = normalize_lineup_text(actor_name_raw)
# 1단계: 완전 일치
for row in side_rows:
if normalize_lineup_text(row.get("playerText") or "") == actor_name:
return int(row["idx"])
# 2단계: 부분 일치
for row in side_rows:
player_text = normalize_lineup_text(row.get("playerText") or "")
if actor_name and (actor_name in player_text or player_text in actor_name):
return int(row["idx"])
# 3단계: 역할(defenseValue) 일치
actor_role = change_event.get("actor_role")
pos_def_map = position_to_defense_no()
if actor_role in pos_def_map:
defense_no = pos_def_map[actor_role]
for row in side_rows:
if str(row.get("defenseValue") or "") == defense_no:
return int(row["idx"])
# 4단계: 이름 혼재 경우 (예: "1루주자 문보경")
if " " in actor_name_raw:
potential_name = normalize_lineup_text(actor_name_raw.split()[-1])
for row in side_rows:
if normalize_lineup_text(row.get("playerText") or "") == potential_name:
return int(row["idx"])
return None
def find_pitcher_row(side_rows: list[dict[str, Any]]) -> int | None:
"""투수가 위치한 행(일반적으로 0번) 찾기"""
pos_def_map = position_to_defense_no()
pitcher_no = pos_def_map.get("투수")
for row in side_rows:
if str(row.get("defenseValue") or "") == pitcher_no:
return int(row["idx"])
return None
def select_lineup_player(page: Page, side: str, row_idx: int, player_name: str) -> None:
"""라인업 select 상자에서 선수명으로 선택"""
select_id = f"#{side}_player_id_{row_idx}"
options = page.locator(f"{select_id} option").evaluate_all(
"""(nodes) => nodes.map((node) => ({ value: node.value, text: node.textContent.trim() }))"""
)
target_value = None
normalized_target = normalize_lineup_text(player_name)
for option in options:
if normalize_lineup_text(option["text"]) == normalized_target:
target_value = option["value"]
break
if not target_value:
raise ValueError(f"{side} {row_idx}번 행에서 선수 '{player_name}' 옵션을 찾지 못했습니다.")
page.locator(select_id).select_option(value=target_value)
def set_lineup_defense(page: Page, side: str, row_idx: int, position: str | None) -> None:
"""라인업 수비 포지션 세팅"""
if not position:
return
defense_no = position_to_defense_no().get(position)
if not defense_no:
return
page.locator(f"#{side}_defense_no_{row_idx}").select_option(value=defense_no)
def get_current_lineup_selection(page: Page, side: str, row_idx: int) -> tuple[str, str]:
"""해당 행의 현재 선택된 선수와 수비 번호 값 반환"""
player_value = page.locator(f"#{side}_player_id_{row_idx}").input_value()
defense_value = page.locator(f"#{side}_defense_no_{row_idx}").input_value()
return player_value, defense_value
def get_target_player_value(page: Page, side: str, row_idx: int, player_name: str | None) -> str | None:
if not player_name:
return None
select_id = f"#{side}_player_id_{row_idx}"
options = page.locator(f"{select_id} option").evaluate_all(
"""(nodes) => nodes.map((node) => ({ value: node.value, text: node.textContent.trim() }))"""
)
normalized_target = normalize_lineup_text(player_name)
for option in options:
if normalize_lineup_text(option["text"]) == normalized_target:
return option["value"]
return None
def get_target_defense_value(position: str | None) -> str | None:
if not position:
return None
return position_to_defense_no().get(position)
def apply_change_event(
page: Page,
half_inning: dict[str, Any],
change_event: dict[str, Any],
change_cache: dict[str, tuple[str, int]],
) -> None:
"""선수 교체 이벤트를 라인업에 반영하고 저장"""
change_event = normalize_change_event(change_event)
actor_name_raw = change_event.get("actor_name") or change_event.get("player_name") or change_event.get("out_player") or ""
cache_key = normalize_lineup_text(actor_name_raw)
actor_name = normalize_lineup_text(actor_name_raw)
lineup_state = get_lineup_state(page)
cached = change_cache.get(cache_key)
side = cached[0] if cached else detect_change_side(half_inning, change_event, lineup_state)
side_rows = lineup_state.get(side, [])
row_idx = find_change_row(side_rows, change_event)
if row_idx is None and cached and cached[0] == side:
row_idx = cached[1]
if row_idx is None:
all_players = [normalize_lineup_text(r.get("playerText", "")) for r in side_rows]
raise ValueError(
f"{side} 교체 행을 찾지 못했습니다 "
f"(Target: {actor_name}, Candidates: {all_players}): {change_event.get('text')}"
)
# 확인창 무시
page.evaluate("""() => { window.alert = () => {}; window.confirm = () => true; }""")
def trigger_lineup_save(idx: int):
home_away_gb = 2 if side == "home" else 1
page.evaluate(
"""({ batterNo, homeAwayGb }) => {
if (typeof window.f_lineup === 'function') {
window.f_lineup(batterNo, homeAwayGb);
}
}""",
{"batterNo": idx, "homeAwayGb": home_away_gb},
)
page.wait_for_timeout(200)
page.evaluate(
"""({ side, idx }) => {
const row = document.querySelector(`#${side}_player_id_${idx}`)?.parentElement?.parentElement;
if (row) {
const saveBtn = [...row.querySelectorAll("input[type=button], button, a")].find(el =>
el.value === 'V' || el.innerText.includes('V') || el.innerText.includes('저장') || el.id.includes('save')
);
if (saveBtn) {
saveBtn.click();
return true;
}
}
return false;
}""",
{"side": side, "idx": idx}
)
page.wait_for_timeout(200)
# 병합 교체 (예: 포수→지명타자 전환 + 새 투수 등판)
if change_event.get("change_type") == "merged_pitcher_substitution":
actor_player_name = change_event.get("player_name") or change_event.get("actor_name")
pitcher_in_player = change_event.get("pitcher_in_player") or change_event.get("in_player")
pitcher_row_idx = find_pitcher_row(side_rows)
if pitcher_row_idx is None or not pitcher_in_player:
raise ValueError(f"투수 교체 행을 찾지 못했습니다: {change_event.get('text')}")
set_lineup_defense(page, side, row_idx, "지명타자")
trigger_lineup_save(row_idx)
trigger_lineup_save(row_idx + 1)
select_lineup_player(page, side, pitcher_row_idx, pitcher_in_player)
set_lineup_defense(page, side, pitcher_row_idx, "투수")
trigger_lineup_save(pitcher_row_idx)
trigger_lineup_save(pitcher_row_idx + 1)
if actor_player_name:
change_cache[normalize_lineup_text(actor_player_name)] = (side, int(row_idx))
change_cache[normalize_lineup_text(pitcher_in_player)] = (side, int(pitcher_row_idx))
return
# 단순 선수 교체
if change_event.get("change_type") == "substitution":
in_player = change_event.get("in_player")
if not in_player:
return
select_lineup_player(page, side, row_idx, in_player)
set_lineup_defense(page, side, row_idx, change_event.get("to_position"))
trigger_lineup_save(row_idx)
trigger_lineup_save(row_idx + 1)
out_player = change_event.get("out_player")
if out_player:
change_cache[normalize_lineup_text(out_player)] = (side, int(row_idx))
change_cache[normalize_lineup_text(in_player)] = (side, int(row_idx))
return
# 단순 수비 위치 변경
if change_event.get("change_type") == "position_change":
set_lineup_defense(page, side, row_idx, change_event.get("to_position"))
trigger_lineup_save(row_idx)
trigger_lineup_save(row_idx + 1)
player_name = change_event.get("player_name")
if player_name:
change_cache[normalize_lineup_text(player_name)] = (side, int(row_idx))
return

338
automation/page_helpers.py Normal file
View File

@@ -0,0 +1,338 @@
"""
automation/page_helpers.py — 공통 Playwright 유틸리티
라디오 버튼, select 박스, 가시성 판별 등 사이트 조작의 기초 함수.
모든 automation 모듈이 이 모듈에 의존합니다.
"""
from __future__ import annotations
from time import sleep
from typing import Any
from playwright.sync_api import Page
# ──────────────────────────────────────────────
# 라디오 버튼 조작
# ──────────────────────────────────────────────
def get_radio_map(page: Page, name: str) -> dict[str, str]:
"""라디오 그룹의 eventName → value 맵 반환"""
return page.evaluate(
"""(name) => {
const nodes = [...document.querySelectorAll(`input[type=radio][name='${name}']`)];
const map = {};
for (const node of nodes) {
const eventName = (node.getAttribute('eventName') || '').trim();
if (eventName) {
map[eventName] = node.value;
}
}
return map;
}""",
name,
)
def get_checked_event_name(page: Page, radio_name: str) -> str:
"""현재 체크된 라디오의 eventName 반환"""
return page.evaluate(
"""(radioName) => {
const nodes = [...document.querySelectorAll(`input[type=radio][name='${radioName}']:checked`)];
for (const node of nodes) {
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden' && !node.disabled) {
return node.getAttribute('eventName') || '';
}
}
return nodes.length > 0 ? (nodes[0].getAttribute('eventName') || '') : '';
}""",
radio_name,
)
def set_radio_by_label(page: Page, radio_name: str, label: str) -> None:
"""eventName이 label과 일치하는 라디오 클릭"""
radios = page.locator(f"input[type=radio][name='{radio_name}']").all()
target_radio = None
# 정확히 일치하는 라벨 우선
for rb in radios:
if rb.get_attribute("eventname") == label:
target_radio = rb
break
# 포함 관계로 탐색
if not target_radio:
for rb in radios:
if label in (rb.get_attribute("eventname") or ""):
target_radio = rb
break
if target_radio:
target_radio.click(force=True)
def click_radio_by_label(page: Page, radio_name: str, label: str) -> None:
"""라디오 버튼을 JS로 강제 클릭 (disabled 상태도 처리)"""
page.evaluate(
"""({ radioName, label }) => {
const nodes = [...document.querySelectorAll(`input[type=radio][name='${radioName}']`)];
for (const node of nodes) {
const eventName = (node.getAttribute('eventName') || '').trim();
if (eventName === label) {
node.disabled = false;
node.checked = true;
node.dispatchEvent(new MouseEvent('click', { bubbles: true, cancelable: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
return;
}
}
}""",
{"radioName": radio_name, "label": label},
)
def find_visible_radio_by_label(page: Page, radio_name: str, label: str):
"""가시적이고 활성화된 라디오를 찾아서 Locator 반환"""
marker = page.evaluate(
"""({ radioName, expectedLabel }) => {
const nodes = [...document.querySelectorAll(`input[type=radio][name='${radioName}']`)];
for (let i = nodes.length - 1; i >= 0; i -= 1) {
const node = nodes[i];
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (!rect.width || !rect.height || style.display === 'none' || style.visibility === 'hidden' || node.disabled) {
continue;
}
const name = (node.getAttribute('eventName') || '').trim();
if (name === expectedLabel) {
const marker = `codex-radio-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
return null;
}""",
{"radioName": radio_name, "expectedLabel": label},
)
if marker:
return page.locator(f"[data-codex-marker='{marker}']")
return None
# ──────────────────────────────────────────────
# 가시성 유틸
# ──────────────────────────────────────────────
def get_last_visible_locator(page: Page, selector: str):
"""selector 중 마지막으로 보이는 요소의 Locator 반환"""
marker = page.evaluate(
"""(selector) => {
const nodes = [...document.querySelectorAll(selector)];
for (let i = nodes.length - 1; i >= 0; i -= 1) {
const node = nodes[i];
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden') {
const marker = `codex-visible-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
return null;
}""",
selector,
)
if marker:
return page.locator(f"[data-codex-marker='{marker}']")
return None
def get_last_visible_enabled_locator(page: Page, selector: str):
"""selector 중 마지막으로 보이고 활성화된 요소의 Locator 반환"""
marker = page.evaluate(
"""(selector) => {
const nodes = [...document.querySelectorAll(selector)];
for (let i = nodes.length - 1; i >= 0; i -= 1) {
const node = nodes[i];
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden' && !node.disabled) {
const marker = `codex-enabled-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
return null;
}""",
selector,
)
if marker:
return page.locator(f"[data-codex-marker='{marker}']")
return None
# ──────────────────────────────────────────────
# select 박스 조작
# ──────────────────────────────────────────────
def set_select_by_partial_text(page: Page, selector: str, partial_text: str) -> None:
"""텍스트 부분 일치로 select option 선택"""
if not partial_text:
return
page.wait_for_selector(selector, timeout=3000)
options = page.locator(f"{selector} option").all_text_contents()
target = partial_text.strip()
for opt in options:
if opt.strip() == target:
page.select_option(selector, label=opt)
return
target_clean = target.replace(" ", "").replace("/", ",").replace("-", ",")
for opt in options:
opt_clean = opt.strip().replace(" ", "").replace("/", ",").replace("-", ",")
if target_clean in opt_clean or opt_clean in target_clean:
page.select_option(selector, label=opt)
return
print(f"DEBUG: '{selector}'에서 '{partial_text}'와 일치하는 옵션을 찾지 못함.")
def set_select_by_text_or_value(page: Page, selector: str, desired: str) -> None:
"""label 또는 value로 select option 선택"""
locator = page.locator(selector)
try:
locator.select_option(label=desired)
return
except Exception:
pass
try:
locator.select_option(value=desired)
return
except Exception:
pass
locator.select_option(index=0)
# ──────────────────────────────────────────────
# 디버그 오버레이 & 제어
# ──────────────────────────────────────────────
def show_debug_overlay(page: Page, lines: list[str]) -> None:
"""페이지에 디버그 오버레이 표시"""
page.evaluate(
"""(lines) => {
let box = document.querySelector('#codex-debug-overlay');
if (!box) {
window.codexControl = { paused: false, proceed: 0 };
box = document.createElement('div');
box.id = 'codex-debug-overlay';
box.style.cssText = 'position:fixed;top:12px;right:12px;z-index:999999;background:rgba(0,0,0,0.82);color:#fff;padding:10px 12px;border-radius:8px;font-size:14px;line-height:1.5;max-width:360px;white-space:pre-wrap;box-shadow:0 4px 16px rgba(0,0,0,0.35)';
const controls = document.createElement('div');
controls.style.cssText = 'margin-bottom:8px;display:flex;gap:8px';
const pauseBtn = document.createElement('button');
pauseBtn.id = 'codex-pause-btn';
pauseBtn.textContent = '일시정지';
pauseBtn.onclick = () => {
window.codexControl.paused = !window.codexControl.paused;
pauseBtn.textContent = window.codexControl.paused ? '재개' : '일시정지';
};
const nextBtn = document.createElement('button');
nextBtn.id = 'codex-next-btn';
nextBtn.textContent = '다음';
nextBtn.onclick = () => { window.codexControl.proceed += 1; };
controls.appendChild(pauseBtn);
controls.appendChild(nextBtn);
const body = document.createElement('div');
body.id = 'codex-debug-body';
box.appendChild(controls);
box.appendChild(body);
document.body.appendChild(box);
}
const body = document.querySelector('#codex-debug-body');
if (body) { body.textContent = lines.join('\\n'); }
}""",
lines,
)
def wait_for_operator_control(page: Page) -> None:
"""일시정지/다음 버튼 대기"""
state = page.evaluate("() => window.codexControl || { paused: false, proceed: 0 }")
last_proceed = state.get("proceed", 0)
while True:
state = page.evaluate("() => window.codexControl || { paused: false, proceed: 0 }")
if not state.get("paused"):
return
if state.get("proceed", 0) > last_proceed:
page.evaluate(
"(prev) => { if (window.codexControl && window.codexControl.proceed > prev) { window.codexControl.proceed = prev; } }",
last_proceed,
)
return
sleep(0.1)
# ──────────────────────────────────────────────
# 카운트 관리
# ──────────────────────────────────────────────
def advance_count(balls: int, strikes: int, pitch_result: str) -> tuple[int, int]:
"""투구 결과에 따른 카운트 갱신"""
if pitch_result in ("B",):
return balls + 1, strikes
if pitch_result in ("T", "S", "BS"):
return balls, strikes + 1
if pitch_result in ("F", "BF"):
if strikes < 2:
return balls, strikes + 1
return balls, strikes
def get_checked_batter_defense_type(page: Page) -> str:
"""현재 선택된 타격 결과의 수비 유형 반환"""
return page.evaluate(
"""() => {
const checked = document.querySelector("input[type=radio][name='evt_batter']:checked");
if (!checked) return '';
return checked.getAttribute('defenseType') || '';
}"""
)
def get_last_history_text(page: Page) -> str:
"""사이트 내역(historyView)의 마지막 항목 텍스트 추출"""
try:
return (
page.evaluate(
"""() => {
const nodes = document.querySelectorAll("div[name='historyView']");
const lastNode = nodes[nodes.length - 1];
return lastNode ? lastNode.textContent.trim() : '';
}"""
)
or ""
).strip()
except Exception:
return ""
def get_history_count(page: Page) -> int:
"""기록 영역에 추가된 이벤트(historyView)의 총 개수 반환"""
try:
return int(
page.evaluate(
"""() => document.querySelectorAll("div[name='historyView']").length"""
)
)
except Exception:
return 0

96
automation/pitch_input.py Normal file
View File

@@ -0,0 +1,96 @@
"""
automation/pitch_input.py — 투구 입력
개별 투구의 구종, 구속, 투구결과를 사이트에 입력합니다.
"""
from __future__ import annotations
from typing import Any
from playwright.sync_api import Page
from core.config_loader import pitch_type_map, pitch_result_map
from automation.page_helpers import set_radio_by_label
def get_pitch_runner_events(
pitch: dict[str, Any], event: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""투구에 연결된 주루 이벤트 반환"""
pitch_runner_events = list(pitch.get("runnerEvents") or [])
if pitch_runner_events:
return pitch_runner_events
if event and event.get("runnerEvents"):
return list(event.get("runnerEvents") or [])
return []
def set_pitch(page: Page, pitch: dict[str, Any], event: dict[str, Any] | None = None) -> None:
"""투구 하나를 사이트에 입력 (구종 + 결과 + 구속)"""
pt_map = pitch_type_map()
pr_map = pitch_result_map()
pitch_type = pt_map.get(pitch.get("pitchType") or "")
pitch_result_text = (pitch.get("pitchResultText") or "").strip()
normalized_text = pitch_result_text.replace(" ", "")
# 피치클락 투수위반 → 볼
if "피치클락" in pitch_result_text and "투수위반" in pitch_result_text:
pitch_result_text = ""
# 폭투/포일 체크
runner_events = get_pitch_runner_events(pitch, event)
is_wild_pitch = any(
re_.get("type") == "wild_pitch_advance" or "폭투" in (re_.get("text") or "")
for re_ in runner_events
)
is_passed_ball = any(
re_.get("type") == "passed_ball_advance" or "포일" in (re_.get("text") or "")
for re_ in runner_events
)
if is_wild_pitch:
pitch_result = "폭투-볼"
elif is_passed_ball:
pitch_result = "포일-볼"
else:
if "번트" in normalized_text and "헛스윙" in normalized_text:
pitch_result = "번트시도-스트라이크"
elif "번트" in normalized_text and "파울" in normalized_text:
pitch_result = "번트-파울"
else:
pitch_result = pr_map.get(pitch_result_text)
if not pitch_result and pitch.get("pitchResult") in {"BS", "V"}:
pitch_result = "번트시도-스트라이크"
if not pitch_result and pitch.get("pitchResult") == "BF":
pitch_result = "번트-파울"
if not pitch_result and "고의사구" in pitch_result_text:
pitch_result = "고의사구"
if not pitch_result and "파울플라이" in pitch_result_text and "실책" in pitch_result_text:
pitch_result = "파울플라이-실책"
# 구종 입력
if pitch_type:
set_radio_by_label(page, "evt_ballType", pitch_type)
# 투구 결과 입력
if pitch_result:
set_radio_by_label(page, "evt_batter", pitch_result)
# 구속 입력
speed_input = page.locator("#ballspeed")
speed_input.fill(str(pitch.get("speedKmh") or 0))
speed_input.evaluate("node => node.dispatchEvent(new Event('change', {bubbles:true}))")
page.wait_for_timeout(50)
def set_pitch_meta_only(page: Page, pitch: dict[str, Any]) -> None:
"""구종/구속만 세팅 (인플레이 마지막 구에서 사용)
evt_batter를 건드리지 않아 팝업이 미리 열리는 것을 방지.
"""
pt_map = pitch_type_map()
pitch_type = pt_map.get(pitch.get("pitchType") or "")
if pitch_type:
set_radio_by_label(page, "evt_ballType", pitch_type)
page.locator("#ballspeed").fill(str(pitch.get("speedKmh") or 0))

233
automation/review_input.py Normal file
View File

@@ -0,0 +1,233 @@
"""
automation/review_input.py — 비디오 판독/합의 판정 입력
비디오 판독 이벤트를 관리자 사이트 팝업에 입력합니다.
"""
from __future__ import annotations
import re
from typing import Any
from playwright.sync_api import Page
from core.review_parser import parse_review_event_text
from automation.page_helpers import (
set_select_by_partial_text,
set_select_by_text_or_value,
)
def _normalize_review_event(review_event: dict[str, Any]) -> dict[str, Any]:
"""텍스트 기반 이벤트 파싱 및 정규화"""
has_results = review_event.get("beforeResult") is not None and review_event.get("finalResult") is not None
if review_event.get("requestInningLabel") and review_event.get("reviewItem") and has_results:
return review_event
text = review_event.get("text") or ""
parsed = parse_review_event_text(text)
parsed.update({k: v for k, v in review_event.items() if k not in parsed})
return parsed
def _open_challenge_popup(page: Page) -> Page:
"""비디오 판독 팝업 열기"""
with page.expect_popup(timeout=5000) as popup_info:
page.locator("#challengeBtn").click()
popup = popup_info.value
popup.wait_for_load_state("domcontentloaded")
popup.wait_for_selector("#requestInning_0", timeout=5000)
return popup
def _select_review_final_result(
popup: Page, row_index: int, review_item: str, final_result: str | None,
) -> None:
"""판독 결과 선택"""
from core.config_loader import review_result_groups
groups = review_result_groups()
# 설정에 매핑된 그룹/기본값 찾기
group_info = groups.get(review_item)
if group_info:
group_key = group_info["type"]
default_a = group_info["options"][0]
else:
group_key = "type3"
default_a = "인정"
result_value = final_result or default_a
# 셀렉터 찾기 시도
select_selector = f"#finalResult_{group_key}_{row_index}"
if not popup.locator(select_selector).count():
select_selector = f"#finalResult_{group_key[-1]}_{row_index}"
if not popup.locator(select_selector).count():
select_selector = f"#finalResult_type{group_key[-1]}_{row_index}"
set_select_by_text_or_value(popup, select_selector, result_value)
# JS 강제 이벤트 발생
try:
popup.locator(f"#finalResult_{row_index}").evaluate(
"""(node, value) => {
node.value = value;
node.dispatchEvent(new Event('change', { bubbles: true }));
}""",
result_value,
)
except Exception:
pass
def _fill_review_row(popup: Page, row_index: int, review_event: dict[str, Any]) -> None:
"""팝업의 한 행에 판독 이벤트 입력"""
request_inning = review_event.get("requestInningLabel") or "1초"
request_team = review_event.get("requestTeam") or popup.locator(f"#requestTeamId_{row_index} option").first.text_content() or ""
review_item = review_event.get("reviewItem") or "기타"
final_result = review_event.get("finalResult")
is_success_val = "Y" if (review_event.get("isSuccess") == "성공") else "N"
popup.wait_for_selector(f"#requestInning_{row_index}", timeout=3000)
set_select_by_text_or_value(popup, f"#requestInning_{row_index}", request_inning)
try:
set_select_by_partial_text(popup, f"#requestTeamId_{row_index}", request_team)
except Exception:
pass
if popup.is_closed(): return
try:
set_select_by_partial_text(popup, f"#forWhat_{row_index}", review_item)
except Exception:
set_select_by_text_or_value(popup, f"#forWhat_{row_index}", "기타")
popup.wait_for_timeout(100)
if popup.is_closed(): return
_select_review_final_result(popup, row_index, review_item, final_result)
if popup.is_closed(): return
set_select_by_text_or_value(popup, f"#isSuccess_{row_index}", is_success_val)
def _append_review_row(popup: Page) -> int:
"""신규 판독 행 추가"""
before_count = popup.locator("select[id^='requestInning_']").count()
popup.get_by_role("button", name="신규추가").click()
popup.wait_for_function(
"""(expectedCount) => {
return document.querySelectorAll("select[id^='requestInning_']").length > expectedCount;
}""",
arg=before_count,
timeout=3000,
)
row_index = popup.evaluate(
"""() => {
const ids = [...document.querySelectorAll("select[id^='requestInning_']")]
.map((el) => el.id)
.map((id) => Number(id.split("_").pop()))
.filter((num) => Number.isFinite(num));
return ids.length ? Math.max(...ids) : 0;
}"""
)
popup.wait_for_selector(f"#requestInning_{row_index}", timeout=5000)
return int(row_index)
def _can_reuse_initial_review_row(popup: Page) -> bool:
"""초기화된 0번 행 재사용 가능 여부"""
try:
row_count = popup.locator("select[id^='requestInning_']").count()
if row_count != 1:
return False
hidden_id = (popup.locator("#id_0").input_value() or "").strip()
if hidden_id:
return False
request_inning = popup.locator("#requestInning_0").input_value()
request_team = popup.locator("#requestTeamId_0").input_value()
review_item = popup.locator("#forWhat_0").input_value()
final_result = (popup.locator("#finalResult_0").input_value() or "").strip()
is_success = popup.locator("#isSuccess_0").input_value()
return (
request_inning == "1"
and review_item == "홈런타구 페어 파울"
and final_result == "페어"
and is_success == "Y"
and bool(request_team)
)
except Exception:
return False
def _save_review_popup(popup: Page) -> None:
"""팝업 저장 및 닫기"""
if popup.is_closed():
return
popup.evaluate("""() => {
window.confirm = () => true;
window.alert = () => {};
}""")
try:
with popup.expect_response(
re.compile(r"/manager/game/status/challenge/ajax"),
timeout=3000,
) as _:
save_btn = popup.locator("#saveLog")
if save_btn.count() > 0:
save_btn.click(force=True)
else:
popup.evaluate("""() => {
const btn = document.querySelector('#btnAdd')
|| document.querySelector('#btnSave')
|| [...document.querySelectorAll('button, a')].find(
el => el.innerText.includes('입력완료') || el.innerText.includes('저장')
);
if (btn) btn.click();
}""")
except Exception:
try:
popup.evaluate("""() => {
const btn = document.querySelector('#saveLog')
|| document.querySelector('#btnAdd');
if (btn) btn.click();
}""")
popup.wait_for_timeout(1000)
except Exception:
pass
try:
if not popup.is_closed():
popup.close()
except Exception:
pass
def record_review_events(page: Page, review_events: list[dict[str, Any]]) -> None:
"""비디오 판독 이벤트 전체 처리 파이프라인"""
normalized_events = [_normalize_review_event(event) for event in (review_events or [])]
if not normalized_events:
return
popup = _open_challenge_popup(page)
reuse_initial_row = _can_reuse_initial_review_row(popup)
for index, review_event in enumerate(normalized_events):
if index == 0 and reuse_initial_row:
row_index = 0
else:
row_index = _append_review_row(popup)
_fill_review_row(popup, row_index, review_event)
_save_review_popup(popup)
page.wait_for_timeout(300)
try:
page.bring_to_front()
except Exception:
pass

271
automation/runner_input.py Normal file
View File

@@ -0,0 +1,271 @@
"""
automation/runner_input.py — 주루 입력
주루 이벤트(진루, 도루, 견제, 실책 진루 등)를 관리자 사이트에 입력합니다.
"""
from __future__ import annotations
import re
from time import time, sleep
from typing import Any
from playwright.sync_api import Page
from core.field_calculator import is_double_play_result, extract_error_position
from core.runner_classifier import infer_runner_action_label
from automation.page_helpers import (
get_last_visible_enabled_locator,
set_radio_by_label,
get_checked_event_name,
)
from automation.defense_popup import (
fill_error_defense_popup,
fill_runner_out_defense,
)
def _split_complex_runner_event(
runner_event: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any] | None]:
"""복합 주루 이벤트(예: 1루주자 2루까지 진루 / 홈까지 들어오다 아웃) 분할"""
text = runner_event.get("text") or ""
if "/" not in text or ("실책" not in text and "아웃" not in text):
return runner_event, None
parts = [p.strip() for p in text.split("/") if p.strip()]
if len(parts) < 2:
return runner_event, None
def extract_base(t: str) -> int | None:
m = re.search(r"([123])루", t)
return int(m.group(1)) if m else None
# 1차 이벤트
primary = dict(runner_event)
primary["text"] = parts[0]
intermediate_to = extract_base(parts[0])
if intermediate_to:
primary["toBase"] = intermediate_to
# 2차 이벤트
secondary = dict(runner_event)
secondary["fromBase"] = primary.get("toBase")
secondary["text"] = parts[1]
if "실책" in parts[1]:
secondary["type"] = "error_advance"
elif "태그" in parts[1]:
secondary["type"] = "tag_out"
elif "포스" in parts[1]:
secondary["type"] = "force_out"
else:
secondary["type"] = "out"
return primary, secondary
def _open_runner_area(page: Page, from_base: int, area_type: int) -> None:
"""주루 영역(1: 진루, 2: 액션) 열기"""
function_name = f"changRunnerArea{from_base}"
page.evaluate(
"""({ functionName, areaType }) => {
const fn = window[functionName];
if (typeof fn === 'function') {
fn(areaType);
}
}""",
{"functionName": function_name, "areaType": area_type},
)
deadline = time() + 2
radio_name = f"evt_runner_{from_base}"
advance_name = f"dat_evt_runner_{from_base}_advance"
while time() < deadline:
ready = page.evaluate(
"""({ radioName, advanceName }) => {
const hasEnabledVisibleRadio = [...document.querySelectorAll(`input[type=radio][name='${radioName}']`)].some((node) => {
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
return !!(rect.width && rect.height && style.display !== 'none' && style.visibility !== 'hidden' && !node.disabled);
});
const hasEnabledVisibleAdvance = [...document.querySelectorAll(`input[type=radio][name='${advanceName}']`)].some((node) => {
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
return !!(rect.width && rect.height && style.display !== 'none' && style.visibility !== 'hidden' && !node.disabled);
});
return hasEnabledVisibleRadio || hasEnabledVisibleAdvance;
}""",
{"radioName": radio_name, "advanceName": advance_name},
)
if ready:
return
sleep(0.1)
def _set_runner_action(page: Page, from_base: int, label: str) -> None:
"""주자 액션(일반진루, 도루성공 등) 라디오 버튼 세팅"""
radio_name = f"evt_runner_{from_base}"
locator = page.evaluate(
r"""({ radioName, eventName }) => {
const nodes = [...document.querySelectorAll(`input[type=radio][name='${radioName}']`)];
// 1단계: eventName 속성으로 매칭
for (const node of nodes) {
const rect = node.getBoundingClientRect();
const style = window.getComputedStyle(node);
if (!rect.width || !rect.height || style.display === 'none' || style.visibility === 'hidden' || node.disabled) {
continue;
}
if ((node.getAttribute('eventName') || '') === eventName) {
const marker = `codex-runner-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
// 2단계: 텍스트로 부분 매칭
for (const node of nodes) {
let text = '';
let p = node.parentElement;
if (p) text = p.textContent.trim();
if (!text && node.nextSibling) text = node.nextSibling.textContent || '';
if (eventName && text.replace(/\s/g, '').includes(eventName.replace(/\s/g, ''))) {
const marker = `codex-runner-${Math.random().toString(36).slice(2)}`;
node.setAttribute('data-codex-marker', marker);
return marker;
}
}
return null;
}""",
{"radioName": radio_name, "eventName": label},
)
if locator:
candidate = page.locator(f"[data-codex-marker='{locator}']")
for _ in range(5):
try:
candidate.click(force=True, timeout=500)
except Exception:
pass
try:
candidate.evaluate(
"""(node) => {
node.disabled = false;
node.checked = true;
node.dispatchEvent(new Event('click', { bubbles: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
}"""
)
except Exception:
pass
page.wait_for_timeout(100)
if get_checked_event_name(page, radio_name) == label:
return
else:
for _ in range(5):
set_radio_by_label(page, radio_name, label)
page.wait_for_timeout(50)
if get_checked_event_name(page, radio_name) == label:
return
def _set_runner_advance(page: Page, from_base: int, to_base: int | None) -> None:
"""주자 최종 목적지 루 세팅"""
if to_base is None:
return
selector = f"input[name='dat_evt_runner_{from_base}_advance'][value='{to_base}']"
deadline = time() + 3
locator = None
while time() < deadline:
locator = get_last_visible_enabled_locator(page, selector)
if locator is not None:
break
sleep(0.1)
if locator is None:
fallback = page.locator(selector)
if fallback.count():
locator = fallback.last
else:
return # timeout
try:
locator.click(force=True)
except Exception:
locator.evaluate(
"""(node) => {
node.disabled = false;
node.checked = true;
node.dispatchEvent(new Event('click', { bubbles: true }));
node.dispatchEvent(new Event('change', { bubbles: true }));
}"""
)
def set_runner_events(
page: Page, event: dict[str, Any], runner_events: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
"""모든 주루 이벤트를 처리하고 지연 처리할 이벤트(late_events) 반환"""
if runner_events is None:
runner_events = (event.get("runnerEvents") or []).copy()
late_events = []
filtered_events = []
for re_item in runner_events:
primary, secondary = _split_complex_runner_event(re_item)
filtered_events.append(primary)
if secondary:
late_events.append(secondary)
for runner_event in filtered_events:
from_base = runner_event.get("fromBase")
if from_base not in {1, 2, 3}:
continue
label = infer_runner_action_label(event, runner_event)
if not label:
continue
if any(k in label for k in ["도루", "견제", "폭투", "포일", "아웃"]):
area_type = 2
else:
area_type = 1
if any(k in label for k in ["일반 진루", "볼넷 진루", "수비 실책"]):
area_type = 1
_open_runner_area(page, from_base, area_type)
_set_runner_action(page, from_base, label)
runner_text = runner_event.get("text") or ""
is_error_related_label = (
(label and "실책" in label)
or label in {"견제 에러", "수비 실책", "도루성공&실책"}
or "실책" in runner_text
)
if is_error_related_label:
if extract_error_position(runner_text):
fill_error_defense_popup(page, runner_text)
if label in {"태그아웃", "도루시도 아웃", "포스아웃"}:
fill_runner_out_defense(page, runner_text)
page.wait_for_timeout(150)
_set_runner_advance(page, from_base, runner_event.get("toBase"))
try:
extra_advance = runner_event.get("extra_advance")
if extra_advance and extra_advance > 0:
locator = get_last_visible_enabled_locator(page, f"select[name='runner{from_base}_running_add']")
if locator is not None:
locator.select_option(value=str(extra_advance))
except Exception:
pass
page.wait_for_timeout(150)
return late_events