mirror of
https://github.com/rangermix/TwitchDropsMiner.git
synced 2026-05-26 15:13:32 +00:00
130 lines
4.7 KiB
Python
130 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import io
|
|
import json
|
|
from typing import Dict, TypedDict, NewType, TYPE_CHECKING
|
|
|
|
from utils import json_load, json_save
|
|
from constants import URLType, CACHE_PATH, CACHE_DB
|
|
|
|
from PIL import Image as Image_module
|
|
from PIL.ImageTk import PhotoImage
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from gui import GUIManager
|
|
from PIL.Image import Image
|
|
from typing_extensions import TypeAlias
|
|
|
|
|
|
ImageHash = NewType("ImageHash", str)
|
|
ImageSize: TypeAlias = "tuple[int, int]"
|
|
|
|
|
|
class ExpiringHash(TypedDict):
    """A single cache database entry: which image a URL maps to, and until when."""

    # File name of the cached image inside the cache directory.
    hash: ImageHash
    # Moment after which the entry is dropped on the next start;
    # entries written by ImageCache use timezone-aware UTC datetimes.
    expires: datetime
|
|
|
|
|
|
# Maps an image URL to the hash (cache file name) of its image plus the entry's expiry time.
Hashes = Dict[URLType, ExpiringHash]
# Empty mapping handed to `json_load` as the default, and copied when the
# cache database file cannot be decoded.
default_database: Hashes = {}
|
|
|
|
|
|
class ImageCache:
    """Cache of images fetched by URL, backed by files in ``CACHE_PATH``.

    Three layers are kept:

    * ``self._hashes`` - the persistent URL -> ``ExpiringHash`` mapping, loaded
      from and saved to ``CACHE_DB``,
    * ``self._images`` - already opened PIL images, keyed by image hash,
    * ``self._photos`` - ready-to-use ``PhotoImage`` objects, keyed by
      ``(image hash, size)``.
    """

    # How long a URL mapping stays valid after it was last requested via `get`.
    LIFETIME = timedelta(days=7)

    def __init__(self, manager: GUIManager) -> None:
        """Load the cache database and prune everything that has expired.

        Args:
            manager: GUI manager; its ``_root`` is used as the master of created
                photos and its ``_twitch`` is used to download missing images.
        """
        self._root = manager._root
        self._twitch = manager._twitch
        cleanup: bool = False
        CACHE_PATH.mkdir(parents=True, exist_ok=True)
        try:
            self._hashes: Hashes = json_load(CACHE_DB, default_database, merge=False)
        except json.JSONDecodeError:
            # if we can't load the mapping file, delete all existing files,
            # then reinitialize the image cache anew
            cleanup = True
            self._hashes = default_database.copy()
        self._images: dict[ImageHash, Image] = {}
        self._photos: dict[tuple[ImageHash, ImageSize], PhotoImage] = {}
        # serializes `get` calls, so the same image isn't downloaded twice in parallel
        self._lock = asyncio.Lock()
        self._altered: bool = False
        # cleanup the URLs: count how many non-expired URLs still point at each hash
        hash_counts: dict[ImageHash, int] = {}
        now = datetime.now(timezone.utc)
        # iterate over a snapshot, because expired entries are deleted on the go
        for url, hash_dict in list(self._hashes.items()):
            img_hash = hash_dict["hash"]
            # every hash seen gets an entry, even if all of its URLs have expired
            hash_counts.setdefault(img_hash, 0)
            if now >= hash_dict["expires"]:
                del self._hashes[url]
                self._altered = True
            else:
                hash_counts[img_hash] += 1
        for img_hash, count in hash_counts.items():
            if count == 0:
                # no URL references this image anymore - remove its file
                # hashes come with an extension already
                CACHE_PATH.joinpath(img_hash).unlink(missing_ok=True)
                # NOTE: The hashes are deleted from self._hashes above
        if cleanup:
            # This cleans the cache folder of PNG files not known to the database
            orphans = [
                file.name for file in CACHE_PATH.glob("*.png") if file.name not in hash_counts
            ]
            for filename in orphans:
                CACHE_PATH.joinpath(filename).unlink(missing_ok=True)

    def save(self, *, force: bool = False) -> None:
        """Write the URL mapping to ``CACHE_DB``.

        Nothing is written unless the mapping was altered since loading,
        or ``force`` is true.
        """
        if self._altered or force:
            json_save(CACHE_DB, self._hashes, sort=True)

    def _new_expires(self) -> datetime:
        """Return the expiry moment for an entry used right now (UTC now + ``LIFETIME``)."""
        return datetime.now(timezone.utc) + self.LIFETIME

    def _hash(self, image: Image) -> ImageHash:
        """Compute the cache file name for ``image``.

        The image is shrunk to 10x10 pixels and converted to grayscale; each pixel
        then contributes one bit, set when the pixel is at least as bright as
        the average. The bits are rendered as a hex number followed by ".png".
        """
        pixel_data = list(
            image.resize((10, 10), Image_module.Resampling.LANCZOS).convert('L').getdata()
        )
        avg_pixel = sum(pixel_data) / len(pixel_data)
        bits = ''.join('1' if px >= avg_pixel else '0' for px in pixel_data)
        return ImageHash(f"{int(bits, 2):x}.png")

    async def get(self, url: URLType, size: ImageSize | None = None) -> PhotoImage:
        """Return a ``PhotoImage`` for the image at ``url``.

        The image is taken from memory or from its cache file when possible,
        and downloaded via ``self._twitch.request`` otherwise. Every call
        refreshes the expiry time of the URL's database entry.

        Args:
            url: Address of the image.
            size: Requested size; ``None`` keeps the image's own size.

        Returns:
            The photo, shared between calls that ask for the same image and size.
        """
        async with self._lock:
            image: Image | None = None
            if url in self._hashes:
                img_hash = self._hashes[url]["hash"]
                self._hashes[url]["expires"] = self._new_expires()
                if img_hash in self._images:
                    image = self._images[img_hash]
                else:
                    try:
                        self._images[img_hash] = image = Image_module.open(CACHE_PATH / img_hash)
                    except OSError:
                        # The cache file is missing (FileNotFoundError) or can't be
                        # opened as an image, for example an empty file left behind by
                        # an interrupted save. Fall through and download it again.
                        pass
            if image is None:
                # NOTE(review): `request` is assumed to return an async context manager
                # yielding a response with an awaitable `read()` - confirm against Twitch
                async with self._twitch.request("GET", url) as response:
                    image = Image_module.open(io.BytesIO(await response.read()))
                img_hash = self._hash(image)
                self._images[img_hash] = image
                image.save(CACHE_PATH / img_hash)
                self._hashes[url] = {
                    "hash": img_hash,
                    "expires": self._new_expires()
                }
            # NOTE: If self._hashes ever stops being updated in both above if cases,
            # this will need to be moved
            self._altered = True
            if size is None:
                size = image.size
            photo_key = (img_hash, size)
            if photo_key in self._photos:
                return self._photos[photo_key]
            if image.size != size:
                # `resize` takes a resampling filter here, not a palette mode;
                # LANCZOS matches the filter used by `_hash`
                image = image.resize(size, Image_module.Resampling.LANCZOS)
            self._photos[photo_key] = photo = PhotoImage(master=self._root, image=image)
            return photo
|