chore: Remove legacy logging options and build scripts; update logging to use TimedRotatingFileHandler

This commit is contained in:
Fengqing Liu
2025-10-19 12:40:22 +11:00
parent 42a491331e
commit 3cedd0ef7f
7 changed files with 33 additions and 283 deletions

View File

@@ -34,9 +34,6 @@ source env/bin/activate && python main.py
# With verbose logging (stackable: -vv, -vvv)
source env/bin/activate && python main.py -v
# Enable legacy log file (logs always go to ./logs/TDM.TIMESTAMP.log)
source env/bin/activate && python main.py --log
# Create data dump for debugging
source env/bin/activate && python main.py --dump
@@ -228,7 +225,6 @@ The project does not include a test suite. Manual testing workflow:
1. Run with `-vvv` for maximum verbosity (levels: -v, -vv, -vvv, -vvvv)
2. Use `--dump` to generate debug data dumps
3. Check timestamped log files in `./logs/` directory (always created as `TDM.TIMESTAMP.log`)
4. Use `--log` flag to also create legacy `log.txt` file
5. Use `--debug-ws` for websocket debug logging
6. Use `--debug-gql` for GraphQL debug logging
7. Monitor web GUI console output and browser developer tools

View File

@@ -1,51 +0,0 @@
@echo off
REM Get the directory path of the script
set "dirpath=%~dp0"
if "%dirpath:~-1%" == "\" set "dirpath=%dirpath:~0,-1%"
REM Check if the virtual environment exists
if not exist "%dirpath%\env" (
echo:
echo No virtual environment found! Run setup_env.bat to set it up first.
echo:
if not "%~1"=="--nopause" pause
exit /b 1
)
REM Check if PyInstaller and pywin32 is installed in the virtual environment
if not exist "%dirpath%\env\scripts\pyinstaller.exe" (
echo Installing PyInstaller...
"%dirpath%\env\scripts\pip" install pyinstaller
if errorlevel 1 (
echo:
echo Failed to install PyInstaller.
echo:
if not "%~1"=="--nopause" pause
exit /b 1
)
"%dirpath%\env\scripts\python" "%dirpath%\env\scripts\pywin32_postinstall.py" -install -silent
if errorlevel 1 (
echo:
echo Failed to run pywin32_postinstall.py.
echo:
if not "%~1"=="--nopause" pause
exit /b 1
)
)
REM Run PyInstaller with the specified build spec file
echo Building...
"%dirpath%\env\scripts\pyinstaller" "%dirpath%\build.spec"
if errorlevel 1 (
echo:
echo PyInstaller build failed.
echo:
if not "%~1"=="--nopause" pause
exit /b 1
)
echo:
echo Build completed successfully.
echo:
if not "%~1"=="--nopause" pause

View File

@@ -1,43 +0,0 @@
#!/usr/bin/env bash
dirpath=$(dirname "$(readlink -f "$0")")
# Check if the virtual environment exists
if [ ! -d "$dirpath/env" ]; then
echo
echo "No virtual environment found! Run setup_env.sh to set it up first."
echo
[ "$1" != "--nopause" ] && read -p "Press any key to continue..."
exit 1
fi
# Check if PyInstaller is installed in the virtual environment
if [ ! -f "$dirpath/env/bin/pyinstaller" ]; then
echo
echo "Installing PyInstaller..."
"$dirpath/env/bin/pip" install pyinstaller
if [ $? -ne 0 ]; then
echo
echo "Failed to install PyInstaller."
echo
[ "$1" != "--nopause" ] && read -p "Press any key to continue..."
exit 1
fi
fi
# Run PyInstaller with the specified build spec file
echo
echo "Building..."
"$dirpath/env/bin/pyinstaller" "$dirpath/build.spec"
if [ $? -ne 0 ]; then
echo
echo "PyInstaller build failed."
echo
[ "$1" != "--nopause" ] && read -p "Press any key to continue..."
exit 1
fi
echo
echo "Build completed successfully."
echo
[ "$1" != "--nopause" ] && read -p "Press any key to continue..."

View File

@@ -1,144 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
from __future__ import annotations
import sys
import platform
import fnmatch
from pathlib import Path
from collections import abc
from traceback import format_exc
from typing import Any, TypeAlias, TYPE_CHECKING
SELF_PATH = str(Path(".").resolve())
if SELF_PATH not in sys.path:
sys.path.insert(0, SELF_PATH)
from constants import WORKING_DIR, SITE_PACKAGES_PATH, DEFAULT_LANG
if TYPE_CHECKING:
from PyInstaller.building.splash import Splash
from PyInstaller.building.build_main import Analysis
from PyInstaller.building.datastruct import _TOCTuple
from PyInstaller.building.api import PYZ, EXE, COLLECT
PYZTypeCOLLECT: TypeAlias = "abc.Iterable[_TOCTuple] | PYZ"
PYZTypeEXE: TypeAlias = "abc.Iterable[_TOCTuple] | PYZ | Splash"
# Simple configuration
upx: bool = False # Use UPX compression (reduces file size, may increase AV detections)
console: bool = False # True if you'd want to add a console window (useful for debugging)
one_dir: bool = False # True for one-dir, False for one-file
optimize: int | None = None # -1/None/0=none, 1=remove asserts, 2=also remove docstrings
app_name: str = "Twitch Drops Miner (by DevilXD)"
# (source_path, dest_path, required)
to_add: list[tuple[Path, str, bool]] = [
# icon files
(Path("icons/pickaxe.ico"), "./icons", True),
(Path("icons/active.ico"), "./icons", True),
(Path("icons/idle.ico"), "./icons", True),
(Path("icons/error.ico"), "./icons", True),
(Path("icons/maint.ico"), "./icons", True),
# SeleniumWire HTTPS/SSL cert file and key
(Path(SITE_PACKAGES_PATH, "seleniumwire/ca.crt"), "./seleniumwire", False),
(Path(SITE_PACKAGES_PATH, "seleniumwire/ca.key"), "./seleniumwire", False),
]
for lang_filepath in WORKING_DIR.joinpath("lang").glob("*.json"):
if lang_filepath.stem != DEFAULT_LANG:
to_add.append((lang_filepath, "lang", True))
# Ensure the required to-be-added data exists
datas: list[tuple[Path, str]] = []
for source_path, dest_path, required in to_add:
if source_path.exists():
datas.append((source_path, dest_path))
elif required:
raise FileNotFoundError(str(source_path))
hooksconfig: dict[str, Any] = {}
binaries: list[tuple[Path, str]] = []
hiddenimports: list[str] = [
"PIL._tkinter_finder",
"setuptools._distutils.log",
"setuptools._distutils.dir_util",
"setuptools._distutils.file_util",
"setuptools._distutils.archive_util",
]
if sys.platform == "linux":
# Needed files for better system tray support on Linux via pystray (AppIndicator backend).
arch: str = platform.machine()
libraries_path: Path = Path(f"/usr/lib/{arch}-linux-gnu")
if not libraries_path.exists():
libraries_path = Path("/usr/lib64")
datas.append(
(libraries_path / "girepository-1.0/AyatanaAppIndicator3-0.1.typelib", "gi_typelibs")
)
binaries.append((libraries_path / "libayatana-appindicator3.so.1", "."))
hiddenimports.extend([
"gi.repository.Gtk",
"gi.repository.GObject",
])
hooksconfig = {
"gi": {
"icons": [],
"themes": [],
"languages": ["en_US"]
}
}
a = Analysis(
["main.py"],
datas=datas,
binaries=binaries,
hooksconfig=hooksconfig,
hiddenimports=hiddenimports,
)
# Exclude unneeded Linux libraries (supports globbing)
excluded_binaries = [
"libicudata.so.*",
"libicuuc.so.*",
"librsvg-*.so.*"
]
a.binaries = [
b for b in a.binaries
if not any(fnmatch.fnmatch(b[0], pattern) for pattern in excluded_binaries)
]
if one_dir:
exe_args: PYZTypeEXE = tuple()
collect_args: PYZTypeCOLLECT = (a.datas, a.binaries)
else:
exe_args = (a.datas, a.binaries)
collect_args = tuple()
pyz = PYZ(a.pure)
try:
exe = EXE(
pyz,
a.scripts,
*exe_args,
upx=upx,
debug=False,
name=app_name,
console=console,
optimize=optimize,
exclude_binaries=one_dir,
icon="icons/pickaxe.ico",
)
except PermissionError as exc:
exc_text: str = format_exc()
if any(t in exc_text for t in ("os.remove", "os.unlink")):
raise PermissionError("Ensure the executable isn't running when rebuilding.") from exc
raise
if one_dir:
coll = COLLECT(
exe,
*collect_args,
upx=upx,
name=app_name,
)

View File

@@ -9,6 +9,7 @@ if __name__ == "__main__":
import argparse
import asyncio
import logging
from logging.handlers import TimedRotatingFileHandler
import signal
import sys
import traceback
@@ -85,7 +86,6 @@ if __name__ == "__main__":
)
parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--log", action="store_true")
parser.add_argument("--dump", action="store_true")
# undocumented debug args
parser.add_argument(
@@ -106,7 +106,6 @@ if __name__ == "__main__":
sys.exit(4)
# client run
logger.debug("Defining main async function")
async def main():
# set language
from contextlib import suppress
@@ -124,21 +123,14 @@ if __name__ == "__main__":
# Generate timestamped log filename: TDM.YYYY-MM-DDTHH-MM-SS.log
timestamp = datetime.now().isoformat(timespec='seconds').replace(':', '-')
log_file = logs_dir / f"TDM.{timestamp}.log"
log_file = logs_dir / f"TDM.log"
# Add file handler for timestamped log
file_handler = logging.FileHandler(log_file)
file_handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=5)
file_handler.setFormatter(FILE_FORMATTER)
logger.addHandler(file_handler)
logger.info(f"Logging to file: {log_file}")
# Keep old log.txt for backward compatibility if --log flag is used
if settings.log:
legacy_handler = logging.FileHandler(LOG_PATH)
legacy_handler.setFormatter(FILE_FORMATTER)
logger.addHandler(legacy_handler)
logger.info(f"Legacy log file: {LOG_PATH}")
logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws)

View File

@@ -282,33 +282,33 @@ class Twitch:
logger.debug("inventories: %s", self.inventory)
# Log detailed game -> campaigns -> channels mapping
logger.info("=== Active Campaigns Mapping ===")
from collections import defaultdict
game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = defaultdict(list)
for campaign in self.inventory:
if campaign.eligible and not campaign.finished:
logger.info("eligible Campaign: %s - %s", campaign.name, campaign.game.name)
if campaign.can_earn_within(next_hour):
channel_names = []
if campaign.allowed_channels:
channel_names = [ch.name for ch in campaign.allowed_channels]
else:
channel_names = ["<directory>"]
game_campaign_map[campaign.game.name].append((campaign, channel_names))
for game_name in sorted(game_campaign_map.keys()):
logger.info(f"Game: {game_name}")
for campaign, channel_list in game_campaign_map[game_name]:
status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}"
ends_info = campaign.ends_at.astimezone().strftime('%Y-%m-%d %H:%M')
channel_info = f"{len(channel_list)} channels" if channel_list[0] != "<directory>" else "directory"
logger.info(f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})")
logger.info(f" Channels: {channel_info}")
if channel_list[0] != "<directory>" and len(channel_list) <= 10:
logger.info(f" └─ {', '.join(channel_list)}")
elif channel_list[0] != "<directory>":
logger.info(f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list)-10} more)")
logger.info("=== End Campaigns Mapping ===")
if logger.isEnabledFor(logging.DEBUG):
logger.info("=== Active Campaigns Mapping ===")
from collections import defaultdict
game_campaign_map: dict[str, list[tuple[DropsCampaign, list[str]]]] = defaultdict(list)
for campaign in self.inventory:
if campaign.eligible and not campaign.finished:
logger.info("eligible Campaign: %s - %s", campaign.name, campaign.game.name)
if campaign.can_earn_within(next_hour):
channel_names = []
if campaign.allowed_channels:
channel_names = [ch.name for ch in campaign.allowed_channels]
else:
channel_names = ["<directory>"]
game_campaign_map[campaign.game.name].append((campaign, channel_names))
for game_name in sorted(game_campaign_map.keys()):
logger.debug(f"Game: {game_name}")
for campaign, channel_list in game_campaign_map[game_name]:
status_info = f"{'ACTIVE' if campaign.active else 'UPCOMING'}"
ends_info = campaign.ends_at.astimezone().strftime('%Y-%m-%d %H:%M')
channel_info = f"{len(channel_list)} channels" if channel_list[0] != "<directory>" else "directory"
logger.debug(f" └─ Campaign: {campaign.name} [{status_info}] (ends: {ends_info})")
logger.debug(f" Channels: {channel_info}")
if channel_list[0] != "<directory>" and len(channel_list) <= 10:
logger.debug(f" └─ {', '.join(channel_list)}")
elif channel_list[0] != "<directory>":
logger.debug(f" └─ {', '.join(channel_list[:10])} ... (+{len(channel_list)-10} more)")
logger.info("=== End Campaigns Mapping ===")
# Build wanted_games list preserving the order from games_to_watch
for game_name in games_to_watch:

View File

@@ -192,7 +192,7 @@ class Websocket:
self.set_status(_("gui", "websocket", "initializing"))
await self._twitch.wait_until_login()
self.set_status(_("gui", "websocket", "connecting"))
ws_logger.info(f"Websocket[{self._idx}] connecting...")
ws_logger.debug(f"Websocket[{self._idx}] connecting...")
self._closed.clear()
# Connect/Reconnect loop
async for websocket in self._backoff_connect(
@@ -203,7 +203,7 @@ class Websocket:
# NOTE: _topics_changed doesn't start set,
# because there's no initial topics we can sub to right away
self.set_status(_("gui", "websocket", "connected"))
ws_logger.info(f"Websocket[{self._idx}] connected.")
ws_logger.debug(f"Websocket[{self._idx}] connected.")
try:
try:
while not self._reconnect_requested.is_set():
@@ -224,7 +224,7 @@ class Websocket:
)
elif self._closed.is_set():
# we closed it - exit
ws_logger.info(f"Websocket[{self._idx}] stopped.")
ws_logger.debug(f"Websocket[{self._idx}] stopped.")
self.set_status(_("gui", "websocket", "disconnected"))
return
except Exception: