import sqlite3 import datetime from pathlib import Path DB_PATH = Path(__file__).parent / "baseball_logs.db" def init_db(): conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # execution_jobs 테이블: 전체 경기 기록 세션 관리 cursor.execute(""" CREATE TABLE IF NOT EXISTS execution_jobs ( job_id TEXT PRIMARY KEY, game_id TEXT NOT NULL, inning_range TEXT, status TEXT DEFAULT 'RUNNING', start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, end_time TIMESTAMP ) """) # pitch_logs 테이블: 투구 개별 트랜잭션 기록 cursor.execute(""" CREATE TABLE IF NOT EXISTS pitch_logs ( pitch_id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT, inning TEXT, batter TEXT, pitch_no INTEGER, target_value TEXT, selected_value TEXT, is_success INTEGER, error_code TEXT, error_detail TEXT, duration REAL, log_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(job_id) REFERENCES execution_jobs(job_id) ) """) # event_logs 테이블: 선수 교체 및 기타 주요 이벤트 기록 cursor.execute(""" CREATE TABLE IF NOT EXISTS event_logs ( event_id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT, inning TEXT, event_type TEXT, target_player TEXT, actual_player TEXT, is_success INTEGER, error_msg TEXT, log_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(job_id) REFERENCES execution_jobs(job_id) ) """) conn.commit() conn.close() def start_job(job_id: str, game_id: str, start_inning: str = "", end_inning: str = "") -> str: init_db() conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() inning_range = "ALL" if start_inning and end_inning: inning_range = f"{start_inning}-{end_inning}" elif start_inning: inning_range = f"From {start_inning}" cursor.execute( "INSERT INTO execution_jobs (job_id, game_id, inning_range) VALUES (?, ?, ?)", (job_id, game_id, inning_range) ) conn.commit() conn.close() return job_id def finish_job(job_id: str, status: str = "COMPLETED"): conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute( "UPDATE execution_jobs SET status = ?, end_time = ? WHERE job_id = ?", (status, datetime.datetime.now(), job_id) ) conn.commit() conn.close() def log_pitch(job_id: str, inning: str, batter: str, pitch_no: int, target_value: str, selected_value: str, is_success: bool, error_code: str = "", error_detail: str = "", duration: float = 0.0): conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute( """INSERT INTO pitch_logs (job_id, inning, batter, pitch_no, target_value, selected_value, is_success, error_code, error_detail, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (job_id, inning, batter, pitch_no, target_value, selected_value, 1 if is_success else 0, error_code, error_detail, duration) ) conn.commit() conn.close() def log_event(job_id: str, inning: str, event_type: str, target_player: str, actual_player: str, is_success: bool, error_msg: str = ""): conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute( """INSERT INTO event_logs (job_id, inning, event_type, target_player, actual_player, is_success, error_msg) VALUES (?, ?, ?, ?, ?, ?, ?)""", (job_id, inning, event_type, target_player, actual_player, 1 if is_success else 0, error_msg) ) conn.commit() conn.close() def get_pitch_logs(job_id: str) -> list[dict]: init_db() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM pitch_logs WHERE job_id = ? ORDER BY pitch_id ASC", (job_id,)) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def get_event_logs(job_id: str) -> list[dict]: init_db() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM event_logs WHERE job_id = ? ORDER BY event_id ASC", (job_id,)) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def get_combined_logs(job_id: str) -> list[dict]: """투구 로그와 이벤트 로그를 합쳐서 시간순으로 반환""" init_db() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() # 1. 투구 로그 (type='pitch') cursor.execute(""" SELECT 'pitch' as type, inning, batter as target_name, '[' || pitch_no || '구] ' || target_value as action_desc, selected_value as actual_desc, is_success, error_detail as error_msg, log_time FROM pitch_logs WHERE job_id = ? """, (job_id,)) pitches = [dict(row) for row in cursor.fetchall()] # 2. 이벤트 로그 (type='event') cursor.execute(""" SELECT 'event' as type, inning, event_type as target_name, target_player as action_desc, actual_player as actual_desc, is_success, error_msg, log_time FROM event_logs WHERE job_id = ? """, (job_id,)) events = [dict(row) for row in cursor.fetchall()] conn.close() combined = pitches + events # 시간순 정렬 (log_time 기준) combined.sort(key=lambda x: x['log_time']) return combined