Refactor Translator class and modernize translation access pattern

Simplified the Translator class to be more direct and Pythonic, and
updated all translation access from callable pattern to dict access.

Translator changes (src/i18n/translator.py):
- Removed complex fallback/merging logic with English translations
- Simplified from callable pattern _("key", "subkey") to dict access _.t["key"]["subkey"]
- Removed _load_english_translation() and module-level English translation
- Changed from list of language names to dict of Translation objects
- Removed unnecessary imports (abc, json_utils, MinerException, TYPE_CHECKING)
- Made language storage and access more direct (self.t property)
- Simplified set_language() to direct dict lookup

Translation access pattern changes (all other files):
- Changed from _("key", "subkey") to _.t["key"]["subkey"] throughout codebase
- This is more Pythonic and aligns with TypedDict structure
- Applied in: __main__.py, http_client.py, auth_state.py, models/drop.py,
  services (inventory, message_handlers, watch), web managers (login, settings),
  websocket.py

Benefits:
- More Pythonic: Direct dict access instead of callable
- Simpler: Removed ~70 lines of complex fallback logic
- Type-safe: Direct access to TypedDict structure
- More explicit: _.t["key"]["subkey"] shows the structure clearly
- Easier to maintain: No complex merging or fallback logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Fengqing Liu
2025-10-25 14:00:29 +11:00
parent 221939e26a
commit 4b342d82ac
11 changed files with 45 additions and 119 deletions

View File

@@ -98,10 +98,7 @@ if __name__ == "__main__":
# client run
async def main():
# set language
from contextlib import suppress
with suppress(ValueError):
# this language doesn't exist - stick to English
if settings.language:
_.set_language(settings.language)
# Always log to file with timestamped filename in ./logs/ directory
@@ -159,7 +156,7 @@ if __name__ == "__main__":
except CaptchaRequired:
logger.error("Captcha required - cannot continue")
exit_status = 1
client.print(_("error", "captcha"))
client.print(_.t["error"]["captcha"])
except Exception:
logger.exception("Fatal error encountered during client run")
exit_status = 1
@@ -172,7 +169,7 @@ if __name__ == "__main__":
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)
logger.info("Notifying client of exit")
client.print(_("gui", "status", "exiting"))
client.print(_.t["gui"]["status"]["exiting"])
# Shutdown web server
if web_server_task and not web_server_task.done():
logger.info("Shutting down web server")
@@ -202,8 +199,8 @@ if __name__ == "__main__":
if exit_status != 0:
logger.warning("Application terminated with error - showing error state")
# Application terminated with error
client.print(_("status", "terminated"))
client.gui.status.update(_("gui", "status", "terminated"))
client.print(_.t["status"]["terminated"])
client.gui.status.update(_.t["gui"]["status"]["terminated"])
# notify the user about the closure
client.gui.grab_attention(sound=True)
# Web GUI doesn't need to wait - browser clients can stay connected

View File

@@ -191,7 +191,7 @@ class HTTPClient:
yield response
return
self.gui.print(_("error", "site_down").format(seconds=round(delay)))
self.gui.print(_.t["error"]["site_down"].format(seconds=round(delay)))
except aiohttp.ClientConnectorCertificateError:
# SSL verification failures should not be retried
raise
@@ -203,7 +203,7 @@ class HTTPClient:
# Connection problems, retry with backoff
if backoff.steps > 1:
# Don't show quick retries to the user
self.gui.print(_("error", "no_connection").format(seconds=round(delay)))
self.gui.print(_.t["error"]["no_connection"].format(seconds=round(delay)))
finally:
if response is not None:
response.release()

View File

@@ -232,7 +232,7 @@ class _AuthState:
# looks like we're missing something
login_form: LoginForm = self._twitch.gui.login
logger.info("Checking login")
login_form.update(_("login", "status", "logging_in"), None)
login_form.update(_.t["login"]["status"]["logging_in"], None)
for _client_mismatch_attempt in range(2):
for _invalid_token_attempt in range(2):
cookie = jar.filter_cookies(client_info.CLIENT_URL)
@@ -271,7 +271,7 @@ class _AuthState:
self.user_id = int(validate_response["user_id"])
cookie["persistent"] = str(self.user_id)
logger.info(f"Login successful, user ID: {self.user_id}")
login_form.update(_("login", "status", "logged_in"), self.user_id)
login_form.update(_.t["login"]["status"]["logged_in"], self.user_id)
# update our cookie and save it
jar.update_cookies(cookie, client_info.CLIENT_URL)
jar.save(COOKIES_PATH)

View File

@@ -1,17 +1,10 @@
from __future__ import annotations
import json
from collections import abc
from typing import TYPE_CHECKING, Any, TypedDict, cast
import logging
from typing import TypedDict, cast
from src.config import DEFAULT_LANG, LANG_PATH
from src.exceptions import MinerException
from src.utils.json_utils import json_load
if TYPE_CHECKING:
from typing_extensions import NotRequired
class StatusMessages(TypedDict):
terminated: str
@@ -212,96 +205,32 @@ class Translation(TypedDict):
gui: GUIMessages
# Load English translation from JSON file (single source of truth)
def _load_english_translation() -> Translation:
"""Load the English translation from lang/English.json.
This is the fallback translation used when other translations are missing keys.
"""
english_path = LANG_PATH / "English.json"
try:
with open(english_path, "r", encoding="utf-8") as f:
return cast(Translation, json.load(f))
except Exception as e:
raise MinerException(
f"Failed to load English translation from {english_path}: {e}"
) from e
# Module-level English translation (loaded once at import time)
_english_translation = _load_english_translation()
class Translator:
def __init__(self) -> None:
self._langs: list[str] = []
# start with English translation (loaded from JSON)
self._translation: Translation = _english_translation.copy()
self._translation["language_name"] = DEFAULT_LANG
self.logger: logging.Logger = logging.getLogger("TwitchDropsMiner.i18n.Translator")
self._langs: dict[str, Translation] = {}
self.current_language: str
self.t: Translation
# load available languages from JSON files by reading language_name field
for filepath in LANG_PATH.glob("*.json"):
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
if "language_name" in data:
self._langs.append(data["language_name"])
else:
# fallback to filename if language_name is missing
self._langs.append(filepath.stem)
except Exception:
loaded_translation: Translation = json.load(open(filepath, "r"))
self._langs[loaded_translation["language_name"]] = loaded_translation
except Exception as e:
# if we can't read the file, skip it
self.logger.warning(f"Failed to load language file {filepath}: {e}")
continue
self._langs.sort()
# ensure DEFAULT_LANG is first in the list
if DEFAULT_LANG in self._langs:
self._langs.remove(DEFAULT_LANG)
self._langs.insert(0, DEFAULT_LANG)
self._langs = dict(sorted(self._langs.items()))
self.set_language(DEFAULT_LANG)
@property
def languages(self) -> abc.Iterable[str]:
return iter(self._langs)
@property
def current(self) -> str:
return self._translation["language_name"]
def get_languages(self) -> list[str]:
return list(self._langs.keys())
def set_language(self, language: str):
if language not in self._langs:
raise ValueError("Unrecognized language")
elif self._translation["language_name"] == language:
# same language as loaded selected
return
elif language == DEFAULT_LANG:
# default language selected - use English from JSON
self._translation = _english_translation.copy()
self._translation["language_name"] = DEFAULT_LANG
else:
# find the JSON file with matching language_name field
for filepath in LANG_PATH.glob("*.json"):
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
if data.get("language_name") == language:
self._translation = json_load(filepath, _english_translation)
return
except Exception:
continue
# if we can't find a matching file, raise an error
raise ValueError(f"Cannot find translation file for language: {language}")
def __call__(self, *path: str) -> str:
if not path:
raise ValueError("Language path expected")
v: Any = self._translation
try:
for key in path:
v = v[key]
except KeyError as err:
# this can only really happen for the default translation
raise MinerException(
f"{self.current} translation is missing the '{' -> '.join(path)}' translation key"
) from err
return str(v)
raise ValueError(f"Unrecognized language {language}")
self.current_language = language
self.t = cast(Translation, self._langs.get(language))
_ = Translator()

View File

@@ -150,7 +150,7 @@ class BaseDrop:
# two different claim texts, becase a new line after the game name
# looks ugly in the output window - replace it with a space
self._twitch.print(
_("status", "claimed_drop").format(drop=claim_text.replace("\n", " "))
_.t["status"]["claimed_drop"].format(drop=claim_text.replace("\n", " "))
)
else:
logger.error(f"Drop claim has potentially failed! Drop ID: {self.id}")

View File

@@ -100,7 +100,7 @@ class InventoryService:
6. Sets up maintenance triggers for campaign timing changes
"""
status_update = self._twitch.gui.status.update
status_update(_("gui", "status", "fetching_inventory"))
status_update(_.t["gui"]["status"]["fetching_inventory"])
# fetch in-progress campaigns (inventory)
response = await self._twitch.gql_request(GQL_OPERATIONS["Inventory"])
@@ -125,7 +125,7 @@ class InventoryService:
}
# fetch detailed data for each campaign, in chunks
status_update(_("gui", "status", "fetching_campaigns"))
status_update(_.t["gui"]["status"]["fetching_campaigns"])
fetch_campaigns_tasks: list[asyncio.Task[Any]] = [
asyncio.create_task(self.fetch_campaigns(campaigns_chunk))
for campaigns_chunk in chunk(available_campaigns.items(), 20)
@@ -174,7 +174,7 @@ class InventoryService:
# concurrently add the campaigns into the GUI
# NOTE: this fetches pictures from the CDN, so might be slow without a cache
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"(0/{len(campaigns)})")
_.t["gui"]["status"]["adding_campaigns"].format(counter=f"(0/{len(campaigns)})")
)
add_campaign_tasks: list[asyncio.Task[None]] = [
asyncio.create_task(self._twitch.gui.inv.add_campaign(campaign))
@@ -185,7 +185,7 @@ class InventoryService:
for i, coro in enumerate(asyncio.as_completed(add_campaign_tasks), start=1):
await coro
status_update(
_("gui", "status", "adding_campaigns").format(counter=f"({i}/{len(campaigns)})")
_.t["gui"]["status"]["adding_campaigns"].format(counter=f"({i}/{len(campaigns)})")
)
# this is needed here explicitly, because cache reads from disk don't raise this
from src.config import State

View File

@@ -143,7 +143,7 @@ class MessageHandlerService:
# Channel going from OFFLINE to ONLINE
if stream_before is None and stream_after is not None:
if self._twitch.can_watch(channel) and self._twitch.should_switch(channel):
self._twitch.print(_("status", "goes_online").format(channel=channel.name))
self._twitch.print(_.t["status"]["goes_online"].format(channel=channel.name))
self._twitch.watch(channel)
else:
logger.info(f"{channel.name} goes ONLINE")
@@ -151,7 +151,7 @@ class MessageHandlerService:
# Channel going from ONLINE to OFFLINE
elif stream_before is not None and stream_after is None:
if is_watching_this:
self._twitch.print(_("status", "goes_offline").format(channel=channel.name))
self._twitch.print(_.t["status"]["goes_offline"].format(channel=channel.name))
self._twitch.change_state(State.CHANNEL_SWITCH)
else:
logger.info(f"{channel.name} goes OFFLINE")

View File

@@ -129,7 +129,7 @@ class WatchService:
if self._twitch.is_manual_mode() and self._twitch._manual_target_game:
status_text = f"🎯 Manual Mode: Watching {channel.name} for {self._twitch._manual_target_game.name}"
else:
status_text = _("status", "watching").format(channel=channel.name)
status_text = _.t["status"]["watching"].format(channel=channel.name)
self._twitch.print(status_text)
self._twitch.gui.status.update(status_text)

View File

@@ -35,7 +35,7 @@ class LoginFormManager:
self._manager = manager
self._login_event = asyncio.Event()
self._login_data: LoginData | None = None
self._status = _("login", "status", "logged_out")
self._status = _.t["login"]["status"]["logged_out"]
self._user_id: int | None = None
self._oauth_pending: dict[str, str] | None = (
None # Store OAuth code for late-connecting clients
@@ -78,7 +78,7 @@ class LoginFormManager:
page_url: URL where user should enter the code (e.g., twitch.tv/activate)
user_code: The device code to enter
"""
self.update(_("login", "status", "required"), None)
self.update(_.t["login"]["status"]["required"], None)
self._login_event.clear()
# Store OAuth code for late-connecting clients
self._oauth_pending = {"url": str(page_url), "code": user_code}

View File

@@ -49,8 +49,8 @@ class SettingsManager:
Dictionary with available languages and current language
"""
return {
"available": list(_.languages),
"current": _.current,
"available": _.get_languages(),
"current": _.current_language,
}
def update_settings(self, settings_data: dict[str, Any]):

View File

@@ -74,7 +74,7 @@ class Websocket:
self.topics: dict[str, WebsocketTopic] = {}
self._submitted: set[WebsocketTopic] = set()
# notify GUI
self.set_status(_("gui", "websocket", "disconnected"))
self.set_status(_.t["gui"]["websocket"]["disconnected"])
@property
def connected(self) -> bool:
@@ -127,7 +127,7 @@ class Websocket:
self._closed.set()
ws = self._ws.get_with_default(None)
if ws is not None:
self.set_status(_("gui", "websocket", "disconnecting"))
self.set_status(_.t["gui"]["websocket"]["disconnecting"])
await ws.close()
if self._handle_task is not None:
with suppress(asyncio.TimeoutError, asyncio.CancelledError):
@@ -187,9 +187,9 @@ class Websocket:
async def _handle(self):
"""Main websocket handler that manages connection lifecycle and message processing."""
# ensure we're logged in before connecting
self.set_status(_("gui", "websocket", "initializing"))
self.set_status(_.t["gui"]["websocket"]["initializing"])
await self._twitch.wait_until_login()
self.set_status(_("gui", "websocket", "connecting"))
self.set_status(_.t["gui"]["websocket"]["connecting"])
ws_logger.debug(f"Websocket[{self._idx}] connecting...")
self._closed.clear()
# Connect/Reconnect loop
@@ -201,7 +201,7 @@ class Websocket:
self._reconnect_requested.clear()
# NOTE: _topics_changed doesn't start set,
# because there's no initial topics we can sub to right away
self.set_status(_("gui", "websocket", "connected"))
self.set_status(_.t["gui"]["websocket"]["connected"])
ws_logger.debug(f"Websocket[{self._idx}] connected.")
try:
try:
@@ -224,11 +224,11 @@ class Websocket:
elif self._closed.is_set():
# we closed it - exit
ws_logger.debug(f"Websocket[{self._idx}] stopped.")
self.set_status(_("gui", "websocket", "disconnected"))
self.set_status(_.t["gui"]["websocket"]["disconnected"])
return
except Exception:
ws_logger.exception(f"Exception in Websocket[{self._idx}]")
self.set_status(_("gui", "websocket", "reconnecting"))
self.set_status(_.t["gui"]["websocket"]["reconnecting"])
ws_logger.warning(f"Websocket[{self._idx}] reconnecting...")
async def _handle_ping(self):