diff --git a/build.spec b/build.spec index a42dd53..75d72d6 100644 --- a/build.spec +++ b/build.spec @@ -31,7 +31,12 @@ a = Analysis( hookspath=[], hooksconfig={}, noarchive=False, - hiddenimports=[], + hiddenimports=[ + "setuptools._distutils.log", + "setuptools._distutils.dir_util", + "setuptools._distutils.file_util", + "setuptools._distutils.archive_util", + ], runtime_hooks=[], cipher=block_cipher, win_no_prefer_redirects=False, diff --git a/gui.py b/gui.py index 8e605f5..620851c 100644 --- a/gui.py +++ b/gui.py @@ -1,18 +1,21 @@ from __future__ import annotations +import re import sys +import json import ctypes import asyncio import logging import webbrowser import tkinter as tk +from collections import abc from math import log10, ceil -from collections import abc, namedtuple +from subprocess import CREATE_NO_WINDOW from tkinter.font import Font, nametofont from functools import partial, cached_property from datetime import datetime, timedelta, timezone -from typing import Any, TypedDict, NoReturn, Generic, TYPE_CHECKING from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar, PhotoImage +from typing import Any, Union, Tuple, TypedDict, NoReturn, Generic, TYPE_CHECKING import pystray import win32api @@ -20,13 +23,18 @@ import win32con import win32gui from yarl import URL from PIL import Image as Image_module +from undetected_chromedriver import ChromeOptions +from seleniumwire.undetected_chromedriver.v2 import Chrome +from selenium.common.exceptions import WebDriverException, NoSuchWindowException from translate import _ from cache import ImageCache -from exceptions import ExitRequest from utils import resource_path, Game, _T from registry import RegistryKey, ValueType -from constants import SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State +from exceptions import MinerException, ExitRequest, LoginException +from constants import ( + CLIENT_ID, SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State +) if TYPE_CHECKING: from twitch import Twitch @@ -35,9 +43,15 @@ if TYPE_CHECKING: from inventory import DropsCampaign, TimedDrop +TK_PADDING = Union[int, Tuple[int, int], Tuple[int, int, int], Tuple[int, int, int, int]] digits = ceil(log10(WS_TOPICS_LIMIT)) +###################### +# GUI ELEMENTS START # +###################### + + class _TKOutputHandler(logging.Handler): def __init__(self, output: GUIManager): super().__init__() @@ -161,7 +175,7 @@ class PlaceholderCombobox(PlaceholderEntry, ttk.Combobox): class PaddedListbox(tk.Listbox): - def __init__(self, master: ttk.Widget, *args, padding: tk._Padding = (0, 0, 0, 0), **kwargs): + def __init__(self, master: ttk.Widget, *args, padding: TK_PADDING = (0, 0, 0, 0), **kwargs): # we place the listbox inside a frame with the same background # this means we need to forward the 'grid' method to the frame, not the listbox self._frame = tk.Frame(master) @@ -210,7 +224,7 @@ class PaddedListbox(tk.Listbox): self._frame.configure(frame_options) # update padding if "padding" in options: - padding: tk._Padding = options.pop("padding") + padding: TK_PADDING = options.pop("padding") padx1: tk._ScreenUnits padx2: tk._ScreenUnits pady1: tk._ScreenUnits @@ -221,9 +235,9 @@ class PaddedListbox(tk.Listbox): padx1 = padx2 = pady1 = pady2 = padding elif len(padding) == 2: padx1 = padx2 = padding[0] - pady1 = pady2 = padding[1] # type: ignore + pady1 = pady2 = padding[1] elif len(padding) == 3: - padx1, padx2 = padding[0:2] # type: ignore + padx1, padx2 = padding[0], padding[1] pady1 = pady2 = padding[2] # type: ignore else: padx1, padx2, pady1, pady2 = padding # type: ignore @@ -309,7 +323,9 @@ class SelectMenu(tk.Menubutton, Generic[_T]): return self._menu_options[self.cget("text")] -# GUI ELEMENTS END / GUI DEFINITION START +########################################### +# GUI ELEMENTS END / GUI DEFINITION START # +########################################### class StatusBar: @@ -395,9 +411,6 @@ class WebsocketStatus: self._topics_var.set('\n'.join(topic_lines)) -LoginData = namedtuple("LoginData", ["username", "password", "token"]) - - class LoginForm: def __init__(self, manager: GUIManager, master: ttk.Widget): self._manager = manager @@ -409,14 +422,14 @@ class LoginForm: frame.rowconfigure(4, weight=1) ttk.Label(frame, text=_("gui", "login", "labels")).grid(column=0, row=0) ttk.Label(frame, textvariable=self._var, justify="center").grid(column=1, row=0) - self._login_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "username")) - self._login_entry.grid(column=0, row=1, columnspan=2) - self._pass_entry = PlaceholderEntry( - frame, placeholder=_("gui", "login", "password"), show='•' - ) - self._pass_entry.grid(column=0, row=2, columnspan=2) - self._token_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "twofa_code")) - self._token_entry.grid(column=0, row=3, columnspan=2) + # self._login_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "username")) + # self._login_entry.grid(column=0, row=1, columnspan=2) + # self._pass_entry = PlaceholderEntry( + # frame, placeholder=_("gui", "login", "password"), show='•' + # ) + # self._pass_entry.grid(column=0, row=2, columnspan=2) + # self._token_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "twofa_code")) + # self._token_entry.grid(column=0, row=3, columnspan=2) self._confirm = asyncio.Event() self._button = ttk.Button( frame, text=_("gui", "login", "button"), command=self._confirm.set, state="disabled" @@ -424,31 +437,130 @@ class LoginForm: self._button.grid(column=0, row=4, columnspan=2) self.update(_("gui", "login", "logged_out"), None) - def clear(self, login: bool = False, password: bool = False, token: bool = False): - clear_all = not login and not password and not token - if login or clear_all: - self._login_entry.clear() - if password or clear_all: - self._pass_entry.clear() - if token or clear_all: - self._token_entry.clear() + @staticmethod + def interceptor(request) -> None: + if ( + request.method == "POST" + and request.url == "https://passport.twitch.tv/protected_login" + ): + body = request.body.decode("utf-8") + data = json.loads(body) + data["client_id"] = CLIENT_ID + request.body = json.dumps(data).encode("utf-8") + del request.headers["Content-Length"] + request.headers["Content-Length"] = str(len(request.body)) - async def ask_login(self) -> LoginData: + async def ask_login(self) -> str: self.update(_("gui", "login", "required"), None) self._manager.print(_("gui", "login", "request")) self._confirm.clear() - self._button.config(state="normal") - # NOTE: we need this to allow for the closing window event to break the waiting here - done, pending = await asyncio.wait( - [self._confirm.wait(), self._manager.wait_until_closed()], - return_when=asyncio.FIRST_COMPLETED, - ) - for task in pending: - task.cancel() - if self._manager.close_requested: - raise ExitRequest() - self._button.config(state="disabled") - return LoginData(self._login_entry.get(), self._pass_entry.get(), self._token_entry.get()) + + # open the chrome browser on the Twitch's login page + # use a separate executor to void blocking the event loop + loop = asyncio.get_running_loop() + driver = None + try: + version_main = None + for attempt in range(2): + options = ChromeOptions() + options.add_argument("--log-level=3") + options.add_argument("--disable-web-security") + options.add_argument("--allow-running-insecure-content") + options.add_argument("--lang=en") + options.add_argument("--no-sandbox") + options.add_argument("--disable-gpu") + try: + driver_coro = loop.run_in_executor( + None, + lambda: Chrome( + options=options, + # use_subprocess=True, + suppress_welcome=True, + version_main=version_main, + service_creationflags=CREATE_NO_WINDOW, + ) + ) + done, pending = await asyncio.wait( + [driver_coro, self._manager.wait_until_closed()], + return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + if self._manager.close_requested: + raise ExitRequest() + driver = next(iter(done)).result() + break + except WebDriverException as exc: + message = exc.msg + if ( + message is not None + and (match := re.search( + r'Chrome version ([\d]+)\nCurrent browser version is ((\d+)\.[\d.]+)', + message, + )) is not None + ): + if not attempt: + version_main = int(match.group(3)) + continue + else: + raise MinerException( + "Your Chrome browser is out of date\n" + f"Required version: {match.group(1)}\n" + f"Current version: {match.group(2)}" + ) from None + raise MinerException( + "An error occured while boostrapping the Chrome browser" + ) from exc + assert driver is not None + driver.request_interceptor = self.interceptor + page_coro = loop.run_in_executor(None, driver.get, "https://twitch.tv/login") + done, pending = await asyncio.wait( + [page_coro, self._manager.wait_until_closed()], + return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + if self._manager.close_requested: + raise ExitRequest() + # enable the login button once the page finishes opening + self._button.config(state="normal") + + # auto login + # driver.find_element("id", "login-username").send_keys(username) + # driver.find_element("id", "password-input").send_keys(password) + # driver.find_element( + # "css selector", '[data-a-target="passport-login-button"]' + # ).click() + # token submit button css selectors + # Button: "screen="two_factor" target="submit_button" + # Input: + + # wait for the user to navigate away from the URL, indicating successful login + # alternatively, they can press on the login button + while ( + driver.current_url != "https://www.twitch.tv/?no-reload=true" + and not self._confirm.is_set() + ): + if self._manager.close_requested: + raise ExitRequest() + await asyncio.sleep(0.5) + + cookies = driver.get_cookies() + for cookie in cookies: + if cookie["name"] == "auth-token": + auth_token = cookie["value"] + break + else: + raise LoginException("Unable to extract authorization token") + except NoSuchWindowException: + driver = None + raise LoginException("Chrome window was closed, reopening...") + finally: + self._button.config(state="disabled") + if driver is not None: + driver.quit() + return auth_token def update(self, status: str, user_id: int | None): if user_id is not None: @@ -1643,6 +1755,11 @@ class HelpTab: ).grid(sticky="nsew") +########################################## +# GUI DEFINITION END / GUI MANAGER START # +########################################## + + class GUIManager: def __init__(self, twitch: Twitch): self._twitch: Twitch = twitch @@ -1887,6 +2004,11 @@ class GUIManager: self.output.print(*args, **kwargs) +################### +# GUI MANAGER END # +################### + + if __name__ == "__main__": # Everything below is for debug purposes only import aiohttp diff --git a/main.py b/main.py index 59b9948..bedba07 100644 --- a/main.py +++ b/main.py @@ -1,198 +1,204 @@ from __future__ import annotations -import io -import sys -import signal -import asyncio -import logging -import argparse -import traceback -import tkinter as tk -from tkinter import messagebox -from typing import IO, NoReturn +# import an additional thing for proper PyInstaller freeze support +from multiprocessing import freeze_support -# pre-import 3rd party libraries, handling missing ones with an exception -try: - import aiohttp # noqa -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install aiohttp' first") from exc -try: - import pystray # noqa -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install pystray' first") from exc -try: - import PIL # noqa -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install pillow' first") from exc -try: - import win32gui # noqa -except ModuleNotFoundError as exc: - raise ImportError("You have to run 'pip install pywin32' first") from exc +if __name__ == "__main__": + freeze_support() + import io + import sys + import signal + import asyncio + import logging + import argparse + import traceback + import tkinter as tk + from tkinter import messagebox + from typing import IO, NoReturn -from translate import _ -from twitch import Twitch -from settings import Settings -from version import __version__ -from exceptions import CaptchaRequired -from constants import SELF_PATH, FORMATTER, LOG_PATH, WINDOW_TITLE + # pre-import 3rd party libraries, handling missing ones with an exception + try: + import aiohttp # noqa + except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install aiohttp' first") from exc + try: + import pystray # noqa + except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install pystray' first") from exc + try: + import PIL # noqa + except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install pillow' first") from exc + try: + import win32gui # noqa + except ModuleNotFoundError as exc: + raise ImportError("You have to run 'pip install pywin32' first") from exc + from translate import _ + from twitch import Twitch + from settings import Settings + from version import __version__ + from exceptions import CaptchaRequired + from constants import SELF_PATH, FORMATTER, LOG_PATH, WINDOW_TITLE -class Parser(argparse.ArgumentParser): - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self._message: io.StringIO = io.StringIO() + class Parser(argparse.ArgumentParser): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._message: io.StringIO = io.StringIO() - def _print_message(self, message: str, file: IO[str] | None = None) -> None: - self._message.write(message) - # print(message, file=self._message) + def _print_message(self, message: str, file: IO[str] | None = None) -> None: + self._message.write(message) + # print(message, file=self._message) - def exit(self, status: int = 0, message: str | None = None) -> NoReturn: - try: - super().exit(status, message) # sys.exit(2) - finally: - messagebox.showerror("Argument Parser Error", self._message.getvalue()) + def exit(self, status: int = 0, message: str | None = None) -> NoReturn: + try: + super().exit(status, message) # sys.exit(2) + finally: + messagebox.showerror("Argument Parser Error", self._message.getvalue()) + class ParsedArgs(argparse.Namespace): + _verbose: int + _debug_ws: bool + _debug_gql: bool + log: bool + tray: bool + no_run_check: bool -class ParsedArgs(argparse.Namespace): - _verbose: int - _debug_ws: bool - _debug_gql: bool - log: bool - tray: bool - no_run_check: bool + # TODO: replace int with union of literal values once typeshed updates + @property + def logging_level(self) -> int: + return { + 0: logging.ERROR, + 1: logging.WARNING, + 2: logging.INFO, + 3: logging.DEBUG, + }[min(self._verbose, 3)] - # TODO: replace int with union of literal values once typeshed updates - @property - def logging_level(self) -> int: - return { - 0: logging.ERROR, - 1: logging.WARNING, - 2: logging.INFO, - 3: logging.DEBUG, - }[min(self._verbose, 3)] + @property + def debug_ws(self) -> int: + """ + If the debug flag is True, return DEBUG. + If the main logging level is DEBUG, return INFO to avoid seeing raw messages. + Otherwise, return NOTSET to inherit the global logging level. + """ + if self._debug_ws: + return logging.DEBUG + elif self._verbose >= 3: + return logging.INFO + return logging.NOTSET - @property - def debug_ws(self) -> int: - """ - If the debug flag is True, return DEBUG. - If the main logging level is DEBUG, return INFO to avoid seeing raw messages. - Otherwise, return NOTSET to inherit the global logging level. - """ - if self._debug_ws: - return logging.DEBUG - elif self._verbose >= 3: - return logging.INFO - return logging.NOTSET + @property + def debug_gql(self) -> int: + if self._debug_gql: + return logging.DEBUG + elif self._verbose >= 3: + return logging.INFO + return logging.NOTSET - @property - def debug_gql(self) -> int: - if self._debug_gql: - return logging.DEBUG - elif self._verbose >= 3: - return logging.INFO - return logging.NOTSET - - -# handle input parameters -# NOTE: parser output is shown via message box -# we also need a dummy invisible window for the parser -root = tk.Tk() -root.overrideredirect(True) -root.withdraw() -root.update() -parser = Parser( - SELF_PATH.name, - description="A program that allows you to mine timed drops on Twitch.", -) -parser.add_argument("--version", action="version", version=f"v{__version__}") -parser.add_argument("-v", dest="_verbose", action="count", default=0) -parser.add_argument("--tray", action="store_true") -parser.add_argument("--log", action="store_true") -# undocumented debug args -parser.add_argument( - "--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS -) -parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS) -parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS) -args = parser.parse_args(namespace=ParsedArgs()) -# load settings -try: - settings = Settings(args) -except Exception: - messagebox.showerror( - "Settings error", - f"There was an error while loading the settings file:\n\n{traceback.format_exc()}" + # handle input parameters + # NOTE: parser output is shown via message box + # we also need a dummy invisible window for the parser + root = tk.Tk() + root.overrideredirect(True) + root.withdraw() + root.update() + parser = Parser( + SELF_PATH.name, + description="A program that allows you to mine timed drops on Twitch.", ) - sys.exit(4) -# dummy window isn't needed anymore -root.destroy() + parser.add_argument("--version", action="version", version=f"v{__version__}") + parser.add_argument("-v", dest="_verbose", action="count", default=0) + parser.add_argument("--tray", action="store_true") + parser.add_argument("--log", action="store_true") + # undocumented debug args + parser.add_argument( + "--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS + ) + parser.add_argument( + "--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS + ) + parser.add_argument( + "--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS + ) + args = parser.parse_args(namespace=ParsedArgs()) + # load settings + try: + settings = Settings(args) + except Exception: + messagebox.showerror( + "Settings error", + f"There was an error while loading the settings file:\n\n{traceback.format_exc()}" + ) + sys.exit(4) + # dummy window isn't needed anymore + root.destroy() -# check if we're not already running -try: - exists = win32gui.FindWindow(None, WINDOW_TITLE) -except AttributeError: - # we're not on Windows - continue - exists = False -if exists and not settings.no_run_check: - # already running - exit - sys.exit(3) + # check if we're not already running + try: + exists = win32gui.FindWindow(None, WINDOW_TITLE) + except AttributeError: + # we're not on Windows - continue + exists = False + if exists and not settings.no_run_check: + # already running - exit + sys.exit(3) -# set language -try: - _.set_language(settings.language) -except ValueError: - # this language doesn't exist - stick to English - pass + # set language + try: + _.set_language(settings.language) + except ValueError: + # this language doesn't exist - stick to English + pass -# handle logging stuff -if settings.logging_level > logging.DEBUG: - # redirect the root logger into a NullHandler, effectively ignoring all logging calls - # that aren't ours. This always runs, unless the main logging level is DEBUG or lower. - logging.getLogger().addHandler(logging.NullHandler()) -logger = logging.getLogger("TwitchDrops") -logger.setLevel(settings.logging_level) -if settings.log: - handler = logging.FileHandler(LOG_PATH) - handler.setFormatter(FORMATTER) - logger.addHandler(handler) -logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql) -logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws) + # handle logging stuff + if settings.logging_level > logging.DEBUG: + # redirect the root logger into a NullHandler, effectively ignoring all logging calls + # that aren't ours. This always runs, unless the main logging level is DEBUG or lower. + logging.getLogger().addHandler(logging.NullHandler()) + logger = logging.getLogger("TwitchDrops") + logger.setLevel(settings.logging_level) + if settings.log: + handler = logging.FileHandler(LOG_PATH) + handler.setFormatter(FORMATTER) + logger.addHandler(handler) + logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql) + logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws) -# client run -exit_status = 0 -loop = asyncio.new_event_loop() -asyncio.set_event_loop(loop) -client = Twitch(settings) -signal.signal(signal.SIGINT, lambda *_: client.gui.close()) -signal.signal(signal.SIGTERM, lambda *_: client.gui.close()) -try: - loop.run_until_complete(client.run()) -except CaptchaRequired: - exit_status = 1 - msg = _("error", "captcha") - client.prevent_close() - client.print(msg) -except Exception: - exit_status = 1 - msg = "Fatal error encountered:\n" - client.prevent_close() - client.print(msg) - client.print(traceback.format_exc()) -finally: - signal.signal(signal.SIGINT, signal.SIG_DFL) - signal.signal(signal.SIGTERM, signal.SIG_DFL) - client.print(_("gui", "status", "exiting")) - loop.run_until_complete(client.shutdown()) -if not client.gui.close_requested: - client.print(_("status", "terminated")) - client.gui.status.update(_("gui", "status", "terminated")) -loop.run_until_complete(client.gui.wait_until_closed()) -# save the application state -# NOTE: we have to do it after wait_until_closed, -# because the user can alter some settings between app termination and closing the window -client.save(force=True) -client.gui.stop() -client.gui.close_window() -loop.run_until_complete(loop.shutdown_asyncgens()) -loop.close() -sys.exit(exit_status) + # client run + exit_status = 0 + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + client = Twitch(settings) + signal.signal(signal.SIGINT, lambda *_: client.gui.close()) + signal.signal(signal.SIGTERM, lambda *_: client.gui.close()) + try: + loop.run_until_complete(client.run()) + except CaptchaRequired: + exit_status = 1 + msg = _("error", "captcha") + client.prevent_close() + client.print(msg) + except Exception: + exit_status = 1 + msg = "Fatal error encountered:\n" + client.prevent_close() + client.print(msg) + client.print(traceback.format_exc()) + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + client.print(_("gui", "status", "exiting")) + loop.run_until_complete(client.shutdown()) + if not client.gui.close_requested: + client.print(_("status", "terminated")) + client.gui.status.update(_("gui", "status", "terminated")) + loop.run_until_complete(client.gui.wait_until_closed()) + # save the application state + # NOTE: we have to do it after wait_until_closed, + # because the user can alter some settings between app termination and closing the window + client.save(force=True) + client.gui.stop() + client.gui.close_window() + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + sys.exit(exit_status) diff --git a/requirements.txt b/requirements.txt index 4eebaed..e2cae20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,6 @@ aiohttp>2.0,<4.0 Pillow pystray pywin32 +selenium-wire +# use a fork that has an extra option of hiding the unneeded Chrome's console cmd window +git+https://github.com/sebdelsol/undetected-chromedriver diff --git a/twitch.py b/twitch.py index 4844b9c..1fc5ec2 100644 --- a/twitch.py +++ b/twitch.py @@ -22,9 +22,7 @@ from gui import GUIManager from channel import Channel from websocket import WebsocketPool from inventory import DropsCampaign -from exceptions import ( - MinerException, LoginException, CaptchaRequired, ExitRequest, ReloadRequest, RequestInvalid -) +from exceptions import MinerException, ExitRequest, LoginException, ReloadRequest, RequestInvalid from utils import ( CHARS_HEX_LOWER, timestamp, @@ -103,89 +101,18 @@ class _AuthState: async def _login(self) -> str: logger.debug("Login flow started") login_form: LoginForm = self._twitch.gui.login - gui_print = self._twitch.gui.print - payload: JsonType = { - "client_id": CLIENT_ID, - "undelete_user": False, - "remember_me": True, - # 'force_twitchguard': False, - } - - while True: - username, password, token = await login_form.ask_login() - payload["username"] = username - payload["password"] = password - # remove stale 2FA tokens, if present - payload.pop("authy_token", None) - payload.pop("twitchguard_code", None) - for attempt in range(2): - async with self._twitch.request( - "POST", "https://passport.twitch.tv/login", json=payload - ) as response: - login_response: JsonType = await response.json() - - # Feed this back in to avoid running into CAPTCHA if possible - if "captcha_proof" in login_response: - payload["captcha"] = {"proof": login_response["captcha_proof"]} - - # Error handling - if "error_code" in login_response: - error_code: int = login_response["error_code"] - logger.debug(f"Login error code: {error_code}") - if error_code in (1000, 5027): - # we've failed bois - session = await self._twitch.get_session() - jar = cast(aiohttp.CookieJar, session.cookie_jar) - assert BASE_URL.host is not None - jar.clear_domain(BASE_URL.host) - logger.debug("Login failed due to CAPTCHA") - raise CaptchaRequired() - elif error_code == 3001: - # wrong password you dummy - logger.debug("Login failed due to incorrect username or password") - gui_print(_("login", "incorrect_login_pass")) - login_form.clear(password=True) - break - elif error_code in ( - 3012, # Invalid authy token - 3023, # Invalid email code - ): - logger.debug("Login failed due to incorrect 2FA code") - if error_code == 3023: - gui_print(_("login", "incorrect_email_code")) - else: - gui_print(_("login", "incorrect_twofa_code")) - login_form.clear(token=True) - break - elif error_code in ( - 3011, # Authy token needed - 3022, # Email code needed - ): - # 2FA handling - logger.debug("2FA token required") - email = error_code == 3022 - if not token: - # user didn't provide a token, so ask them for it - if email: - gui_print(_("login", "email_code_required")) - else: - gui_print(_("login", "twofa_code_required")) - break - if email: - payload["twitchguard_code"] = token - else: - payload["authy_token"] = token - continue - else: - raise LoginException(login_response["error"]) - # Success handling - if "access_token" in login_response: - # we're in bois - self.access_token = cast(str, login_response["access_token"]) - logger.debug("Access token granted") - login_form.clear() - return self.access_token + for attempt in range(1, 6): # 5 attempts + try: + self.access_token = await login_form.ask_login() + break + except LoginException as exc: + if attempt < 5: + self._twitch.gui.print(exc.args[0]) + continue + raise # reraise on and past the 5th attempt + logger.debug("Access token granted") + return self.access_token def gql_headers(self, *, integrity: bool) -> JsonType: headers = {