# -*- coding: utf-8 -*-
"""
agentm.logic.roms

This module handles the verification and caching of ROM files for the AgentM
application. It interacts with the DIAMBRA CLI to check ROM validity and
updates the database accordingly. It also provides functions to extract game
information from the DIAMBRA CLI output.
"""

import ast
import os
import re
import subprocess

from agentm.logic.db_functions import get_cached_rom, upsert_rom_record, get_all_verified_roms
from agentm.utils.logger import log_with_caller

ROM_FOLDER = "agentm/roms"

# Display title -> ROM archive file name expected inside ROM_FOLDER.
GAME_FILES = {
    "Dead Or Alive ++": "doapp.zip",
    "The King of Fighters '98 UM": "kof98umh.zip",
    "Marvel vs. Capcom": "mvsc.zip",
    "Samurai Shodown V Special": "samsh5sp.zip",
    "Street Fighter III: 3rd Strike": "sfiii3n.zip",
    "Soul Calibur": "soulclbr.zip",
    "Tekken Tag Tournament": "tektagt.zip",
    "Ultimate Mortal Kombat 3": "umk3.zip",
    "X-Men vs. Street Fighter": "xmvsf.zip",
}


def get_image_path(rom_file: str) -> str:
    """Returns relative image path for a given ROM zip."""
    base_name = os.path.splitext(rom_file)[0]
    return f"agentm/assets/game_images/{base_name}.jpg"


def _list_roms_output() -> str:
    """Run `diambra arena list-roms` and return its captured stdout."""
    list_result = subprocess.run(
        ["diambra", "arena", "list-roms"],
        capture_output=True,
        text=True,
    )
    return list_result.stdout


def _build_rom_record(title: str, rom_file: str, check_output: str, list_output: str) -> dict:
    """Assemble the DB fields for a ROM from `check-roms` and `list-roms` output."""
    sha256_match = re.search(r"sha256\s*=\s*([a-f0-9]{64})", check_output, re.IGNORECASE)
    sha256 = sha256_match.group(1) if sha256_match else ""

    # Strip only a trailing ".zip"; str.replace would also hit ".zip" mid-name.
    game_id = rom_file.removesuffix(".zip")
    block = extract_game_block(list_output, game_id)

    return {
        "title": title,
        "rom_file": rom_file,
        "game_id": game_id,
        "sha256": sha256,
        "difficulty_min": extract_line_int(block, "Difficulty levels", index=0),
        "difficulty_max": extract_line_int(block, "Difficulty levels", index=1),
        "characters": extract_line_list(block, "Characters list"),
        "keywords": extract_line_list(block, "Search keywords"),
    }


def get_verified_roms():
    """Return the verified ROM records for every game in GAME_FILES.

    For each known ROM file that exists under ROM_FOLDER, a cached record
    flagged as verified is reused; otherwise the file is checked with
    `diambra arena check-roms` and, when accepted, its metadata is parsed
    from `diambra arena list-roms` and stored via `upsert_rom_record`.

    If nothing could be verified in this pass, the records returned by
    `get_all_verified_roms()` are used instead. Every returned record carries
    an "image_path" entry computed at runtime.

    Errors raised by `subprocess.run` (for example when the `diambra`
    executable cannot be started) are not caught here.
    """
    verified = []
    log_with_caller("debug", f"Starting ROM verification loop for {len(GAME_FILES)} files")

    # `list-roms` is fetched lazily and at most once: it is only needed when
    # at least one ROM has to be verified through the CLI.
    full_list_output = None

    for title, rom_file in GAME_FILES.items():
        rom_path = os.path.join(ROM_FOLDER, rom_file)
        log_with_caller("debug", f"Checking ROM path: {rom_path}")

        if not os.path.exists(rom_path):
            log_with_caller("warning", f"ROM not found: {rom_file} (expected at {rom_path})")
            continue

        cached = get_cached_rom(rom_file)
        if cached and cached["verified"]:
            # Add image path to runtime version even if not stored in DB
            cached["image_path"] = get_image_path(rom_file)
            log_with_caller("info", f"✓ Cached ROM is valid: {title} ({rom_file})")
            verified.append(cached)
            continue

        log_with_caller("debug", f"No valid cache found. Verifying with DIAMBRA CLI: {rom_file}")
        result = subprocess.run(
            ["diambra", "arena", "check-roms", os.path.abspath(rom_path)],
            capture_output=True,
            text=True,
        )
        log_with_caller("debug", f"DIAMBRA output for {rom_file}:\n{result.stdout.strip()}")

        if "Correct ROM file" not in result.stdout:
            log_with_caller("error", f"✗ Invalid ROM: {title} ({rom_file})")
            continue

        if full_list_output is None:
            full_list_output = _list_roms_output()

        record = _build_rom_record(title, rom_file, result.stdout, full_list_output)
        # Note: if your DB schema supports image_path, include it here
        upsert_rom_record(**record)

        verified.append({
            **record,
            "verified": True,
            "image_path": get_image_path(rom_file),
        })
        log_with_caller("info", f"✓ Verified and cached: {title} ({rom_file})")

    # If using DB records, append image paths before returning
    if not verified:
        verified = get_all_verified_roms()
        for v in verified:
            v["image_path"] = get_image_path(v["rom_file"])

    log_with_caller("info", f"ROM verification completed: {len(verified)} valid game(s)")
    return verified


def extract_game_block(output: str, game_id: str) -> str:
    """Extracts the block of text for the given game_id from `list-roms` output.

    The block starts at the first line containing "game_id: <game_id>"
    (compared case-insensitively) and ends just before the next blank line.
    Returns an empty string when no such line is found.
    """
    marker = f"game_id: {game_id.lower()}"
    block = []
    inside = False
    for line in output.splitlines():
        if marker in line.lower():
            inside = True
        elif inside and line.strip() == "":
            break
        if inside:
            block.append(line)
    return "\n".join(block)


def extract_line_list(block: str, label: str) -> list[str]:
    """Parse the bracketed list literal that follows "<label>:" in block.

    Returns an empty list when the label is absent or the bracketed text is
    not a valid Python literal.
    """
    match = re.search(rf"{re.escape(label)}:\s*(\[[^\]]*\])", block)
    if not match:
        return []
    try:
        # The text comes from an external CLI, so it is parsed as a literal
        # only; eval() could execute arbitrary expressions even with
        # __builtins__ removed.
        return ast.literal_eval(match.group(1))
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return []


def extract_line_int(block: str, label: str, index: int = 0) -> int:
    """Return one bound from a "<label>: Min <a> - Max <b>" line in block.

    index=0 selects the Min value and index=1 the Max value. Returns 0 when
    no such line is present.
    """
    match = re.search(rf"{re.escape(label)}:\s*Min\s*(\d+)\s*-\s*Max\s*(\d+)", block)
    if match:
        return int(match.group(index + 1))
    return 0